1use lex_bytecode::vm::{EffectHandler, Vm};
8use lex_bytecode::{Program, Value};
9use std::path::PathBuf;
10use std::sync::atomic::{AtomicU64, Ordering};
11use std::sync::{Mutex, OnceLock};
12use std::sync::Arc;
13use std::time::{SystemTime, UNIX_EPOCH};
14
15use crate::builtins::{call_pure_builtin, is_pure_call};
16use crate::policy::Policy;
17
/// Destination for text lines produced by the handler.
///
/// The `Send` supertrait lets a boxed sink be moved to another thread.
pub trait IoSink: Send {
    /// Receives one line of output text.
    fn print_line(&mut self, s: &str);
}
23
24pub struct StdoutSink;
25impl IoSink for StdoutSink {
26 fn print_line(&mut self, s: &str) {
27 println!("{s}");
28 }
29}
30
31#[derive(Default)]
32pub struct CapturedSink { pub lines: Vec<String> }
33impl IoSink for CapturedSink {
34 fn print_line(&mut self, s: &str) { self.lines.push(s.to_string()); }
35}
36
/// Map from a string key to a boxed, sendable iterator of `String` items.
///
/// NOTE(review): the key is presumably a stream id handed out by the handler;
/// confirm against the code that inserts into the registry.
pub type StreamRegistry = std::collections::HashMap<
    String,
    Box<dyn Iterator<Item = String> + Send>,
>;
41
42pub struct DefaultHandler {
43 policy: Policy,
44 pub sink: Box<dyn IoSink>,
45 pub read_root: Option<PathBuf>,
48 pub budget_remaining: Arc<AtomicU64>,
55 pub budget_ceiling: Option<u64>,
59 pub program: Option<Arc<Program>>,
63 pub chat_registry: Option<Arc<crate::ws::ChatRegistry>>,
67 pub mcp_clients: crate::mcp_client::McpClientCache,
73 pub streams: Arc<std::sync::Mutex<StreamRegistry>>,
80 pub next_stream_id: Arc<std::sync::atomic::AtomicU64>,
82}
83
84impl DefaultHandler {
85 pub fn new(policy: Policy) -> Self {
86 let ceiling = policy.budget;
90 let initial = ceiling.unwrap_or(u64::MAX);
91 Self {
92 policy,
93 sink: Box::new(StdoutSink),
94 read_root: None,
95 budget_remaining: Arc::new(AtomicU64::new(initial)),
96 budget_ceiling: ceiling,
97 program: None,
98 chat_registry: None,
99 mcp_clients: crate::mcp_client::McpClientCache::with_capacity(16),
100 streams: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
101 next_stream_id: Arc::new(std::sync::atomic::AtomicU64::new(0)),
102 }
103 }
104
105 pub fn with_program(mut self, program: Arc<Program>) -> Self {
106 self.program = Some(program); self
107 }
108
109 pub fn with_chat_registry(mut self, registry: Arc<crate::ws::ChatRegistry>) -> Self {
110 self.chat_registry = Some(registry); self
111 }
112
113 pub fn with_sink(mut self, sink: Box<dyn IoSink>) -> Self {
114 self.sink = sink; self
115 }
116
117 pub fn with_read_root(mut self, root: PathBuf) -> Self {
118 self.read_root = Some(root); self
119 }
120
121 fn ensure_kind_allowed(&self, kind: &str) -> Result<(), String> {
122 if self.policy.allow_effects.contains(kind) {
123 Ok(())
124 } else {
125 Err(format!("effect `{kind}` not in --allow-effects"))
126 }
127 }
128
129 fn resolve_read_path(&self, p: &str) -> PathBuf {
130 match &self.read_root {
131 Some(root) => root.join(p.trim_start_matches('/')),
132 None => PathBuf::from(p),
133 }
134 }
135
136 fn dispatch_log(&mut self, op: &str, args: Vec<Value>) -> Result<Value, String> {
137 match op {
138 "debug" | "info" | "warn" | "error" => {
139 let msg = expect_str(args.first())?;
140 let level = match op {
141 "debug" => LogLevel::Debug,
142 "info" => LogLevel::Info,
143 "warn" => LogLevel::Warn,
144 _ => LogLevel::Error,
145 };
146 emit_log(level, msg);
147 Ok(Value::Unit)
148 }
149 "set_level" => {
150 let s = expect_str(args.first())?;
151 match parse_log_level(s) {
152 Some(l) => {
153 log_state().lock().unwrap().level = l;
154 Ok(ok(Value::Unit))
155 }
156 None => Ok(err(Value::Str(format!(
157 "log.set_level: unknown level `{s}`; expected debug|info|warn|error")))),
158 }
159 }
160 "set_format" => {
161 let s = expect_str(args.first())?;
162 let fmt = match s {
163 "text" => LogFormat::Text,
164 "json" => LogFormat::Json,
165 other => return Ok(err(Value::Str(format!(
166 "log.set_format: unknown format `{other}`; expected text|json")))),
167 };
168 log_state().lock().unwrap().format = fmt;
169 Ok(ok(Value::Unit))
170 }
171 "set_sink" => {
172 let path = expect_str(args.first())?;
173 if path == "-" {
174 log_state().lock().unwrap().sink = LogSink::Stderr;
175 return Ok(ok(Value::Unit));
176 }
177 if let Err(e) = self.ensure_fs_write_path(path) {
178 return Ok(err(Value::Str(e)));
179 }
180 match std::fs::OpenOptions::new()
181 .create(true).append(true).open(path)
182 {
183 Ok(f) => {
184 log_state().lock().unwrap().sink = LogSink::File(std::sync::Arc::new(Mutex::new(f)));
185 Ok(ok(Value::Unit))
186 }
187 Err(e) => Ok(err(Value::Str(format!("log.set_sink `{path}`: {e}")))),
188 }
189 }
190 other => Err(format!("unsupported log.{other}")),
191 }
192 }
193
194 fn dispatch_process(&mut self, op: &str, args: Vec<Value>) -> Result<Value, String> {
195 match op {
196 "spawn" => {
197 let cmd = expect_str(args.first())?.to_string();
198 let raw_args = match args.get(1) {
199 Some(Value::List(items)) => items.clone(),
200 _ => return Err("process.spawn: args must be List[Str]".into()),
201 };
202 let str_args: Result<Vec<String>, String> = raw_args.iter().map(|v| match v {
203 Value::Str(s) => Ok(s.clone()),
204 other => Err(format!("process.spawn: arg must be Str, got {other:?}")),
205 }).collect();
206 let str_args = str_args?;
207 let opts = match args.get(2) {
208 Some(Value::Record(r)) => r.clone(),
209 _ => return Err("process.spawn: missing or invalid opts record".into()),
210 };
211
212 if !self.policy.allow_proc.is_empty() {
214 let basename = std::path::Path::new(&cmd)
215 .file_name()
216 .and_then(|s| s.to_str())
217 .unwrap_or(&cmd);
218 if !self.policy.allow_proc.iter().any(|a| a == basename) {
219 return Ok(err(Value::Str(format!(
220 "process.spawn: `{cmd}` not in --allow-proc {:?}",
221 self.policy.allow_proc
222 ))));
223 }
224 }
225
226 let mut command = std::process::Command::new(&cmd);
227 command.args(&str_args);
228 command.stdin(std::process::Stdio::piped());
229 command.stdout(std::process::Stdio::piped());
230 command.stderr(std::process::Stdio::piped());
231
232 if let Some(Value::Variant { name, args: vargs }) = opts.get("cwd") {
233 if name == "Some" {
234 if let Some(Value::Str(s)) = vargs.first() {
235 command.current_dir(s);
236 }
237 }
238 }
239 if let Some(Value::Map(env)) = opts.get("env") {
240 for (k, v) in env {
241 if let (lex_bytecode::MapKey::Str(ks), Value::Str(vs)) = (k, v) {
242 command.env(ks, vs);
243 }
244 }
245 }
246
247 let stdin_payload: Option<Vec<u8>> = match opts.get("stdin") {
248 Some(Value::Variant { name, args: vargs }) if name == "Some" => {
249 match vargs.first() {
250 Some(Value::Bytes(b)) => Some(b.clone()),
251 _ => None,
252 }
253 }
254 _ => None,
255 };
256
257 let mut child = match command.spawn() {
258 Ok(c) => c,
259 Err(e) => return Ok(err(Value::Str(format!("process.spawn `{cmd}`: {e}")))),
260 };
261
262 if let Some(payload) = stdin_payload {
263 if let Some(mut stdin) = child.stdin.take() {
264 use std::io::Write;
265 let _ = stdin.write_all(&payload);
266 }
268 }
269
270 let stdout = child.stdout.take().map(std::io::BufReader::new);
271 let stderr = child.stderr.take().map(std::io::BufReader::new);
272 let handle = next_process_handle();
273 process_registry().lock().unwrap().insert(handle, ProcessState {
274 child,
275 stdout,
276 stderr,
277 });
278 Ok(ok(Value::Int(handle as i64)))
279 }
280 "read_stdout_line" => Self::read_line_op(args, true),
281 "read_stderr_line" => Self::read_line_op(args, false),
282 "wait" => {
283 let h = expect_process_handle(args.first())?;
284 let arc = process_registry().lock().unwrap()
288 .touch_get(h)
289 .ok_or_else(|| "process.wait: closed or unknown ProcessHandle".to_string())?;
290 let status = {
291 let mut state = arc.lock().unwrap();
292 state.child.wait().map_err(|e| format!("process.wait: {e}"))?
293 };
294 process_registry().lock().unwrap().remove(h);
298 let mut rec = indexmap::IndexMap::new();
299 rec.insert("code".into(), Value::Int(status.code().unwrap_or(-1) as i64));
300 #[cfg(unix)]
301 {
302 use std::os::unix::process::ExitStatusExt;
303 rec.insert("signaled".into(), Value::Bool(status.signal().is_some()));
304 }
305 #[cfg(not(unix))]
306 {
307 rec.insert("signaled".into(), Value::Bool(false));
308 }
309 Ok(Value::Record(rec))
310 }
311 "kill" => {
312 let h = expect_process_handle(args.first())?;
313 let _signal = expect_str(args.get(1))?;
314 let arc = process_registry().lock().unwrap()
315 .touch_get(h)
316 .ok_or_else(|| "process.kill: closed or unknown ProcessHandle".to_string())?;
317 let mut state = arc.lock().unwrap();
318 match state.child.kill() {
321 Ok(_) => Ok(ok(Value::Unit)),
322 Err(e) => Ok(err(Value::Str(format!("process.kill: {e}")))),
323 }
324 }
325 "run" => {
326 let cmd = expect_str(args.first())?.to_string();
327 let raw_args = match args.get(1) {
328 Some(Value::List(items)) => items.clone(),
329 _ => return Err("process.run: args must be List[Str]".into()),
330 };
331 let str_args: Result<Vec<String>, String> = raw_args.iter().map(|v| match v {
332 Value::Str(s) => Ok(s.clone()),
333 other => Err(format!("process.run: arg must be Str, got {other:?}")),
334 }).collect();
335 let str_args = str_args?;
336 if !self.policy.allow_proc.is_empty() {
337 let basename = std::path::Path::new(&cmd)
338 .file_name()
339 .and_then(|s| s.to_str())
340 .unwrap_or(&cmd);
341 if !self.policy.allow_proc.iter().any(|a| a == basename) {
342 return Ok(err(Value::Str(format!(
343 "process.run: `{cmd}` not in --allow-proc {:?}",
344 self.policy.allow_proc
345 ))));
346 }
347 }
348 match std::process::Command::new(&cmd).args(&str_args).output() {
349 Ok(o) => {
350 let mut rec = indexmap::IndexMap::new();
351 rec.insert("stdout".into(), Value::Str(
352 String::from_utf8_lossy(&o.stdout).to_string()));
353 rec.insert("stderr".into(), Value::Str(
354 String::from_utf8_lossy(&o.stderr).to_string()));
355 rec.insert("exit_code".into(), Value::Int(
356 o.status.code().unwrap_or(-1) as i64));
357 Ok(ok(Value::Record(rec)))
358 }
359 Err(e) => Ok(err(Value::Str(format!("process.run `{cmd}`: {e}")))),
360 }
361 }
362 other => Err(format!("unsupported process.{other}")),
363 }
364 }
365
366 fn read_line_op(args: Vec<Value>, is_stdout: bool) -> Result<Value, String> {
372 let h = expect_process_handle(args.first())?;
373 let arc = process_registry().lock().unwrap()
374 .touch_get(h)
375 .ok_or_else(|| format!(
376 "process.read_{}_line: closed or unknown ProcessHandle",
377 if is_stdout { "stdout" } else { "stderr" }))?;
378 let mut state = arc.lock().unwrap();
379 let reader_opt = if is_stdout {
380 state.stdout.as_mut().map(|r| -> &mut dyn std::io::BufRead { r })
381 } else {
382 state.stderr.as_mut().map(|r| -> &mut dyn std::io::BufRead { r })
383 };
384 let reader = match reader_opt {
385 Some(r) => r,
386 None => return Ok(none()),
387 };
388 let mut line = String::new();
389 match reader.read_line(&mut line) {
390 Ok(0) => Ok(none()),
391 Ok(_) => {
392 if line.ends_with('\n') { line.pop(); }
393 if line.ends_with('\r') { line.pop(); }
394 Ok(some(Value::Str(line)))
395 }
396 Err(e) => Err(format!("process.read_*_line: {e}")),
397 }
398 }
399
400 fn dispatch_fs(&mut self, op: &str, args: Vec<Value>) -> Result<Value, String> {
401 match op {
402 "exists" => {
403 let path = expect_str(args.first())?.to_string();
404 if let Err(e) = self.ensure_fs_walk_path(&path) {
405 return Ok(err(Value::Str(e)));
406 }
407 Ok(Value::Bool(std::path::Path::new(&path).exists()))
408 }
409 "is_file" => {
410 let path = expect_str(args.first())?.to_string();
411 if let Err(e) = self.ensure_fs_walk_path(&path) {
412 return Ok(err(Value::Str(e)));
413 }
414 Ok(Value::Bool(std::path::Path::new(&path).is_file()))
415 }
416 "is_dir" => {
417 let path = expect_str(args.first())?.to_string();
418 if let Err(e) = self.ensure_fs_walk_path(&path) {
419 return Ok(err(Value::Str(e)));
420 }
421 Ok(Value::Bool(std::path::Path::new(&path).is_dir()))
422 }
423 "stat" => {
424 let path = expect_str(args.first())?.to_string();
425 if let Err(e) = self.ensure_fs_walk_path(&path) {
426 return Ok(err(Value::Str(e)));
427 }
428 match std::fs::metadata(&path) {
429 Ok(md) => {
430 let mtime = md.modified()
431 .ok()
432 .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
433 .map(|d| d.as_secs() as i64)
434 .unwrap_or(0);
435 let mut rec = indexmap::IndexMap::new();
436 rec.insert("size".into(), Value::Int(md.len() as i64));
437 rec.insert("mtime".into(), Value::Int(mtime));
438 rec.insert("is_dir".into(), Value::Bool(md.is_dir()));
439 rec.insert("is_file".into(), Value::Bool(md.is_file()));
440 Ok(ok(Value::Record(rec)))
441 }
442 Err(e) => Ok(err(Value::Str(format!("fs.stat `{path}`: {e}")))),
443 }
444 }
445 "list_dir" => {
446 let path = expect_str(args.first())?.to_string();
447 if let Err(e) = self.ensure_fs_walk_path(&path) {
448 return Ok(err(Value::Str(e)));
449 }
450 match std::fs::read_dir(&path) {
451 Ok(rd) => {
452 let mut entries: Vec<Value> = Vec::new();
453 for ent in rd {
454 match ent {
455 Ok(e) => {
456 let p = e.path();
457 entries.push(Value::Str(p.to_string_lossy().into_owned()));
458 }
459 Err(e) => return Ok(err(Value::Str(format!("fs.list_dir: {e}")))),
460 }
461 }
462 Ok(ok(Value::List(entries)))
463 }
464 Err(e) => Ok(err(Value::Str(format!("fs.list_dir `{path}`: {e}")))),
465 }
466 }
467 "walk" => {
468 let path = expect_str(args.first())?.to_string();
469 if let Err(e) = self.ensure_fs_walk_path(&path) {
470 return Ok(err(Value::Str(e)));
471 }
472 let mut paths: Vec<Value> = Vec::new();
473 for ent in walkdir::WalkDir::new(&path) {
474 match ent {
475 Ok(e) => paths.push(Value::Str(
476 e.path().to_string_lossy().into_owned())),
477 Err(e) => return Ok(err(Value::Str(format!("fs.walk: {e}")))),
478 }
479 }
480 Ok(ok(Value::List(paths)))
481 }
482 "glob" => {
483 let pattern = expect_str(args.first())?.to_string();
484 let entries = match glob::glob(&pattern) {
489 Ok(e) => e,
490 Err(e) => return Ok(err(Value::Str(format!("fs.glob: {e}")))),
491 };
492 let mut paths: Vec<Value> = Vec::new();
493 for ent in entries {
494 match ent {
495 Ok(p) => {
496 let s = p.to_string_lossy().into_owned();
497 if self.policy.allow_fs_read.is_empty()
498 || self.policy.allow_fs_read.iter().any(|root| p.starts_with(root))
499 {
500 paths.push(Value::Str(s));
501 }
502 }
503 Err(e) => return Ok(err(Value::Str(format!("fs.glob: {e}")))),
504 }
505 }
506 Ok(ok(Value::List(paths)))
507 }
508 "mkdir_p" => {
509 let path = expect_str(args.first())?.to_string();
510 if let Err(e) = self.ensure_fs_write_path(&path) {
511 return Ok(err(Value::Str(e)));
512 }
513 match std::fs::create_dir_all(&path) {
514 Ok(_) => Ok(ok(Value::Unit)),
515 Err(e) => Ok(err(Value::Str(format!("fs.mkdir_p `{path}`: {e}")))),
516 }
517 }
518 "remove" => {
519 let path = expect_str(args.first())?.to_string();
520 if let Err(e) = self.ensure_fs_write_path(&path) {
521 return Ok(err(Value::Str(e)));
522 }
523 let p = std::path::Path::new(&path);
524 let result = if p.is_dir() {
525 std::fs::remove_dir_all(p)
526 } else {
527 std::fs::remove_file(p)
528 };
529 match result {
530 Ok(_) => Ok(ok(Value::Unit)),
531 Err(e) => Ok(err(Value::Str(format!("fs.remove `{path}`: {e}")))),
532 }
533 }
534 "copy" => {
535 let src = expect_str(args.first())?.to_string();
536 let dst = expect_str(args.get(1))?.to_string();
537 if let Err(e) = self.ensure_fs_walk_path(&src) {
538 return Ok(err(Value::Str(e)));
539 }
540 if let Err(e) = self.ensure_fs_write_path(&dst) {
541 return Ok(err(Value::Str(e)));
542 }
543 match std::fs::copy(&src, &dst) {
544 Ok(_) => Ok(ok(Value::Unit)),
545 Err(e) => Ok(err(Value::Str(format!("fs.copy {src} -> {dst}: {e}")))),
546 }
547 }
548 other => Err(format!("unsupported fs.{other}")),
549 }
550 }
551
552 fn ensure_fs_walk_path(&self, path: &str) -> Result<(), String> {
557 if self.policy.allow_fs_read.is_empty() {
558 return Ok(());
559 }
560 let p = std::path::Path::new(path);
561 if self.policy.allow_fs_read.iter().any(|a| p.starts_with(a)) {
562 Ok(())
563 } else {
564 Err(format!("fs path `{path}` outside --allow-fs-read"))
565 }
566 }
567
568 fn ensure_fs_write_path(&self, path: &str) -> Result<(), String> {
571 if self.policy.allow_fs_write.is_empty() {
572 return Ok(());
573 }
574 let p = std::path::Path::new(path);
575 if self.policy.allow_fs_write.iter().any(|a| p.starts_with(a)) {
576 Ok(())
577 } else {
578 Err(format!("fs path `{path}` outside --allow-fs-write"))
579 }
580 }
581
582 fn ensure_host_allowed(&self, url: &str) -> Result<(), String> {
586 if self.policy.allow_net_host.is_empty() { return Ok(()); }
587 let host = extract_host(url).unwrap_or("");
588 if self.policy.allow_net_host.iter().any(|h| host == h) {
589 Ok(())
590 } else {
591 Err(format!(
592 "net call to host `{host}` not in --allow-net-host {:?}",
593 self.policy.allow_net_host,
594 ))
595 }
596 }
597}
598
/// Extracts the host portion of an `http://` or `https://` URL.
///
/// Returns `None` for any other scheme, and for URLs whose authority contains
/// a backslash (URL parsers disagree on how to read those, so they are
/// rejected rather than guessed at).
///
/// The authority ends at the first `/`, `?` or `#`. Any `userinfo@` prefix is
/// removed before the host is read, so `http://good.example:x@evil.example/`
/// yields `evil.example`. A bracketed IPv6 literal is returned with its
/// brackets (`[::1]`); otherwise everything after the last `:` is treated as
/// the port and dropped.
fn extract_host(url: &str) -> Option<&str> {
    let rest = url
        .strip_prefix("http://")
        .or_else(|| url.strip_prefix("https://"))?;

    // The authority stops at the start of the path, query, or fragment.
    let authority = rest
        .find(|c: char| matches!(c, '/' | '?' | '#'))
        .map_or(rest, |end| &rest[..end]);

    // Refuse ambiguous authorities instead of risking a parser mismatch with
    // the HTTP client that will actually open the connection.
    if authority.contains('\\') {
        return None;
    }

    // Everything up to the last '@' is userinfo, not the host.
    let host_port = authority
        .rsplit_once('@')
        .map_or(authority, |(_, after)| after);

    // Bracketed IPv6 literal: the colons inside the brackets are not a port
    // separator.
    if host_port.starts_with('[') {
        return Some(match host_port.find(']') {
            Some(close) => &host_port[..=close],
            None => host_port,
        });
    }

    Some(host_port.rsplit_once(':').map_or(host_port, |(host, _port)| host))
}
610
611impl EffectHandler for DefaultHandler {
612 fn note_call_budget(&mut self, cost: u64) -> Result<(), String> {
617 let Some(ceiling) = self.budget_ceiling else { return Ok(()); };
620 loop {
626 let cur = self.budget_remaining.load(Ordering::SeqCst);
627 if cost > cur {
628 let used = ceiling.saturating_sub(cur);
629 return Err(format!(
630 "budget exceeded: requested {cost}, used so far {used}, ceiling {ceiling}"));
631 }
632 let next = cur - cost;
633 if self.budget_remaining.compare_exchange(cur, next,
636 Ordering::SeqCst, Ordering::SeqCst).is_ok() {
637 return Ok(());
638 }
639 }
640 }
641
642 fn dispatch(&mut self, kind: &str, op: &str, args: Vec<Value>) -> Result<Value, String> {
643 if is_pure_call(kind, op) {
647 return call_pure_builtin(kind, op, args);
648 }
649 if kind == "process" {
653 self.ensure_kind_allowed("proc")?;
654 return self.dispatch_process(op, args);
655 }
656 if kind == "log" {
657 let effect_kind = match op {
660 "debug" | "info" | "warn" | "error" => "log",
661 "set_level" | "set_format" => "io",
662 "set_sink" => {
663 self.ensure_kind_allowed("io")?;
664 self.ensure_kind_allowed("fs_write")?;
665 return self.dispatch_log(op, args);
666 }
667 other => return Err(format!("unsupported log.{other}")),
668 };
669 self.ensure_kind_allowed(effect_kind)?;
670 return self.dispatch_log(op, args);
671 }
672 if kind == "fs" {
673 let effect_kind = match op {
674 "exists" | "is_file" | "is_dir" | "stat"
675 | "list_dir" | "walk" | "glob" => "fs_walk",
676 "mkdir_p" | "remove" => "fs_write",
677 "copy" => {
678 self.ensure_kind_allowed("fs_walk")?;
679 self.ensure_kind_allowed("fs_write")?;
680 return self.dispatch_fs(op, args);
681 }
682 other => return Err(format!("unsupported fs.{other}")),
683 };
684 self.ensure_kind_allowed(effect_kind)?;
685 return self.dispatch_fs(op, args);
686 }
687 if kind == "datetime" && op == "now" {
694 self.ensure_kind_allowed("time")?;
695 if let Ok(s) = std::env::var("LEX_TEST_NOW") {
697 if let Ok(secs) = s.trim().parse::<i64>() {
698 return Ok(Value::Int(secs.saturating_mul(1_000_000_000)));
699 }
700 }
701 let now = chrono::Utc::now();
702 let nanos = now.timestamp_nanos_opt().unwrap_or(i64::MAX);
703 return Ok(Value::Int(nanos));
704 }
705 if kind == "crypto" && op == "random" {
706 self.ensure_kind_allowed("random")?;
707 let n = expect_int(args.first())?;
708 if !(0..=1_048_576).contains(&n) {
709 return Err("crypto.random: n must be in 0..=1048576".into());
710 }
711 use rand::{rngs::SysRng, TryRng};
712 let mut buf = vec![0u8; n as usize];
713 SysRng.try_fill_bytes(&mut buf)
714 .map_err(|e| format!("crypto.random: OS RNG: {e}"))?;
715 return Ok(Value::Bytes(buf));
716 }
717 if kind == "crypto" && op == "random_str_hex" {
722 self.ensure_kind_allowed("random")?;
723 let n = expect_int(args.first())?;
724 if !(0..=1_048_576).contains(&n) {
725 return Err("crypto.random_str_hex: n must be in 0..=1048576".into());
726 }
727 use rand::{rngs::SysRng, TryRng};
728 let mut buf = vec![0u8; n as usize];
729 SysRng.try_fill_bytes(&mut buf)
730 .map_err(|e| format!("crypto.random_str_hex: OS RNG: {e}"))?;
731 return Ok(Value::Str(hex::encode(&buf)));
732 }
733 if kind == "agent" {
746 let effect_kind = match op {
747 "local_complete" => "llm_local",
748 "cloud_complete" => "llm_cloud",
749 "cloud_stream" => "llm_cloud",
750 "send_a2a" => "a2a",
751 "call_mcp" => "mcp",
752 other => return Err(format!("unsupported agent.{other}")),
753 };
754 self.ensure_kind_allowed(effect_kind)?;
755 return match op {
763 "call_mcp" => Ok(self.dispatch_call_mcp(args)),
764 "local_complete" => Ok(dispatch_llm_local(args)),
765 "cloud_complete" => Ok(dispatch_llm_cloud(args)),
766 "cloud_stream" => Ok(self.dispatch_cloud_stream(args)),
767 _ => Ok(ok(Value::Str(format!("<{effect_kind} stub>")))),
768 };
769 }
770 if kind == "stream" {
771 self.ensure_kind_allowed("stream")?;
778 return match op {
779 "next" => Ok(self.dispatch_stream_next(args)),
780 "collect" => Ok(self.dispatch_stream_collect(args)),
781 other => Err(format!("unsupported stream.{other}")),
782 };
783 }
784 if kind == "http" && matches!(op, "send" | "get" | "post") {
785 self.ensure_kind_allowed("net")?;
786 return match op {
787 "send" => {
788 let req = expect_record(args.first())?;
789 Ok(http_send_record(self, req))
790 }
791 "get" => {
792 let url = expect_str(args.first())?.to_string();
793 self.ensure_host_allowed(&url)?;
794 Ok(http_send_simple("GET", &url, None, "", None))
795 }
796 "post" => {
797 let url = expect_str(args.first())?.to_string();
798 let body = expect_bytes(args.get(1))?.clone();
799 let content_type = expect_str(args.get(2))?.to_string();
800 self.ensure_host_allowed(&url)?;
801 Ok(http_send_simple("POST", &url, Some(body), &content_type, None))
802 }
803 _ => unreachable!(),
804 };
805 }
806 self.ensure_kind_allowed(kind)?;
807 match (kind, op) {
808 ("io", "print") => {
809 let line = expect_str(args.first())?;
810 self.sink.print_line(line);
811 Ok(Value::Unit)
812 }
813 ("io", "read") => {
814 let path = expect_str(args.first())?.to_string();
815 let resolved = self.resolve_read_path(&path);
816 if !self.policy.allow_fs_read.is_empty()
823 && !self.policy.allow_fs_read.iter().any(|a| resolved.starts_with(a))
824 {
825 return Err(format!("read of `{path}` outside --allow-fs-read"));
826 }
827 match std::fs::read_to_string(&resolved) {
828 Ok(s) => Ok(ok(Value::Str(s))),
829 Err(e) => Ok(err(Value::Str(format!("{e}")))),
830 }
831 }
832 ("io", "write") => {
833 let path = expect_str(args.first())?.to_string();
834 let contents = expect_str(args.get(1))?.to_string();
835 if !self.policy.allow_fs_write.is_empty() {
837 let p = std::path::Path::new(&path);
838 if !self.policy.allow_fs_write.iter().any(|a| p.starts_with(a)) {
839 return Err(format!("write to `{path}` outside --allow-fs-write"));
840 }
841 }
842 match std::fs::write(&path, contents) {
843 Ok(_) => Ok(ok(Value::Unit)),
844 Err(e) => Ok(err(Value::Str(format!("{e}")))),
845 }
846 }
847 ("time", "now") => {
848 if let Ok(s) = std::env::var("LEX_TEST_NOW") {
850 if let Ok(secs) = s.trim().parse::<i64>() {
851 return Ok(Value::Int(secs));
852 }
853 }
854 let secs = SystemTime::now().duration_since(UNIX_EPOCH)
855 .map_err(|e| format!("time: {e}"))?.as_secs();
856 Ok(Value::Int(secs as i64))
857 }
858 ("time", "now_ms") => {
859 if let Ok(s) = std::env::var("LEX_TEST_NOW") {
864 if let Ok(secs) = s.trim().parse::<i64>() {
865 return Ok(Value::Int(secs.saturating_mul(1000)));
866 }
867 }
868 let ms = SystemTime::now().duration_since(UNIX_EPOCH)
869 .map_err(|e| format!("time: {e}"))?.as_millis();
870 Ok(Value::Int(ms as i64))
871 }
872 ("time", "now_str") => {
873 if let Ok(s) = std::env::var("LEX_TEST_NOW") {
877 if let Ok(secs) = s.trim().parse::<i64>() {
878 let dt = chrono::DateTime::<chrono::Utc>::from_timestamp(secs, 0)
879 .unwrap_or_else(chrono::Utc::now);
880 return Ok(Value::Str(dt.to_rfc3339()));
881 }
882 }
883 Ok(Value::Str(chrono::Utc::now().to_rfc3339()))
884 }
885 ("time", "mono_ns") => {
886 static MONO_START: OnceLock<std::time::Instant> = OnceLock::new();
894 let start = MONO_START.get_or_init(std::time::Instant::now);
895 let dur = std::time::Instant::now().duration_since(*start);
896 Ok(Value::Int(dur.as_nanos() as i64))
897 }
898 ("time", "sleep_ms") => {
899 let n = expect_int(args.first())?;
907 if n > 0 {
908 let ms = (n as u64).min(60_000);
909 std::thread::sleep(std::time::Duration::from_millis(ms));
910 }
911 Ok(Value::Unit)
912 }
913 ("rand", "int_in") => {
914 let lo = expect_int(args.first())?;
916 let hi = expect_int(args.get(1))?;
917 Ok(Value::Int((lo + hi) / 2))
918 }
919 ("env", "get") => {
924 let name = expect_str(args.first())?;
925 Ok(match std::env::var(name) {
926 Ok(v) => Value::Variant {
927 name: "Some".into(),
928 args: vec![Value::Str(v)],
929 },
930 Err(_) => Value::Variant { name: "None".into(), args: Vec::new() },
931 })
932 }
933 ("budget", _) => {
934 Ok(Value::Unit)
937 }
938 ("net", "get") => {
939 let url = expect_str(args.first())?.to_string();
940 self.ensure_host_allowed(&url)?;
941 Ok(http_request("GET", &url, None))
942 }
943 ("net", "post") => {
944 let url = expect_str(args.first())?.to_string();
945 let body = expect_str(args.get(1))?.to_string();
946 self.ensure_host_allowed(&url)?;
947 Ok(http_request("POST", &url, Some(&body)))
948 }
949 ("net", "serve") => {
950 let port = match args.first() {
951 Some(Value::Int(n)) if (0..=65535).contains(n) => *n as u16,
952 _ => return Err("net.serve(port, handler): port must be Int 0..=65535".into()),
953 };
954 let handler_name = expect_str(args.get(1))?.to_string();
955 let program = self.program.clone()
956 .ok_or_else(|| "net.serve requires a Program reference; use DefaultHandler::with_program".to_string())?;
957 let policy = self.policy.clone();
958 serve_http(port, handler_name, program, policy, None)
959 }
960 ("net", "serve_fn") => {
961 let port = match args.first() {
962 Some(Value::Int(n)) if (0..=65535).contains(n) => *n as u16,
963 _ => return Err("net.serve_fn(port, handler): port must be Int 0..=65535".into()),
964 };
965 let closure = match args.into_iter().nth(1) {
966 Some(c @ Value::Closure { .. }) => c,
967 _ => return Err("net.serve_fn(port, handler): handler must be a closure".into()),
968 };
969 let program = self.program.clone()
970 .ok_or_else(|| "net.serve_fn requires a Program reference; use DefaultHandler::with_program".to_string())?;
971 let policy = self.policy.clone();
972 serve_http_fn(port, closure, program, policy)
973 }
974 ("net", "serve_tls") => {
975 let port = match args.first() {
976 Some(Value::Int(n)) if (0..=65535).contains(n) => *n as u16,
977 _ => return Err("net.serve_tls(port, cert, key, handler): port must be Int 0..=65535".into()),
978 };
979 let cert_path = expect_str(args.get(1))?.to_string();
980 let key_path = expect_str(args.get(2))?.to_string();
981 let handler_name = expect_str(args.get(3))?.to_string();
982 let program = self.program.clone()
983 .ok_or_else(|| "net.serve_tls requires a Program reference".to_string())?;
984 let policy = self.policy.clone();
985 let cert = std::fs::read(&cert_path)
986 .map_err(|e| format!("net.serve_tls: read cert {cert_path}: {e}"))?;
987 let key = std::fs::read(&key_path)
988 .map_err(|e| format!("net.serve_tls: read key {key_path}: {e}"))?;
989 serve_http(port, handler_name, program, policy, Some(TlsConfig { cert, key }))
990 }
991 ("net", "serve_ws") => {
992 let port = match args.first() {
993 Some(Value::Int(n)) if (0..=65535).contains(n) => *n as u16,
994 _ => return Err("net.serve_ws(port, on_message): port must be Int 0..=65535".into()),
995 };
996 let handler_name = expect_str(args.get(1))?.to_string();
997 let program = self.program.clone()
998 .ok_or_else(|| "net.serve_ws requires a Program reference".to_string())?;
999 let policy = self.policy.clone();
1000 let registry = Arc::new(crate::ws::ChatRegistry::default());
1001 crate::ws::serve_ws(port, handler_name, program, policy, registry)
1002 }
1003 ("net", "serve_ws_fn") => {
1004 let port = match args.first() {
1005 Some(Value::Int(n)) if (0..=65535).contains(n) => *n as u16,
1006 _ => return Err("net.serve_ws_fn(port, subprotocol, handler): port must be Int 0..=65535".into()),
1007 };
1008 let subprotocol = expect_str(args.get(1))?.to_string();
1009 let closure = match args.into_iter().nth(2) {
1010 Some(c @ Value::Closure { .. }) => c,
1011 _ => return Err("net.serve_ws_fn(port, subprotocol, handler): handler must be a closure".into()),
1012 };
1013 let program = self.program.clone()
1014 .ok_or_else(|| "net.serve_ws_fn requires a Program reference; use DefaultHandler::with_program".to_string())?;
1015 let policy = self.policy.clone();
1016 let registry = Arc::new(crate::ws::ChatRegistry::default());
1017 crate::ws::serve_ws_fn(port, subprotocol, closure, program, policy, registry)
1018 }
1019 ("net", "dial_ws") => {
1020 let url = expect_str(args.first())?.to_string();
1022 let subprotocol = expect_str(args.get(1))?.to_string();
1023 let on_open = match args.get(2).cloned() {
1024 Some(c @ Value::Closure { .. }) => c,
1025 _ => return Err(
1026 "net.dial_ws(url, subprotocol, on_open, on_message): on_open must be a closure".into(),
1027 ),
1028 };
1029 let on_message = match args.into_iter().nth(3) {
1030 Some(c @ Value::Closure { .. }) => c,
1031 _ => return Err(
1032 "net.dial_ws(url, subprotocol, on_open, on_message): on_message must be a closure".into(),
1033 ),
1034 };
1035 let program = self.program.clone().ok_or_else(|| {
1036 "net.dial_ws requires a Program reference; use DefaultHandler::with_program".to_string()
1037 })?;
1038 let policy = self.policy.clone();
1039 crate::ws::dial_ws(url, subprotocol, on_open, on_message, program, policy)
1040 }
1041 ("chat", "broadcast") => {
1042 let registry = self.chat_registry.as_ref()
1043 .ok_or_else(|| "chat.broadcast called outside a net.serve_ws handler".to_string())?;
1044 let room = expect_str(args.first())?;
1045 let body = expect_str(args.get(1))?;
1046 crate::ws::chat_broadcast(registry, room, body);
1047 Ok(Value::Unit)
1048 }
1049 ("chat", "send") => {
1050 let registry = self.chat_registry.as_ref()
1051 .ok_or_else(|| "chat.send called outside a net.serve_ws handler".to_string())?;
1052 let conn_id = match args.first() {
1053 Some(Value::Int(n)) if *n >= 0 => *n as u64,
1054 _ => return Err("chat.send: conn_id must be non-negative Int".into()),
1055 };
1056 let body = expect_str(args.get(1))?;
1057 Ok(Value::Bool(crate::ws::chat_send(registry, conn_id, body)))
1058 }
1059 ("kv", "open") => {
1060 let path = expect_str(args.first())?.to_string();
1061 if !self.policy.allow_fs_write.is_empty() {
1065 let p = std::path::Path::new(&path);
1066 if !self.policy.allow_fs_write.iter().any(|a| p.starts_with(a)) {
1067 return Ok(err(Value::Str(format!(
1068 "kv.open: `{path}` outside --allow-fs-write"))));
1069 }
1070 }
1071 match sled::open(&path) {
1072 Ok(db) => {
1073 let handle = next_kv_handle();
1074 kv_registry().lock().unwrap().insert(handle, db);
1075 Ok(ok(Value::Int(handle as i64)))
1076 }
1077 Err(e) => Ok(err(Value::Str(format!("kv.open: {e}")))),
1078 }
1079 }
1080 ("kv", "close") => {
1081 let h = expect_kv_handle(args.first())?;
1082 kv_registry().lock().unwrap().remove(h);
1083 Ok(Value::Unit)
1084 }
1085 ("kv", "get") => {
1086 let h = expect_kv_handle(args.first())?;
1087 let key = expect_str(args.get(1))?;
1088 let mut reg = kv_registry().lock().unwrap();
1089 let db = reg.touch_get(h).ok_or_else(|| "kv.get: closed or unknown Kv handle".to_string())?;
1090 match db.get(key.as_bytes()) {
1091 Ok(Some(ivec)) => Ok(some(Value::Bytes(ivec.to_vec()))),
1092 Ok(None) => Ok(none()),
1093 Err(e) => Err(format!("kv.get: {e}")),
1094 }
1095 }
1096 ("kv", "put") => {
1097 let h = expect_kv_handle(args.first())?;
1098 let key = expect_str(args.get(1))?.to_string();
1099 let val = expect_bytes(args.get(2))?.clone();
1100 let mut reg = kv_registry().lock().unwrap();
1101 let db = reg.touch_get(h).ok_or_else(|| "kv.put: closed or unknown Kv handle".to_string())?;
1102 match db.insert(key.as_bytes(), val) {
1103 Ok(_) => Ok(ok(Value::Unit)),
1104 Err(e) => Ok(err(Value::Str(format!("kv.put: {e}")))),
1105 }
1106 }
1107 ("kv", "delete") => {
1108 let h = expect_kv_handle(args.first())?;
1109 let key = expect_str(args.get(1))?;
1110 let mut reg = kv_registry().lock().unwrap();
1111 let db = reg.touch_get(h).ok_or_else(|| "kv.delete: closed or unknown Kv handle".to_string())?;
1112 match db.remove(key.as_bytes()) {
1113 Ok(_) => Ok(ok(Value::Unit)),
1114 Err(e) => Ok(err(Value::Str(format!("kv.delete: {e}")))),
1115 }
1116 }
1117 ("kv", "contains") => {
1118 let h = expect_kv_handle(args.first())?;
1119 let key = expect_str(args.get(1))?;
1120 let mut reg = kv_registry().lock().unwrap();
1121 let db = reg.touch_get(h).ok_or_else(|| "kv.contains: closed or unknown Kv handle".to_string())?;
1122 match db.contains_key(key.as_bytes()) {
1123 Ok(present) => Ok(Value::Bool(present)),
1124 Err(e) => Err(format!("kv.contains: {e}")),
1125 }
1126 }
1127 ("kv", "list_prefix") => {
1128 let h = expect_kv_handle(args.first())?;
1129 let prefix = expect_str(args.get(1))?;
1130 let mut reg = kv_registry().lock().unwrap();
1131 let db = reg.touch_get(h).ok_or_else(|| "kv.list_prefix: closed or unknown Kv handle".to_string())?;
1132 let mut keys: Vec<Value> = Vec::new();
1133 for kv in db.scan_prefix(prefix.as_bytes()) {
1134 let (k, _) = kv.map_err(|e| format!("kv.list_prefix: {e}"))?;
1135 let s = String::from_utf8_lossy(&k).to_string();
1136 keys.push(Value::Str(s));
1137 }
1138 Ok(Value::List(keys))
1139 }
1140 ("sql", "open") => {
1141 let path = expect_str(args.first())?.to_string();
1142 if path.starts_with("postgres://") || path.starts_with("postgresql://") {
1143 match postgres::Client::connect(&path, postgres::NoTls) {
1145 Ok(client) => {
1146 let handle = next_sql_handle();
1147 sql_registry().lock().unwrap().insert(handle, SqlConn::Postgres(client));
1148 Ok(ok(Value::Int(handle as i64)))
1149 }
1150 Err(e) => Ok(err(pg_err_to_sql_error(e, "sql.open"))),
1151 }
1152 } else {
1153 if path != ":memory:" && !self.policy.allow_fs_write.is_empty() {
1156 let p = std::path::Path::new(&path);
1157 if !self.policy.allow_fs_write.iter().any(|a| p.starts_with(a)) {
1158 return Ok(err(sql_error(
1159 format!("sql.open: `{path}` outside --allow-fs-write"),
1160 None, None,
1161 )));
1162 }
1163 }
1164 match rusqlite::Connection::open(&path) {
1165 Ok(conn) => {
1166 let handle = next_sql_handle();
1167 sql_registry().lock().unwrap().insert(handle, SqlConn::Sqlite(conn));
1168 Ok(ok(Value::Int(handle as i64)))
1169 }
1170 Err(e) => Ok(err(sqlite_err_to_sql_error(e, "sql.open"))),
1171 }
1172 }
1173 }
1174 ("sql", "close") => {
1175 let h = expect_sql_handle(args.first())?;
1176 sql_registry().lock().unwrap().remove(h);
1177 Ok(Value::Unit)
1178 }
1179 ("sql", "exec") => {
1180 let h = expect_sql_handle(args.first())?;
1181 let stmt = expect_str(args.get(1))?.to_string();
1182 let params = expect_sql_params(args.get(2))?;
1183 let arc = sql_registry().lock().unwrap()
1184 .touch_get(h)
1185 .ok_or_else(|| "sql.exec: closed or unknown Db handle".to_string())?;
1186 let mut conn = arc.lock().unwrap();
1187 match &mut *conn {
1188 SqlConn::Sqlite(c) => {
1189 let bound = sqlite_params(¶ms);
1190 let bind: Vec<&dyn rusqlite::ToSql> =
1191 bound.iter().map(|p| p as &dyn rusqlite::ToSql).collect();
1192 match c.execute(&stmt, rusqlite::params_from_iter(bind.iter())) {
1193 Ok(n) => Ok(ok(Value::Int(n as i64))),
1194 Err(e) => Ok(err(sqlite_err_to_sql_error(e, "sql.exec"))),
1195 }
1196 }
1197 SqlConn::Postgres(c) => {
1198 let pg = pg_param_refs(¶ms);
1199 let refs: Vec<&(dyn postgres::types::ToSql + Sync)> =
1200 pg.iter().map(|b| b.as_ref()).collect();
1201 match c.execute(stmt.as_str(), &refs) {
1202 Ok(n) => Ok(ok(Value::Int(n as i64))),
1203 Err(e) => Ok(err(pg_err_to_sql_error(e, "sql.exec"))),
1204 }
1205 }
1206 }
1207 }
1208 ("sql", "query") => {
1209 let h = expect_sql_handle(args.first())?;
1210 let stmt_str = expect_str(args.get(1))?.to_string();
1211 let params = expect_sql_params(args.get(2))?;
1212 let arc = sql_registry().lock().unwrap()
1213 .touch_get(h)
1214 .ok_or_else(|| "sql.query: closed or unknown Db handle".to_string())?;
1215 let mut conn = arc.lock().unwrap();
1216 Ok(match &mut *conn {
1217 SqlConn::Sqlite(c) => sql_run_query_sqlite(c, &stmt_str, ¶ms),
1218 SqlConn::Postgres(c) => sql_run_query_pg(c, &stmt_str, ¶ms),
1219 })
1220 }
1221 ("sql", "query_iter") => {
1227 let h = expect_sql_handle(args.first())?;
1228 let stmt_str = expect_str(args.get(1))?.to_string();
1229 let params = expect_sql_params(args.get(2))?;
1230 let arc = sql_registry().lock().unwrap()
1231 .touch_get(h)
1232 .ok_or_else(|| "sql.query_iter: closed or unknown Db handle".to_string())?;
1233
1234 let (sender, receiver) = std::sync::mpsc::sync_channel::<Result<Value, String>>(
1238 CURSOR_CHANNEL_CAPACITY,
1239 );
1240 let cursor_h = next_cursor_handle();
1241 cursor_registry().lock().unwrap().insert(cursor_h, receiver);
1242
1243 let arc_for_thread = Arc::clone(&arc);
1244 let is_sqlite = matches!(*arc.lock().unwrap(), SqlConn::Sqlite(_));
1250 std::thread::spawn(move || {
1251 if is_sqlite {
1252 sqlite_cursor_producer(arc_for_thread, stmt_str, params, sender);
1253 } else {
1254 pg_cursor_producer(arc_for_thread, stmt_str, params, sender);
1255 }
1256 });
1257
1258 Ok(ok(Value::Variant {
1259 name: "__IterCursor".into(),
1260 args: vec![Value::Int(cursor_h as i64)],
1261 }))
1262 }
1263 ("sql", "cursor_next") => {
1269 let h = match args.first() {
1270 Some(Value::Int(n)) if *n >= 0 => *n as u64,
1271 _ => return Err("sql.cursor_next: expected cursor handle (Int)".into()),
1272 };
1273 let rx_arc = match cursor_registry().lock().unwrap().touch_get(h) {
1274 Some(a) => a,
1275 None => return Ok(Value::Variant { name: "None".into(), args: vec![] }),
1276 };
1277 let recv_result = {
1282 let rx = match rx_arc.lock() {
1283 Ok(g) => g,
1284 Err(p) => p.into_inner(),
1285 };
1286 rx.recv()
1287 };
1288 match recv_result {
1289 Ok(Ok(row)) => Ok(Value::Variant {
1290 name: "Some".into(),
1291 args: vec![row],
1292 }),
1293 Ok(Err(_)) | Err(_) => {
1294 cursor_registry().lock().unwrap().remove(h);
1298 Ok(Value::Variant { name: "None".into(), args: vec![] })
1299 }
1300 }
1301 }
1302 ("sql", "begin") => {
1307 let h = expect_sql_handle(args.first())?;
1308 let arc = sql_registry().lock().unwrap()
1309 .touch_get(h)
1310 .ok_or_else(|| "sql.begin: closed or unknown Db handle".to_string())?;
1311 let mut conn = arc.lock().unwrap();
1312 match &mut *conn {
1313 SqlConn::Sqlite(c) => match c.execute_batch("BEGIN") {
1314 Ok(()) => Ok(ok(Value::Int(h as i64))),
1315 Err(e) => Ok(err(sqlite_err_to_sql_error(e, "sql.begin"))),
1316 },
1317 SqlConn::Postgres(c) => match c.batch_execute("BEGIN") {
1318 Ok(()) => Ok(ok(Value::Int(h as i64))),
1319 Err(e) => Ok(err(pg_err_to_sql_error(e, "sql.begin"))),
1320 },
1321 }
1322 }
1323 ("sql", "commit") => {
1324 let h = expect_sql_handle(args.first())?;
1325 let arc = sql_registry().lock().unwrap()
1326 .touch_get(h)
1327 .ok_or_else(|| "sql.commit: closed or unknown SqlTx handle".to_string())?;
1328 let mut conn = arc.lock().unwrap();
1329 match &mut *conn {
1330 SqlConn::Sqlite(c) => match c.execute_batch("COMMIT") {
1331 Ok(()) => Ok(ok(Value::Unit)),
1332 Err(e) => Ok(err(sqlite_err_to_sql_error(e, "sql.commit"))),
1333 },
1334 SqlConn::Postgres(c) => match c.batch_execute("COMMIT") {
1335 Ok(()) => Ok(ok(Value::Unit)),
1336 Err(e) => Ok(err(pg_err_to_sql_error(e, "sql.commit"))),
1337 },
1338 }
1339 }
1340 ("sql", "rollback") => {
1341 let h = expect_sql_handle(args.first())?;
1342 let arc = sql_registry().lock().unwrap()
1343 .touch_get(h)
1344 .ok_or_else(|| "sql.rollback: closed or unknown SqlTx handle".to_string())?;
1345 let mut conn = arc.lock().unwrap();
1346 match &mut *conn {
1347 SqlConn::Sqlite(c) => match c.execute_batch("ROLLBACK") {
1348 Ok(()) => Ok(ok(Value::Unit)),
1349 Err(e) => Ok(err(sqlite_err_to_sql_error(e, "sql.rollback"))),
1350 },
1351 SqlConn::Postgres(c) => match c.batch_execute("ROLLBACK") {
1352 Ok(()) => Ok(ok(Value::Unit)),
1353 Err(e) => Ok(err(pg_err_to_sql_error(e, "sql.rollback"))),
1354 },
1355 }
1356 }
1357 ("sql", "exec_tx") => {
1358 let h = expect_sql_handle(args.first())?;
1359 let stmt = expect_str(args.get(1))?.to_string();
1360 let params = expect_sql_params(args.get(2))?;
1361 let arc = sql_registry().lock().unwrap()
1362 .touch_get(h)
1363 .ok_or_else(|| "sql.exec_tx: closed or unknown SqlTx handle".to_string())?;
1364 let mut conn = arc.lock().unwrap();
1365 match &mut *conn {
1366 SqlConn::Sqlite(c) => {
1367 let bound = sqlite_params(¶ms);
1368 let bind: Vec<&dyn rusqlite::ToSql> =
1369 bound.iter().map(|p| p as &dyn rusqlite::ToSql).collect();
1370 match c.execute(&stmt, rusqlite::params_from_iter(bind.iter())) {
1371 Ok(n) => Ok(ok(Value::Int(n as i64))),
1372 Err(e) => Ok(err(sqlite_err_to_sql_error(e, "sql.exec_tx"))),
1373 }
1374 }
1375 SqlConn::Postgres(c) => {
1376 let pg = pg_param_refs(¶ms);
1377 let refs: Vec<&(dyn postgres::types::ToSql + Sync)> =
1378 pg.iter().map(|b| b.as_ref()).collect();
1379 match c.execute(stmt.as_str(), &refs) {
1380 Ok(n) => Ok(ok(Value::Int(n as i64))),
1381 Err(e) => Ok(err(pg_err_to_sql_error(e, "sql.exec_tx"))),
1382 }
1383 }
1384 }
1385 }
1386 ("sql", "query_tx") => {
1387 let h = expect_sql_handle(args.first())?;
1388 let stmt_str = expect_str(args.get(1))?.to_string();
1389 let params = expect_sql_params(args.get(2))?;
1390 let arc = sql_registry().lock().unwrap()
1391 .touch_get(h)
1392 .ok_or_else(|| "sql.query_tx: closed or unknown SqlTx handle".to_string())?;
1393 let mut conn = arc.lock().unwrap();
1394 Ok(match &mut *conn {
1395 SqlConn::Sqlite(c) => sql_run_query_sqlite(c, &stmt_str, ¶ms),
1396 SqlConn::Postgres(c) => sql_run_query_pg(c, &stmt_str, ¶ms),
1397 })
1398 }
1399 ("sql", "get_str") => Ok(sql_get_col(&args, |v| match v {
1400 Value::Str(s) => Some(Value::Str(s.clone())),
1401 Value::Int(n) => Some(Value::Str(n.to_string())),
1402 _ => None,
1403 })?),
1404 ("sql", "get_int") => Ok(sql_get_col(&args, |v| match v {
1405 Value::Int(n) => Some(Value::Int(*n)),
1406 Value::Float(f) => Some(Value::Int(*f as i64)),
1407 _ => None,
1408 })?),
1409 ("sql", "get_float") => Ok(sql_get_col(&args, |v| match v {
1410 Value::Float(f) => Some(Value::Float(*f)),
1411 Value::Int(n) => Some(Value::Float(*n as f64)),
1412 _ => None,
1413 })?),
1414 ("sql", "get_bool") => Ok(sql_get_col(&args, |v| match v {
1415 Value::Bool(b) => Some(Value::Bool(*b)),
1416 Value::Int(n) => Some(Value::Bool(*n != 0)),
1417 _ => None,
1418 })?),
1419 ("proc", "spawn") => {
1420 let cmd = expect_str(args.first())?.to_string();
1434 let raw_args = match args.get(1) {
1435 Some(Value::List(items)) => items,
1436 Some(other) => return Err(format!(
1437 "proc.spawn: args must be List[Str], got {other:?}")),
1438 None => return Err("proc.spawn: missing args list".into()),
1439 };
1440 let str_args: Vec<String> = raw_args.iter().map(|v| match v {
1441 Value::Str(s) => Ok(s.clone()),
1442 other => Err(format!("proc.spawn: arg must be Str, got {other:?}")),
1443 }).collect::<Result<Vec<_>, _>>()?;
1444
1445 if !self.policy.allow_proc.is_empty() {
1449 let basename = std::path::Path::new(&cmd)
1450 .file_name()
1451 .and_then(|s| s.to_str())
1452 .unwrap_or(&cmd);
1453 if !self.policy.allow_proc.iter().any(|a| a == basename) {
1454 return Ok(err(Value::Str(format!(
1455 "proc.spawn: `{cmd}` not in --allow-proc {:?}",
1456 self.policy.allow_proc
1457 ))));
1458 }
1459 }
1460
1461 if str_args.len() > 1024 {
1464 return Ok(err(Value::Str(
1465 "proc.spawn: arg-count exceeds 1024".into())));
1466 }
1467 if str_args.iter().any(|a| a.len() > 65_536) {
1468 return Ok(err(Value::Str(
1469 "proc.spawn: per-arg length exceeds 64 KiB".into())));
1470 }
1471
1472 let output = std::process::Command::new(&cmd)
1473 .args(&str_args)
1474 .output();
1475 match output {
1476 Ok(o) => {
1477 let mut rec = indexmap::IndexMap::new();
1478 rec.insert("stdout".into(), Value::Str(
1479 String::from_utf8_lossy(&o.stdout).to_string()));
1480 rec.insert("stderr".into(), Value::Str(
1481 String::from_utf8_lossy(&o.stderr).to_string()));
1482 rec.insert("exit_code".into(), Value::Int(
1483 o.status.code().unwrap_or(-1) as i64));
1484 Ok(ok(Value::Record(rec)))
1485 }
1486 Err(e) => Ok(err(Value::Str(format!("spawn `{cmd}`: {e}")))),
1487 }
1488 }
1489 other => Err(format!("unsupported effect {}.{}", other.0, other.1)),
1490 }
1491 }
1492
1493 fn spawn_for_worker(&self) -> Option<Box<dyn lex_bytecode::vm::EffectHandler + Send>> {
1516 let mut fresh = DefaultHandler::new(self.policy.clone());
1517 fresh.budget_remaining = std::sync::Arc::clone(&self.budget_remaining);
1520 fresh.budget_ceiling = self.budget_ceiling;
1521 fresh.read_root = self.read_root.clone();
1522 fresh.program = self.program.clone();
1523 fresh.chat_registry = self.chat_registry.clone();
1524 fresh.streams = std::sync::Arc::clone(&self.streams);
1529 fresh.next_stream_id = std::sync::Arc::clone(&self.next_stream_id);
1530 Some(Box::new(fresh))
1531 }
1532}
1533
/// TLS material handed to `serve_http` when HTTPS is requested.
///
/// Both fields are passed through unchanged to `tiny_http::SslConfig`
/// (`cert` as `certificate`, `key` as `private_key`).
pub struct TlsConfig {
    /// Raw certificate bytes (presumably PEM-encoded; confirm against callers).
    pub cert: Vec<u8>,
    /// Raw private-key bytes (presumably PEM-encoded; confirm against callers).
    pub key: Vec<u8>,
}
1547
1548fn serve_http(
1549 port: u16,
1550 handler_name: String,
1551 program: Arc<Program>,
1552 policy: Policy,
1553 tls: Option<TlsConfig>,
1554) -> Result<Value, String> {
1555 match tls {
1556 None => serve_http_plain(port, handler_name, program, policy),
1557 Some(cfg) => serve_http_tls_legacy(port, handler_name, program, policy, cfg),
1558 }
1559}
1560
1561fn serve_http_plain(
1565 port: u16,
1566 handler_name: String,
1567 program: Arc<Program>,
1568 policy: Policy,
1569) -> Result<Value, String> {
1570 use http_body_util::BodyExt as _;
1571 use hyper::server::conn::http1;
1572 use hyper::service::service_fn;
1573 use hyper_util::rt::TokioIo;
1574 use tokio::net::TcpListener as TokioTcpListener;
1575
1576 let rt = tokio::runtime::Builder::new_multi_thread()
1577 .enable_all()
1578 .build()
1579 .map_err(|e| format!("net.serve: tokio runtime: {e}"))?;
1580 rt.block_on(async move {
1581 let listener = TokioTcpListener::bind(("0.0.0.0", port))
1582 .await
1583 .map_err(|e| format!("net.serve bind {port}: {e}"))?;
1584 eprintln!("net.serve: listening on http://0.0.0.0:{port}");
1585 loop {
1586 let (stream, _) = listener
1587 .accept()
1588 .await
1589 .map_err(|e| format!("net.serve accept: {e}"))?;
1590 let io = TokioIo::new(stream);
1591 let program = Arc::clone(&program);
1592 let policy = policy.clone();
1593 let handler_name = handler_name.clone();
1594 tokio::spawn(async move {
1595 let program2 = Arc::clone(&program);
1596 let policy2 = policy.clone();
1597 let handler_name2 = handler_name.clone();
1598 let svc = service_fn(move |req: hyper::Request<hyper::body::Incoming>| {
1599 let program = Arc::clone(&program2);
1600 let policy = policy2.clone();
1601 let handler_name = handler_name2.clone();
1602 async move {
1603 let (parts, body) = req.into_parts();
1604 let body_bytes = body
1605 .collect()
1606 .await
1607 .map(|c| c.to_bytes())
1608 .unwrap_or_default();
1609 let result = tokio::task::spawn_blocking(move || {
1610 let lex_req = build_request_value_parts(&parts, &body_bytes);
1611 let handler = DefaultHandler::new(policy)
1612 .with_program(Arc::clone(&program));
1613 let mut vm = Vm::with_handler(&program, Box::new(handler));
1614 vm.call(&handler_name, vec![lex_req])
1615 })
1616 .await;
1617 Ok::<_, std::convert::Infallible>(match result {
1618 Ok(Ok(resp)) => build_hyper_response(&resp),
1619 Ok(Err(e)) => error_response(500, &format!("internal error: {e}")),
1620 Err(e) => error_response(500, &format!("task panicked: {e}")),
1621 })
1622 }
1623 });
1624 if let Err(e) = http1::Builder::new().serve_connection(io, svc).await {
1625 eprintln!("net.serve: connection error: {e}");
1626 }
1627 });
1628 }
1629 })
1630}
1631
1632fn serve_http_tls_legacy(
1634 port: u16,
1635 handler_name: String,
1636 program: Arc<Program>,
1637 policy: Policy,
1638 cfg: TlsConfig,
1639) -> Result<Value, String> {
1640 let ssl = tiny_http::SslConfig {
1641 certificate: cfg.cert,
1642 private_key: cfg.key,
1643 };
1644 let server = tiny_http::Server::https(("0.0.0.0", port), ssl)
1645 .map_err(|e| format!("net.serve_tls bind {port}: {e}"))?;
1646 eprintln!("net.serve: listening on https://0.0.0.0:{port}");
1647 for req in server.incoming_requests() {
1648 let program = Arc::clone(&program);
1649 let policy = policy.clone();
1650 let handler_name = handler_name.clone();
1651 std::thread::spawn(move || handle_request_tls(req, program, policy, handler_name));
1652 }
1653 Ok(Value::Unit)
1654}
1655
1656fn handle_request_tls(
1657 mut req: tiny_http::Request,
1658 program: Arc<Program>,
1659 policy: Policy,
1660 handler_name: String,
1661) {
1662 let lex_req = build_request_value_tiny(&mut req);
1663 let handler = DefaultHandler::new(policy).with_program(Arc::clone(&program));
1664 let mut vm = Vm::with_handler(&program, Box::new(handler));
1665 match vm.call(&handler_name, vec![lex_req]) {
1666 Ok(resp) => {
1667 let (status, body, headers) = unpack_response(&resp);
1668 respond_with_body_tls(req, status, body, headers);
1669 }
1670 Err(e) => {
1671 let response = tiny_http::Response::from_string(format!("internal error: {e}"))
1672 .with_status_code(500);
1673 let _ = req.respond(response);
1674 }
1675 }
1676}
1677
1678fn serve_http_fn(
1680 port: u16,
1681 closure: Value,
1682 program: Arc<Program>,
1683 policy: Policy,
1684) -> Result<Value, String> {
1685 use http_body_util::BodyExt as _;
1686 use hyper::server::conn::http1;
1687 use hyper::service::service_fn;
1688 use hyper_util::rt::TokioIo;
1689 use tokio::net::TcpListener as TokioTcpListener;
1690
1691 let rt = tokio::runtime::Builder::new_multi_thread()
1692 .enable_all()
1693 .build()
1694 .map_err(|e| format!("net.serve_fn: tokio runtime: {e}"))?;
1695 rt.block_on(async move {
1696 let listener = TokioTcpListener::bind(("0.0.0.0", port))
1697 .await
1698 .map_err(|e| format!("net.serve_fn bind {port}: {e}"))?;
1699 eprintln!("net.serve_fn: listening on http://0.0.0.0:{port}");
1700 loop {
1701 let (stream, _) = listener
1702 .accept()
1703 .await
1704 .map_err(|e| format!("net.serve_fn accept: {e}"))?;
1705 let io = TokioIo::new(stream);
1706 let program = Arc::clone(&program);
1707 let policy = policy.clone();
1708 let closure = closure.clone();
1709 tokio::spawn(async move {
1710 let program2 = Arc::clone(&program);
1711 let policy2 = policy.clone();
1712 let closure2 = closure.clone();
1713 let svc = service_fn(move |req: hyper::Request<hyper::body::Incoming>| {
1714 let program = Arc::clone(&program2);
1715 let policy = policy2.clone();
1716 let closure = closure2.clone();
1717 async move {
1718 let (parts, body) = req.into_parts();
1719 let body_bytes = body
1720 .collect()
1721 .await
1722 .map(|c| c.to_bytes())
1723 .unwrap_or_default();
1724 let result = tokio::task::spawn_blocking(move || {
1725 let lex_req = build_request_value_parts(&parts, &body_bytes);
1726 let handler = DefaultHandler::new(policy)
1727 .with_program(Arc::clone(&program));
1728 let mut vm = Vm::with_handler(&program, Box::new(handler));
1729 vm.invoke_closure_value(closure, vec![lex_req])
1730 })
1731 .await;
1732 Ok::<_, std::convert::Infallible>(match result {
1733 Ok(Ok(resp)) => build_hyper_response(&resp),
1734 Ok(Err(e)) => error_response(500, &format!("internal error: {e}")),
1735 Err(e) => error_response(500, &format!("task panicked: {e}")),
1736 })
1737 }
1738 });
1739 if let Err(e) = http1::Builder::new().serve_connection(io, svc).await {
1740 eprintln!("net.serve_fn: connection error: {e}");
1741 }
1742 });
1743 }
1744 })
1745}
1746
1747fn build_request_value_parts(
1749 parts: &hyper::http::request::Parts,
1750 body: &bytes::Bytes,
1751) -> Value {
1752 let method = parts.method.as_str().to_string();
1753 let uri = parts.uri.to_string();
1754 let (path, query) = match uri.split_once('?') {
1755 Some((p, q)) => (p.to_string(), q.to_string()),
1756 None => (uri, String::new()),
1757 };
1758 let mut headers_map = std::collections::BTreeMap::new();
1759 for (name, val) in &parts.headers {
1760 if let Ok(v) = val.to_str() {
1761 headers_map.insert(
1762 lex_bytecode::MapKey::Str(name.as_str().to_ascii_lowercase()),
1763 Value::Str(v.to_string()),
1764 );
1765 }
1766 }
1767 let body_str = String::from_utf8_lossy(body).into_owned();
1768 let mut rec = indexmap::IndexMap::new();
1769 rec.insert("method".into(), Value::Str(method));
1770 rec.insert("path".into(), Value::Str(path));
1771 rec.insert("query".into(), Value::Str(query));
1772 rec.insert("body".into(), Value::Str(body_str));
1773 rec.insert("headers".into(), Value::Map(headers_map));
1774 Value::Record(rec)
1775}
1776
1777fn build_request_value_tiny(req: &mut tiny_http::Request) -> Value {
1779 let method = format!("{:?}", req.method()).to_uppercase();
1780 let url = req.url().to_string();
1781 let (path, query) = match url.split_once('?') {
1782 Some((p, q)) => (p.to_string(), q.to_string()),
1783 None => (url, String::new()),
1784 };
1785 let mut headers_map = std::collections::BTreeMap::new();
1786 for h in req.headers() {
1787 headers_map.insert(
1788 lex_bytecode::MapKey::Str(h.field.as_str().as_str().to_ascii_lowercase()),
1789 Value::Str(h.value.as_str().to_string()),
1790 );
1791 }
1792 let mut body = String::new();
1793 let _ = req.as_reader().read_to_string(&mut body);
1794 let mut rec = indexmap::IndexMap::new();
1795 rec.insert("method".into(), Value::Str(method));
1796 rec.insert("path".into(), Value::Str(path));
1797 rec.insert("query".into(), Value::Str(query));
1798 rec.insert("body".into(), Value::Str(body));
1799 rec.insert("headers".into(), Value::Map(headers_map));
1800 Value::Record(rec)
1801}
1802
1803fn unpack_response(v: &Value) -> (u16, ResponseBodyOut, Vec<(String, String)>) {
1804 if let Value::Record(rec) = v {
1805 let status = rec.get("status").and_then(|s| match s {
1806 Value::Int(n) => Some(*n as u16),
1807 _ => None,
1808 }).unwrap_or(200);
1809 let body = match rec.get("body") {
1810 Some(Value::Variant { name, args }) => match (name.as_str(), args.as_slice()) {
1812 ("BodyStr", [Value::Str(s)]) => ResponseBodyOut::Str(s.clone()),
1813 ("BodyStream", [iter_v]) => ResponseBodyOut::TextChunks(drain_iter_str(iter_v)),
1814 ("BodyBytes", [iter_v]) => ResponseBodyOut::BytesChunks(drain_iter_bytes(iter_v)),
1815 _ => ResponseBodyOut::Str(String::new()),
1816 },
1817 Some(Value::Str(s)) => ResponseBodyOut::Str(s.clone()),
1823 _ => ResponseBodyOut::Str(String::new()),
1824 };
1825 let headers: Vec<(String, String)> = if let Some(Value::Map(hmap)) = rec.get("headers") {
1826 hmap.iter().filter_map(|(k, v)| {
1827 if let (lex_bytecode::MapKey::Str(name), Value::Str(val)) = (k, v) {
1828 Some((name.clone(), val.clone()))
1829 } else {
1830 None
1831 }
1832 }).collect()
1833 } else {
1834 vec![]
1835 };
1836 return (status, body, headers);
1837 }
1838 (
1839 500,
1840 ResponseBodyOut::Str(format!("handler returned non-record: {v:?}")),
1841 vec![],
1842 )
1843}
1844
/// Response body type shared by the hyper-based servers: a boxed body that
/// yields `bytes::Bytes` frames and whose error type is `Infallible`.
type HyperRespBody =
    http_body_util::combinators::BoxBody<bytes::Bytes, std::convert::Infallible>;
1847
1848fn build_hyper_response(v: &Value) -> hyper::Response<HyperRespBody> {
1853 use http_body_util::BodyExt as _;
1854 let (status, body, headers) = unpack_response(v);
1855 let boxed_body: HyperRespBody = match body {
1856 ResponseBodyOut::Str(s) => {
1857 http_body_util::Full::new(bytes::Bytes::from(s.into_bytes())).boxed()
1858 }
1859 ResponseBodyOut::TextChunks(chunks) | ResponseBodyOut::BytesChunks(chunks) => {
1860 HyperChunkedBody::from(chunks).boxed()
1861 }
1862 };
1863 let mut builder = hyper::Response::builder().status(status);
1864 for (name, val) in headers {
1865 builder = builder.header(name, val);
1866 }
1867 builder
1868 .body(boxed_body)
1869 .unwrap_or_else(|_| error_response(500, "response build error"))
1870}
1871
1872fn error_response(status: u16, msg: &str) -> hyper::Response<HyperRespBody> {
1873 use http_body_util::BodyExt as _;
1874 hyper::Response::builder()
1875 .status(status)
1876 .body(
1877 http_body_util::Full::new(bytes::Bytes::from(msg.to_owned()))
1878 .boxed(),
1879 )
1880 .unwrap_or_else(|_| {
1881 use http_body_util::BodyExt as _;
1882 hyper::Response::new(http_body_util::Empty::new().map_err(|e| match e {}).boxed())
1883 })
1884}
1885
/// A hyper body backed by a queue of pre-computed byte chunks, emitted one
/// data frame per chunk in queue order.
struct HyperChunkedBody {
    /// Chunks still to be sent; the front is emitted next.
    chunks: std::collections::VecDeque<Vec<u8>>,
}
1891
1892impl From<Vec<Vec<u8>>> for HyperChunkedBody {
1893 fn from(chunks: Vec<Vec<u8>>) -> Self {
1894 Self {
1895 chunks: chunks.into_iter().filter(|c| !c.is_empty()).collect(),
1896 }
1897 }
1898}
1899
1900impl hyper::body::Body for HyperChunkedBody {
1901 type Data = bytes::Bytes;
1902 type Error = std::convert::Infallible;
1903
1904 fn poll_frame(
1905 mut self: std::pin::Pin<&mut Self>,
1906 _cx: &mut std::task::Context<'_>,
1907 ) -> std::task::Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
1908 match self.chunks.pop_front() {
1909 Some(chunk) => std::task::Poll::Ready(Some(Ok(hyper::body::Frame::data(
1910 bytes::Bytes::from(chunk),
1911 )))),
1912 None => std::task::Poll::Ready(None),
1913 }
1914 }
1915}
1916
1917fn respond_with_body_tls(
1921 req: tiny_http::Request,
1922 status: u16,
1923 body: ResponseBodyOut,
1924 headers: Vec<(String, String)>,
1925) {
1926 let tiny_headers: Vec<tiny_http::Header> = headers
1927 .into_iter()
1928 .filter_map(|(name, val)| format!("{name}: {val}").parse::<tiny_http::Header>().ok())
1929 .collect();
1930 match body {
1931 ResponseBodyOut::Str(s) => {
1932 let mut response = tiny_http::Response::from_string(s).with_status_code(status);
1933 for h in tiny_headers {
1934 response.add_header(h);
1935 }
1936 let _ = req.respond(response);
1937 }
1938 ResponseBodyOut::TextChunks(chunks) | ResponseBodyOut::BytesChunks(chunks) => {
1939 let reader = ChunkReader::new(chunks);
1940 let response = tiny_http::Response::new(
1941 tiny_http::StatusCode(status),
1942 tiny_headers,
1943 reader,
1944 None,
1945 None,
1946 );
1947 let _ = req.respond(response);
1948 }
1949 }
1950}
1951
/// Decoded response body produced by `unpack_response`.
enum ResponseBodyOut {
    /// A complete body held as one string (from `BodyStr` or a bare `Str`).
    Str(String),
    /// Chunks produced by `drain_iter_str` from a `BodyStream` iterator.
    TextChunks(Vec<Vec<u8>>),
    /// Chunks produced by `drain_iter_bytes` from a `BodyBytes` iterator.
    BytesChunks(Vec<Vec<u8>>),
}
1966
1967fn drain_iter_str(v: &Value) -> Vec<Vec<u8>> {
1980 match v {
1981 Value::Variant { name, args }
1982 if name == "__IterEager" && args.len() == 2 =>
1983 {
1984 if let (Value::List(items), Value::Int(idx)) = (&args[0], &args[1]) {
1985 items.iter().skip(*idx as usize).filter_map(|item| {
1986 if let Value::Str(s) = item { Some(s.as_bytes().to_vec()) } else { None }
1987 }).collect()
1988 } else {
1989 Vec::new()
1990 }
1991 }
1992 _ => Vec::new(),
1993 }
1994}
1995
1996fn drain_iter_bytes(v: &Value) -> Vec<Vec<u8>> {
2000 match v {
2001 Value::Variant { name, args }
2002 if name == "__IterEager" && args.len() == 2 =>
2003 {
2004 if let (Value::List(items), Value::Int(idx)) = (&args[0], &args[1]) {
2005 items.iter().skip(*idx as usize).filter_map(|item| {
2006 if let Value::List(ints) = item {
2007 Some(ints.iter().filter_map(|i| match i {
2008 Value::Int(n) => Some((*n & 0xff) as u8),
2009 _ => None,
2010 }).collect::<Vec<u8>>())
2011 } else {
2012 None
2013 }
2014 }).collect()
2015 } else {
2016 Vec::new()
2017 }
2018 }
2019 _ => Vec::new(),
2020 }
2021}
2022
/// Presents a sequence of byte chunks as a single `std::io::Read` stream.
struct ChunkReader {
    /// Chunks not yet fully consumed; the front one is being read.
    chunks: std::collections::VecDeque<Vec<u8>>,
    /// Number of bytes of the front chunk that have already been returned.
    cursor: usize,
}

impl ChunkReader {
    /// Creates a reader over `chunks`, discarding empty chunks up front.
    fn new(chunks: Vec<Vec<u8>>) -> Self {
        let chunks = chunks.into_iter().filter(|chunk| !chunk.is_empty()).collect();
        Self { chunks, cursor: 0 }
    }
}

impl std::io::Read for ChunkReader {
    /// Copies bytes from the front chunk into `buf`.
    ///
    /// A single call never crosses a chunk boundary, so it may return fewer
    /// bytes than `buf` can hold. Returns `Ok(0)` once every chunk has been
    /// consumed (or when `buf` is empty).
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        while let Some(front) = self.chunks.front() {
            let pending = &front[self.cursor..];
            let had_data = !pending.is_empty();

            let copied = pending.len().min(buf.len());
            buf[..copied].copy_from_slice(&pending[..copied]);
            self.cursor += copied;

            // Drop the front chunk as soon as it is fully consumed.
            if self.cursor >= front.len() {
                self.chunks.pop_front();
                self.cursor = 0;
            }

            if had_data {
                return Ok(copied);
            }
        }
        Ok(0)
    }
}
2065
2066fn http_request(method: &str, url: &str, body: Option<&str>) -> Value {
2072 use std::time::Duration;
2073 let agent: ureq::Agent = ureq::Agent::config_builder()
2078 .timeout_connect(Some(Duration::from_secs(10)))
2079 .timeout_recv_body(Some(Duration::from_secs(30)))
2080 .timeout_send_body(Some(Duration::from_secs(10)))
2081 .http_status_as_error(false)
2082 .build()
2083 .into();
2084 let resp = match (method, body) {
2085 ("GET", _) => agent.get(url).call(),
2086 ("POST", Some(b)) => agent.post(url).send(b),
2087 ("POST", None) => agent.post(url).send(""),
2088 (m, _) => return err_value(format!("unsupported method: {m}")),
2089 };
2090 match resp {
2091 Ok(mut r) => {
2092 let status = r.status().as_u16();
2093 let body = r.body_mut().read_to_string().unwrap_or_default();
2094 if (200..300).contains(&status) {
2095 Value::Variant { name: "Ok".into(), args: vec![Value::Str(body)] }
2096 } else {
2097 err_value(format!("status {status}: {body}"))
2098 }
2099 }
2100 Err(e) => err_value(format!("transport: {e}")),
2101 }
2102}
2103
2104fn http_agent(timeout_ms: Option<u64>) -> ureq::Agent {
2109 use std::time::Duration;
2110 let mut b = ureq::Agent::config_builder()
2111 .timeout_connect(Some(Duration::from_secs(10)))
2112 .timeout_recv_body(Some(Duration::from_secs(30)))
2113 .timeout_send_body(Some(Duration::from_secs(10)))
2114 .http_status_as_error(false);
2115 if let Some(ms) = timeout_ms {
2116 let d = Duration::from_millis(ms);
2117 b = b.timeout_global(Some(d));
2118 }
2119 b.build().into()
2120}
2121
2122fn http_error_value(e: ureq::Error) -> Value {
2126 let (ctor, payload): (&str, Option<String>) = match &e {
2127 ureq::Error::Timeout(_) => ("TimeoutError", None),
2128 ureq::Error::Tls(s) => ("TlsError", Some((*s).into())),
2129 ureq::Error::Pem(p) => ("TlsError", Some(format!("{p}"))),
2130 ureq::Error::Rustls(r) => ("TlsError", Some(format!("{r}"))),
2131 _ => ("NetworkError", Some(format!("{e}"))),
2132 };
2133 let args = match payload { Some(s) => vec![Value::Str(s)], None => vec![] };
2134 let inner = Value::Variant { name: ctor.into(), args };
2135 Value::Variant { name: "Err".into(), args: vec![inner] }
2136}
2137
2138fn http_decode_err(msg: String) -> Value {
2139 let inner = Value::Variant {
2140 name: "DecodeError".into(),
2141 args: vec![Value::Str(msg)],
2142 };
2143 Value::Variant { name: "Err".into(), args: vec![inner] }
2144}
2145
2146fn http_send_simple(
2151 method: &str,
2152 url: &str,
2153 body: Option<Vec<u8>>,
2154 content_type: &str,
2155 timeout_ms: Option<u64>,
2156) -> Value {
2157 http_send_full(method, url, body, content_type, &[], timeout_ms)
2158}
2159
2160fn http_send_full(
2161 method: &str,
2162 url: &str,
2163 body: Option<Vec<u8>>,
2164 content_type: &str,
2165 headers: &[(String, String)],
2166 timeout_ms: Option<u64>,
2167) -> Value {
2168 let agent = http_agent(timeout_ms);
2169 let resp = match method {
2170 "GET" => {
2171 let mut req = agent.get(url);
2172 if !content_type.is_empty() { req = req.header("content-type", content_type); }
2173 for (k, v) in headers { req = req.header(k.as_str(), v.as_str()); }
2174 req.call()
2175 }
2176 "POST" => {
2177 let body = body.unwrap_or_default();
2178 let mut req = agent.post(url);
2179 if !content_type.is_empty() { req = req.header("content-type", content_type); }
2180 for (k, v) in headers { req = req.header(k.as_str(), v.as_str()); }
2181 req.send(&body[..])
2182 }
2183 m => {
2184 return http_decode_err(format!("unsupported method: {m}"));
2188 }
2189 };
2190 match resp {
2191 Ok(mut r) => {
2192 let status = r.status().as_u16() as i64;
2193 let headers_map = collect_response_headers(r.headers());
2194 let body_bytes = match r.body_mut().with_config().limit(10 * 1024 * 1024).read_to_vec() {
2195 Ok(b) => b,
2196 Err(e) => return http_decode_err(format!("body read: {e}")),
2197 };
2198 let mut rec = indexmap::IndexMap::new();
2199 rec.insert("status".into(), Value::Int(status));
2200 rec.insert("headers".into(), Value::Map(headers_map));
2201 rec.insert("body".into(), Value::Bytes(body_bytes));
2202 Value::Variant { name: "Ok".into(), args: vec![Value::Record(rec)] }
2203 }
2204 Err(e) => http_error_value(e),
2205 }
2206}
2207
2208fn collect_response_headers(
2209 headers: &ureq::http::HeaderMap,
2210) -> std::collections::BTreeMap<lex_bytecode::MapKey, Value> {
2211 let mut out = std::collections::BTreeMap::new();
2212 for (name, value) in headers.iter() {
2213 let v = value.to_str().unwrap_or("").to_string();
2214 out.insert(lex_bytecode::MapKey::Str(name.as_str().to_string()), Value::Str(v));
2215 }
2216 out
2217}
2218
2219fn http_send_record(handler: &DefaultHandler, req: &indexmap::IndexMap<String, Value>) -> Value {
2223 let method = match req.get("method") {
2224 Some(Value::Str(s)) => s.clone(),
2225 _ => return http_decode_err("HttpRequest.method must be Str".into()),
2226 };
2227 let url = match req.get("url") {
2228 Some(Value::Str(s)) => s.clone(),
2229 _ => return http_decode_err("HttpRequest.url must be Str".into()),
2230 };
2231 if let Err(e) = handler.ensure_host_allowed(&url) {
2232 return http_decode_err(e);
2233 }
2234 let body = match req.get("body") {
2235 Some(Value::Variant { name, args }) if name == "None" => None,
2236 Some(Value::Variant { name, args }) if name == "Some" => match args.as_slice() {
2237 [Value::Bytes(b)] => Some(b.clone()),
2238 _ => return http_decode_err("HttpRequest.body Some payload must be Bytes".into()),
2239 },
2240 _ => return http_decode_err("HttpRequest.body must be Option[Bytes]".into()),
2241 };
2242 let timeout_ms = match req.get("timeout_ms") {
2243 Some(Value::Variant { name, .. }) if name == "None" => None,
2244 Some(Value::Variant { name, args }) if name == "Some" => match args.as_slice() {
2245 [Value::Int(n)] if *n >= 0 => Some(*n as u64),
2246 _ => return http_decode_err(
2247 "HttpRequest.timeout_ms Some payload must be a non-negative Int".into()),
2248 },
2249 _ => return http_decode_err("HttpRequest.timeout_ms must be Option[Int]".into()),
2250 };
2251 let headers: Vec<(String, String)> = match req.get("headers") {
2252 Some(Value::Map(m)) => m.iter().filter_map(|(k, v)| {
2253 let kk = match k { lex_bytecode::MapKey::Str(s) => s.clone(), _ => return None };
2254 let vv = match v { Value::Str(s) => s.clone(), _ => return None };
2255 Some((kk, vv))
2256 }).collect(),
2257 _ => return http_decode_err("HttpRequest.headers must be Map[Str, Str]".into()),
2258 };
2259 http_send_full(&method, &url, body, "", &headers, timeout_ms)
2260}
2261
2262fn expect_record(v: Option<&Value>) -> Result<&indexmap::IndexMap<String, Value>, String> {
2263 match v {
2264 Some(Value::Record(r)) => Ok(r),
2265 Some(other) => Err(format!("expected Record, got {other:?}")),
2266 None => Err("missing Record argument".into()),
2267 }
2268}
2269
2270fn err_value(msg: String) -> Value {
2271 Value::Variant { name: "Err".into(), args: vec![Value::Str(msg)] }
2272}
2273
2274fn expect_str(v: Option<&Value>) -> Result<&str, String> {
2275 match v {
2276 Some(Value::Str(s)) => Ok(s),
2277 Some(other) => Err(format!("expected Str arg, got {other:?}")),
2278 None => Err("missing argument".into()),
2279 }
2280}
2281
2282fn expect_int(v: Option<&Value>) -> Result<i64, String> {
2283 match v {
2284 Some(Value::Int(n)) => Ok(*n),
2285 Some(other) => Err(format!("expected Int arg, got {other:?}")),
2286 None => Err("missing argument".into()),
2287 }
2288}
2289
2290fn ok(v: Value) -> Value {
2291 Value::Variant { name: "Ok".into(), args: vec![v] }
2292}
2293fn err(v: Value) -> Value {
2294 Value::Variant { name: "Err".into(), args: vec![v] }
2295}
2296
2297fn sql_error(message: impl Into<String>, code: Option<String>, detail: Option<String>) -> Value {
2301 let some = |s: String| Value::Variant { name: "Some".into(), args: vec![Value::Str(s)] };
2302 let none = || Value::Variant { name: "None".into(), args: vec![] };
2303 let mut rec = indexmap::IndexMap::new();
2304 rec.insert("message".into(), Value::Str(message.into()));
2305 rec.insert("code".into(), match code {
2306 Some(c) => some(c),
2307 None => none(),
2308 });
2309 rec.insert("detail".into(), match detail {
2310 Some(d) => some(d),
2311 None => none(),
2312 });
2313 Value::Record(rec)
2314}
2315
2316fn sqlite_err_to_sql_error(e: rusqlite::Error, op: &str) -> Value {
2326 let message = format!("{op}: {e}");
2327 match &e {
2328 rusqlite::Error::SqliteFailure(ffi, detail_opt) => {
2329 sql_error(
2330 message,
2331 Some(sqlite_extended_code_name(ffi.extended_code)),
2332 detail_opt.clone(),
2333 )
2334 }
2335 rusqlite::Error::SqlInputError { error, msg, .. } => {
2336 sql_error(
2337 message,
2338 Some(sqlite_extended_code_name(error.extended_code)),
2339 Some(msg.clone()),
2340 )
2341 }
2342 _ => sql_error(message, None, None),
2343 }
2344}
2345
2346fn sqlite_extended_code_name(code: i32) -> String {
2352 use rusqlite::ffi::*;
2353 let s = match code {
2354 SQLITE_BUSY => "SQLITE_BUSY",
2355 SQLITE_LOCKED => "SQLITE_LOCKED",
2356 SQLITE_READONLY => "SQLITE_READONLY",
2357 SQLITE_IOERR => "SQLITE_IOERR",
2358 SQLITE_CORRUPT => "SQLITE_CORRUPT",
2359 SQLITE_NOTFOUND => "SQLITE_NOTFOUND",
2360 SQLITE_FULL => "SQLITE_FULL",
2361 SQLITE_CANTOPEN => "SQLITE_CANTOPEN",
2362 SQLITE_PROTOCOL => "SQLITE_PROTOCOL",
2363 SQLITE_SCHEMA => "SQLITE_SCHEMA",
2364 SQLITE_TOOBIG => "SQLITE_TOOBIG",
2365 SQLITE_CONSTRAINT => "SQLITE_CONSTRAINT",
2366 SQLITE_CONSTRAINT_CHECK => "SQLITE_CONSTRAINT_CHECK",
2367 SQLITE_CONSTRAINT_FOREIGNKEY => "SQLITE_CONSTRAINT_FOREIGNKEY",
2368 SQLITE_CONSTRAINT_NOTNULL => "SQLITE_CONSTRAINT_NOTNULL",
2369 SQLITE_CONSTRAINT_PRIMARYKEY => "SQLITE_CONSTRAINT_PRIMARYKEY",
2370 SQLITE_CONSTRAINT_TRIGGER => "SQLITE_CONSTRAINT_TRIGGER",
2371 SQLITE_CONSTRAINT_UNIQUE => "SQLITE_CONSTRAINT_UNIQUE",
2372 SQLITE_CONSTRAINT_VTAB => "SQLITE_CONSTRAINT_VTAB",
2373 SQLITE_CONSTRAINT_ROWID => "SQLITE_CONSTRAINT_ROWID",
2374 SQLITE_MISMATCH => "SQLITE_MISMATCH",
2375 SQLITE_RANGE => "SQLITE_RANGE",
2376 SQLITE_NOTADB => "SQLITE_NOTADB",
2377 SQLITE_AUTH => "SQLITE_AUTH",
2378 _ => return format!("SQLITE_ERROR_{code}"),
2379 };
2380 s.to_string()
2381}
2382
2383fn pg_err_to_sql_error(e: postgres::Error, op: &str) -> Value {
2387 let message = format!("{op}: {e}");
2388 let code = e.as_db_error().map(|db| db.code().code().to_string());
2389 let detail = e.as_db_error().and_then(|db| db.detail().map(|s| s.to_string()));
2390 sql_error(message, code, detail)
2391}
2392
2393impl DefaultHandler {
2394 fn dispatch_call_mcp(&mut self, args: Vec<Value>) -> Value {
2400 let server = match args.first() {
2401 Some(Value::Str(s)) => s.clone(),
2402 _ => return err(Value::Str(
2403 "agent.call_mcp(server, tool, args_json): server must be Str".into())),
2404 };
2405 let tool = match args.get(1) {
2406 Some(Value::Str(s)) => s.clone(),
2407 _ => return err(Value::Str(
2408 "agent.call_mcp(server, tool, args_json): tool must be Str".into())),
2409 };
2410 let args_json = match args.get(2) {
2411 Some(Value::Str(s)) => s.clone(),
2412 _ => return err(Value::Str(
2413 "agent.call_mcp(server, tool, args_json): args_json must be Str".into())),
2414 };
2415 let parsed: serde_json::Value = match serde_json::from_str(&args_json) {
2416 Ok(v) => v,
2417 Err(e) => return err(Value::Str(format!(
2418 "agent.call_mcp: args_json is not valid JSON: {e}"))),
2419 };
2420 match self.mcp_clients.call(&server, &tool, parsed) {
2421 Ok(result) => ok(Value::Str(
2422 serde_json::to_string(&result).unwrap_or_else(|_| "null".into()))),
2423 Err(e) => err(Value::Str(e)),
2424 }
2425 }
2426
2427 fn dispatch_cloud_stream(&mut self, args: Vec<Value>) -> Value {
2433 let _prompt = match args.first() {
2434 Some(Value::Str(s)) => s.clone(),
2435 _ => return err(Value::Str(
2436 "agent.cloud_stream(prompt): prompt must be Str".into())),
2437 };
2438 let chunks: Vec<String> = match std::env::var("LEX_LLM_STREAM_FIXTURE") {
2439 Ok(v) => v.split('|').map(|s| s.to_string()).collect(),
2440 Err(_) => return err(Value::Str(
2441 "agent.cloud_stream: live streaming not yet implemented; \
2442 set LEX_LLM_STREAM_FIXTURE='chunk1|chunk2|…' for tests".into())),
2443 };
2444 let handle = self.register_stream(chunks.into_iter());
2445 ok(stream_handle_value(handle))
2446 }
2447
2448 fn dispatch_stream_next(&mut self, args: Vec<Value>) -> Value {
2454 let handle = match args.first().and_then(stream_handle_id) {
2455 Some(h) => h,
2456 None => return Value::Variant { name: "None".into(), args: vec![] },
2457 };
2458 let mut streams = match self.streams.lock() {
2459 Ok(g) => g,
2460 Err(_) => return Value::Variant { name: "None".into(), args: vec![] },
2461 };
2462 match streams.get_mut(&handle).and_then(|it| it.next()) {
2463 Some(chunk) => some(Value::Str(chunk)),
2464 None => {
2465 streams.remove(&handle);
2466 Value::Variant { name: "None".into(), args: vec![] }
2467 }
2468 }
2469 }
2470
2471 fn dispatch_stream_collect(&mut self, args: Vec<Value>) -> Value {
2476 let handle = match args.first().and_then(stream_handle_id) {
2477 Some(h) => h,
2478 None => return Value::List(Vec::new()),
2479 };
2480 let mut iter = {
2481 let mut streams = match self.streams.lock() {
2482 Ok(g) => g,
2483 Err(_) => return Value::List(Vec::new()),
2484 };
2485 match streams.remove(&handle) {
2486 Some(it) => it,
2487 None => return Value::List(Vec::new()),
2488 }
2489 };
2490 let mut out: Vec<Value> = Vec::new();
2491 for chunk in iter.by_ref() {
2492 out.push(Value::Str(chunk));
2493 }
2494 Value::List(out)
2495 }
2496
2497 fn register_stream<I>(&self, iter: I) -> String
2501 where
2502 I: Iterator<Item = String> + Send + 'static,
2503 {
2504 let id = self
2505 .next_stream_id
2506 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
2507 let handle = format!("stream_{id}");
2508 if let Ok(mut streams) = self.streams.lock() {
2509 streams.insert(handle.clone(), Box::new(iter));
2510 }
2511 handle
2512 }
2513}
2514
2515fn stream_handle_value(handle: String) -> Value {
2520 Value::Variant {
2521 name: "__StreamHandle".into(),
2522 args: vec![Value::Str(handle)],
2523 }
2524}
2525
2526fn stream_handle_id(v: &Value) -> Option<String> {
2530 match v {
2531 Value::Variant { name, args } if name == "__StreamHandle" => match args.first() {
2532 Some(Value::Str(h)) => Some(h.clone()),
2533 _ => None,
2534 },
2535 _ => None,
2536 }
2537}
2538
2539fn dispatch_llm_local(args: Vec<Value>) -> Value {
2544 let prompt = match args.first() {
2545 Some(Value::Str(s)) => s.clone(),
2546 _ => return err(Value::Str(
2547 "agent.local_complete(prompt): prompt must be Str".into())),
2548 };
2549 match crate::llm::local_complete(&prompt) {
2550 Ok(text) => ok(Value::Str(text)),
2551 Err(e) => err(Value::Str(e)),
2552 }
2553}
2554
2555fn dispatch_llm_cloud(args: Vec<Value>) -> Value {
2562 let prompt = match args.first() {
2563 Some(Value::Str(s)) => s.clone(),
2564 _ => return err(Value::Str(
2565 "agent.cloud_complete(prompt): prompt must be Str".into())),
2566 };
2567 match crate::llm::cloud_complete(&prompt) {
2568 Ok(text) => ok(Value::Str(text)),
2569 Err(e) => err(Value::Str(e)),
2570 }
2571}
2572
2573fn some(v: Value) -> Value {
2574 Value::Variant { name: "Some".into(), args: vec![v] }
2575}
2576fn none() -> Value {
2577 Value::Variant { name: "None".into(), args: vec![] }
2578}
2579
2580fn expect_bytes(v: Option<&Value>) -> Result<&Vec<u8>, String> {
2581 match v {
2582 Some(Value::Bytes(b)) => Ok(b),
2583 Some(other) => Err(format!("expected Bytes arg, got {other:?}")),
2584 None => Err("missing argument".into()),
2585 }
2586}
2587
2588fn expect_kv_handle(v: Option<&Value>) -> Result<u64, String> {
2589 match v {
2590 Some(Value::Int(n)) if *n >= 0 => Ok(*n as u64),
2591 Some(other) => Err(format!("expected Kv handle (Int), got {other:?}")),
2592 None => Err("missing Kv argument".into()),
2593 }
2594}
2595
2596fn expect_sql_handle(v: Option<&Value>) -> Result<u64, String> {
2597 match v {
2598 Some(Value::Int(n)) if *n >= 0 => Ok(*n as u64),
2599 Some(other) => Err(format!("expected Db handle (Int), got {other:?}")),
2600 None => Err("missing Db argument".into()),
2601 }
2602}
2603
2604#[allow(dead_code)]
2605fn expect_str_list(v: Option<&Value>) -> Result<Vec<String>, String> {
2606 match v {
2607 Some(Value::List(items)) => items.iter().map(|x| match x {
2608 Value::Str(s) => Ok(s.clone()),
2609 other => Err(format!("expected List[Str] element, got {other:?}")),
2610 }).collect(),
2611 Some(other) => Err(format!("expected List[Str], got {other:?}")),
2612 None => Err("missing List[Str] argument".into()),
2613 }
2614}
2615
2616fn expect_sql_params(v: Option<&Value>) -> Result<Vec<SqlParamValue>, String> {
2619 let items = match v {
2620 Some(Value::List(xs)) => xs,
2621 Some(other) => return Err(format!("expected List[SqlParam], got {other:?}")),
2622 None => return Err("missing params argument".into()),
2623 };
2624 items.iter().map(|item| {
2625 match item {
2626 Value::Variant { name, args } => match name.as_str() {
2627 "PStr" => match args.first() {
2628 Some(Value::Str(s)) => Ok(SqlParamValue::Text(s.clone())),
2629 _ => Err("PStr requires a Str argument".into()),
2630 },
2631 "PInt" => match args.first() {
2632 Some(Value::Int(n)) => Ok(SqlParamValue::Integer(*n)),
2633 _ => Err("PInt requires an Int argument".into()),
2634 },
2635 "PFloat" => match args.first() {
2636 Some(Value::Float(f)) => Ok(SqlParamValue::Real(*f)),
2637 _ => Err("PFloat requires a Float argument".into()),
2638 },
2639 "PBool" => match args.first() {
2640 Some(Value::Bool(b)) => Ok(SqlParamValue::Bool(*b)),
2641 _ => Err("PBool requires a Bool argument".into()),
2642 },
2643 "PNull" => Ok(SqlParamValue::Null),
2644 other => Err(format!("unknown SqlParam constructor `{other}`")),
2645 },
2646 Value::Str(s) => Ok(SqlParamValue::Text(s.clone())),
2648 other => Err(format!("expected SqlParam variant, got {other:?}")),
2649 }
2650 }).collect()
2651}
2652
2653fn sqlite_params(params: &[SqlParamValue]) -> Vec<rusqlite::types::Value> {
2655 params.iter().map(|p| match p {
2656 SqlParamValue::Text(s) => rusqlite::types::Value::Text(s.clone()),
2657 SqlParamValue::Integer(n) => rusqlite::types::Value::Integer(*n),
2658 SqlParamValue::Real(f) => rusqlite::types::Value::Real(*f),
2659 SqlParamValue::Bool(b) => rusqlite::types::Value::Integer(*b as i64),
2660 SqlParamValue::Null => rusqlite::types::Value::Null,
2661 }).collect()
2662}
2663
2664fn pg_param_refs(params: &[SqlParamValue]) -> Vec<Box<dyn postgres::types::ToSql + Sync>> {
2666 params.iter().map(|p| -> Box<dyn postgres::types::ToSql + Sync> {
2667 match p {
2668 SqlParamValue::Text(s) => Box::new(s.clone()),
2669 SqlParamValue::Integer(n) => Box::new(*n),
2670 SqlParamValue::Real(f) => Box::new(*f),
2671 SqlParamValue::Bool(b) => Box::new(*b),
2672 SqlParamValue::Null => Box::new(Option::<String>::None),
2673 }
2674 }).collect()
2675}
2676
2677fn sql_run_query_sqlite(
2679 conn: &rusqlite::Connection,
2680 stmt_str: &str,
2681 params: &[SqlParamValue],
2682) -> Value {
2683 let mut stmt = match conn.prepare(stmt_str) {
2684 Ok(s) => s,
2685 Err(e) => return err(sqlite_err_to_sql_error(e, "sql.query")),
2686 };
2687 let column_count = stmt.column_count();
2688 let column_names: Vec<String> = (0..column_count)
2689 .map(|i| stmt.column_name(i).unwrap_or("").to_string())
2690 .collect();
2691 let bound = sqlite_params(params);
2692 let bind: Vec<&dyn rusqlite::ToSql> = bound.iter()
2693 .map(|p| p as &dyn rusqlite::ToSql)
2694 .collect();
2695 let mut rows = match stmt.query(rusqlite::params_from_iter(bind.iter())) {
2696 Ok(r) => r,
2697 Err(e) => return err(sqlite_err_to_sql_error(e, "sql.query")),
2698 };
2699 let mut out: Vec<Value> = Vec::new();
2700 loop {
2701 let row = match rows.next() {
2702 Ok(Some(r)) => r,
2703 Ok(None) => break,
2704 Err(e) => return err(sqlite_err_to_sql_error(e, "sql.query")),
2705 };
2706 let mut rec = indexmap::IndexMap::new();
2707 for (i, name) in column_names.iter().enumerate() {
2708 let cell = match row.get_ref(i) {
2709 Ok(c) => sql_value_ref_to_lex(c),
2710 Err(e) => return err(sqlite_err_to_sql_error(e, &format!("sql.query: column {i}"))),
2711 };
2712 rec.insert(name.clone(), cell);
2713 }
2714 out.push(Value::Record(rec));
2715 }
2716 ok(Value::List(out))
2717}
2718
2719fn sql_run_query_pg(
2721 client: &mut postgres::Client,
2722 stmt_str: &str,
2723 params: &[SqlParamValue],
2724) -> Value {
2725 let pg = pg_param_refs(params);
2726 let refs: Vec<&(dyn postgres::types::ToSql + Sync)> =
2727 pg.iter().map(|b| b.as_ref()).collect();
2728 let rows = match client.query(stmt_str, &refs) {
2729 Ok(r) => r,
2730 Err(e) => return err(pg_err_to_sql_error(e, "sql.query")),
2731 };
2732 let out: Vec<Value> = rows.iter().map(|row| {
2733 Value::Record(pg_row_to_lex_record(row))
2734 }).collect();
2735 ok(Value::List(out))
2736}
2737
2738fn pg_row_to_lex_record(row: &postgres::Row) -> indexmap::IndexMap<String, Value> {
2740 use postgres::types::Type;
2741 let mut rec = indexmap::IndexMap::new();
2742 for (i, col) in row.columns().iter().enumerate() {
2743 let ty = col.type_();
2744 let val = if *ty == Type::INT2 || *ty == Type::INT4 || *ty == Type::INT8 {
2745 row.get::<_, Option<i64>>(i).map(Value::Int).unwrap_or(Value::Unit)
2746 } else if *ty == Type::FLOAT4 || *ty == Type::FLOAT8 {
2747 row.get::<_, Option<f64>>(i).map(Value::Float).unwrap_or(Value::Unit)
2748 } else if *ty == Type::BOOL {
2749 row.get::<_, Option<bool>>(i).map(Value::Bool).unwrap_or(Value::Unit)
2750 } else if *ty == Type::BYTEA {
2751 row.get::<_, Option<Vec<u8>>>(i).map(Value::Bytes).unwrap_or(Value::Unit)
2752 } else {
2753 row.get::<_, Option<String>>(i).map(Value::Str).unwrap_or(Value::Unit)
2754 };
2755 rec.insert(col.name().to_string(), val);
2756 }
2757 rec
2758}
2759
2760fn sql_get_col<F>(args: &[Value], convert: F) -> Result<Value, String>
2762where
2763 F: Fn(&Value) -> Option<Value>,
2764{
2765 let row = args.first().ok_or("sql.get_*: missing row argument")?;
2766 let col = match args.get(1) {
2767 Some(Value::Str(s)) => s.as_str(),
2768 Some(other) => return Err(format!("sql.get_*: column name must be Str, got {other:?}")),
2769 None => return Err("sql.get_*: missing column name argument".into()),
2770 };
2771 let cell = match row {
2772 Value::Record(rec) => rec.get(col).cloned(),
2773 other => return Err(format!("sql.get_*: row must be a Record, got {other:?}")),
2774 };
2775 Ok(match cell.and_then(|v| convert(&v)) {
2776 Some(v) => Value::Variant { name: "Some".into(), args: vec![v] },
2777 None => Value::Variant { name: "None".into(), args: vec![] },
2778 })
2779}
2780
2781fn sql_value_ref_to_lex(v: rusqlite::types::ValueRef<'_>) -> Value {
2782 use rusqlite::types::ValueRef;
2783 match v {
2784 ValueRef::Null => Value::Unit,
2785 ValueRef::Integer(n) => Value::Int(n),
2786 ValueRef::Real(f) => Value::Float(f),
2787 ValueRef::Text(s) => Value::Str(String::from_utf8_lossy(s).into_owned()),
2788 ValueRef::Blob(b) => Value::Bytes(b.to_vec()),
2789 }
2790}
2791
/// Log severity. Variants are declared from least to most severe; the
/// derived ordering is what the level filter compares against.
#[derive(Clone, Copy, PartialEq, PartialOrd)]
enum LogLevel {
    Debug,
    Info,
    Warn,
    Error,
}
2796
/// Output encoding of a log line: bracketed plain text or one JSON object.
#[derive(Clone, Copy, PartialEq)]
enum LogFormat {
    Text,
    Json,
}
2799
/// Destination for log lines.
#[derive(Clone)]
enum LogSink {
    /// Write to the process's standard error.
    Stderr,
    /// Write to a shared file handle; cloning the sink shares the same file.
    File(std::sync::Arc<Mutex<std::fs::File>>),
}
2805
/// Logger configuration held behind the global `log_state()` mutex.
struct LogState {
    /// Minimum severity; `emit_log` discards messages below it.
    level: LogLevel,
    /// Encoding used for each emitted line.
    format: LogFormat,
    /// Where emitted lines are written.
    sink: LogSink,
}
2811
2812fn log_state() -> &'static Mutex<LogState> {
2813 static STATE: OnceLock<Mutex<LogState>> = OnceLock::new();
2814 STATE.get_or_init(|| Mutex::new(LogState {
2815 level: LogLevel::Info,
2816 format: LogFormat::Text,
2817 sink: LogSink::Stderr,
2818 }))
2819}
2820
2821fn parse_log_level(s: &str) -> Option<LogLevel> {
2822 match s {
2823 "debug" => Some(LogLevel::Debug),
2824 "info" => Some(LogLevel::Info),
2825 "warn" => Some(LogLevel::Warn),
2826 "error" => Some(LogLevel::Error),
2827 _ => None,
2828 }
2829}
2830
2831fn level_label(l: LogLevel) -> &'static str {
2832 match l {
2833 LogLevel::Debug => "debug",
2834 LogLevel::Info => "info",
2835 LogLevel::Warn => "warn",
2836 LogLevel::Error => "error",
2837 }
2838}
2839
2840fn emit_log(level: LogLevel, msg: &str) {
2841 let state = log_state().lock().unwrap();
2842 if level < state.level {
2843 return;
2844 }
2845 let ts = chrono::Utc::now().to_rfc3339();
2846 let line = match state.format {
2847 LogFormat::Text => format!("[{}] {}: {}\n", ts, level_label(level), msg),
2848 LogFormat::Json => {
2849 let escaped = msg
2853 .replace('\\', "\\\\")
2854 .replace('"', "\\\"")
2855 .replace('\n', "\\n")
2856 .replace('\r', "\\r");
2857 format!(
2858 "{{\"ts\":\"{ts}\",\"level\":\"{}\",\"msg\":\"{escaped}\"}}\n",
2859 level_label(level),
2860 )
2861 }
2862 };
2863 let sink = state.sink.clone();
2864 drop(state);
2865 match sink {
2866 LogSink::Stderr => {
2867 use std::io::Write;
2868 let _ = std::io::stderr().write_all(line.as_bytes());
2869 }
2870 LogSink::File(f) => {
2871 use std::io::Write;
2872 if let Ok(mut g) = f.lock() {
2873 let _ = g.write_all(line.as_bytes());
2874 }
2875 }
2876 }
2877}
2878
/// A spawned child process together with optional buffered readers over its
/// output pipes.
pub(crate) struct ProcessState {
    /// Handle to the child process.
    child: std::process::Child,
    /// Buffered reader over the child's stdout, if present.
    stdout: Option<std::io::BufReader<std::process::ChildStdout>>,
    /// Buffered reader over the child's stderr, if present.
    stderr: Option<std::io::BufReader<std::process::ChildStderr>>,
}
2884
2885fn process_registry() -> &'static Mutex<ProcessRegistry> {
2899 static REGISTRY: OnceLock<Mutex<ProcessRegistry>> = OnceLock::new();
2900 REGISTRY.get_or_init(|| Mutex::new(ProcessRegistry::with_capacity(MAX_PROCESS_HANDLES)))
2901}
2902
/// Capacity of the global process registry; inserting beyond it evicts the
/// least-recently-used entry.
const MAX_PROCESS_HANDLES: usize = 256;
2904
/// Shared, lockable process state as handed out by `ProcessRegistry::touch_get`.
type SharedProcessState = Arc<Mutex<ProcessState>>;
2906
/// Bounded handle → process map with least-recently-used eviction.
pub(crate) struct ProcessRegistry {
    /// Entries ordered from least- to most-recently used.
    entries: indexmap::IndexMap<u64, SharedProcessState>,
    /// Entry count at which `insert` evicts the oldest entry.
    cap: usize,
}
2911
2912impl ProcessRegistry {
2913 pub(crate) fn with_capacity(cap: usize) -> Self {
2914 Self { entries: indexmap::IndexMap::new(), cap }
2915 }
2916
2917 pub(crate) fn insert(&mut self, handle: u64, state: ProcessState) {
2921 if self.entries.len() >= self.cap {
2922 self.entries.shift_remove_index(0);
2923 }
2924 self.entries.insert(handle, Arc::new(Mutex::new(state)));
2925 }
2926
2927 pub(crate) fn touch_get(&mut self, handle: u64) -> Option<SharedProcessState> {
2931 let idx = self.entries.get_index_of(&handle)?;
2932 self.entries.move_index(idx, self.entries.len() - 1);
2933 self.entries.get(&handle).cloned()
2934 }
2935
2936 pub(crate) fn remove(&mut self, handle: u64) {
2941 self.entries.shift_remove(&handle);
2942 }
2943
2944 #[cfg(test)]
2945 pub(crate) fn len(&self) -> usize { self.entries.len() }
2946}
2947
/// Issues a fresh process handle; handles start at 1 and increase by one.
fn next_process_handle() -> u64 {
    static NEXT: AtomicU64 = AtomicU64::new(1);
    let issued = NEXT.fetch_add(1, Ordering::SeqCst);
    issued
}
2952
// Unit tests for `ProcessRegistry`'s bounded LRU behaviour. Unix-only because
// the fixture spawns the `true` binary.
#[cfg(all(test, unix))]
mod process_registry_tests {
    use super::{ProcessRegistry, ProcessState};

    /// Spawns a trivial child (`true`, output discarded) to back a registry entry.
    fn fresh_state() -> ProcessState {
        let child = std::process::Command::new("true")
            .stdout(std::process::Stdio::null())
            .stderr(std::process::Stdio::null())
            .spawn()
            .expect("spawn `true`");
        ProcessState { child, stdout: None, stderr: None }
    }

    /// A registered handle is found; an unregistered one is not.
    #[test]
    fn insert_and_get_round_trip() {
        let mut r = ProcessRegistry::with_capacity(4);
        r.insert(1, fresh_state());
        assert!(r.touch_get(1).is_some());
        assert!(r.touch_get(2).is_none());
    }

    /// Each handle owns its own shared state allocation.
    #[test]
    fn touch_get_returns_distinct_arcs_for_distinct_handles() {
        let mut r = ProcessRegistry::with_capacity(4);
        r.insert(1, fresh_state());
        r.insert(2, fresh_state());
        let a = r.touch_get(1).unwrap();
        let b = r.touch_get(2).unwrap();
        assert!(!std::sync::Arc::ptr_eq(&a, &b));
    }

    /// Touching an entry protects it: the untouched entry is evicted instead.
    #[test]
    fn cap_evicts_lru_on_overflow() {
        let mut r = ProcessRegistry::with_capacity(2);
        r.insert(1, fresh_state());
        r.insert(2, fresh_state());
        // Promote 1 so that 2 becomes the least-recently-used entry.
        let _ = r.touch_get(1);
        r.insert(3, fresh_state());
        assert!(r.touch_get(1).is_some(), "1 was MRU, should survive");
        assert!(r.touch_get(2).is_none(), "2 was LRU, should be evicted");
        assert!(r.touch_get(3).is_some(), "3 just inserted, should survive");
        assert_eq!(r.len(), 2);
    }

    /// Without any touches, eviction follows insertion order.
    #[test]
    fn cap_with_no_touches_evicts_in_insertion_order() {
        let mut r = ProcessRegistry::with_capacity(2);
        r.insert(10, fresh_state());
        r.insert(20, fresh_state());
        r.insert(30, fresh_state());
        assert!(r.touch_get(10).is_none());
        assert!(r.touch_get(20).is_some());
        assert!(r.touch_get(30).is_some());
    }

    /// `remove` deletes the entry and shrinks the registry.
    #[test]
    fn remove_drops_entry() {
        let mut r = ProcessRegistry::with_capacity(4);
        r.insert(1, fresh_state());
        r.remove(1);
        assert!(r.touch_get(1).is_none());
        assert_eq!(r.len(), 0);
    }

    /// The registry never grows past its capacity, however many inserts happen.
    #[test]
    fn many_inserts_stay_bounded_at_cap() {
        let cap = 8;
        let mut r = ProcessRegistry::with_capacity(cap);
        for i in 0..(cap as u64 * 3) {
            r.insert(i, fresh_state());
            assert!(r.len() <= cap);
        }
        assert_eq!(r.len(), cap);
    }

    /// A state handle obtained before `remove` stays usable afterwards.
    #[test]
    fn outstanding_arc_outlives_remove() {
        let mut r = ProcessRegistry::with_capacity(4);
        r.insert(1, fresh_state());
        let arc = r.touch_get(1).expect("entry exists");
        r.remove(1);
        assert!(r.touch_get(1).is_none());
        // The state is still reachable and lockable through the outstanding Arc.
        let _state = arc.lock().unwrap();
    }
}
3046
3047fn expect_process_handle(v: Option<&Value>) -> Result<u64, String> {
3048 match v {
3049 Some(Value::Int(n)) if *n >= 0 => Ok(*n as u64),
3050 Some(other) => Err(format!("expected ProcessHandle (Int), got {other:?}")),
3051 None => Err("missing ProcessHandle argument".into()),
3052 }
3053}
3054
3055fn kv_registry() -> &'static Mutex<KvRegistry> {
3067 static REGISTRY: OnceLock<Mutex<KvRegistry>> = OnceLock::new();
3068 REGISTRY.get_or_init(|| Mutex::new(KvRegistry::with_capacity(MAX_KV_HANDLES)))
3069}
3070
/// Capacity of the global Kv registry; inserting beyond it evicts the
/// least-recently-used database.
const MAX_KV_HANDLES: usize = 256;
3077
/// Bounded handle → `sled::Db` map with least-recently-used eviction.
pub(crate) struct KvRegistry {
    /// Open databases ordered from least- to most-recently used.
    entries: indexmap::IndexMap<u64, sled::Db>,
    /// Entry count at which `insert` evicts the oldest entry.
    cap: usize,
}
3086
3087impl KvRegistry {
3088 pub(crate) fn with_capacity(cap: usize) -> Self {
3089 Self { entries: indexmap::IndexMap::new(), cap }
3090 }
3091
3092 pub(crate) fn insert(&mut self, handle: u64, db: sled::Db) {
3095 if self.entries.len() >= self.cap {
3096 self.entries.shift_remove_index(0);
3097 }
3098 self.entries.insert(handle, db);
3099 }
3100
3101 pub(crate) fn touch_get(&mut self, handle: u64) -> Option<&sled::Db> {
3103 let idx = self.entries.get_index_of(&handle)?;
3104 self.entries.move_index(idx, self.entries.len() - 1);
3105 self.entries.get(&handle)
3106 }
3107
3108 pub(crate) fn remove(&mut self, handle: u64) {
3110 self.entries.shift_remove(&handle);
3111 }
3112
3113 #[cfg(test)]
3114 pub(crate) fn len(&self) -> usize { self.entries.len() }
3115}
3116
/// Issues a fresh Kv handle; handles start at 1 and increase by one.
fn next_kv_handle() -> u64 {
    static NEXT: AtomicU64 = AtomicU64::new(1);
    let issued = NEXT.fetch_add(1, Ordering::SeqCst);
    issued
}
3121
3122fn sql_registry() -> &'static Mutex<SqlRegistry> {
3129 static REGISTRY: OnceLock<Mutex<SqlRegistry>> = OnceLock::new();
3130 REGISTRY.get_or_init(|| Mutex::new(SqlRegistry::with_capacity(MAX_SQL_HANDLES)))
3131}
3132
/// Capacity of the global SQL connection registry; inserting beyond it evicts
/// the least-recently-used connection.
const MAX_SQL_HANDLES: usize = 256;
3134
/// NOTE(review): presumably the bound of the sync channel that carries cursor
/// rows from a producer thread to the consumer; the channel is created
/// outside this section — confirm at the creation site.
const CURSOR_CHANNEL_CAPACITY: usize = 64;
/// Capacity of the global cursor registry; inserting beyond it evicts the
/// least-recently-used cursor.
const MAX_CURSOR_HANDLES: usize = 256;
3159
/// Receiving end of a cursor's row channel: each item is a row value or an
/// error message from the producer.
type CursorReceiver = std::sync::mpsc::Receiver<Result<Value, String>>;
3161
/// Bounded handle → cursor-receiver map with least-recently-used eviction.
pub(crate) struct CursorRegistry {
    /// Shared receivers ordered from least- to most-recently used.
    entries: indexmap::IndexMap<u64, Arc<Mutex<CursorReceiver>>>,
    /// Entry count at which `insert` evicts the oldest entry.
    cap: usize,
}
3170
3171impl CursorRegistry {
3172 pub(crate) fn with_capacity(cap: usize) -> Self {
3173 Self { entries: indexmap::IndexMap::new(), cap }
3174 }
3175
3176 pub(crate) fn insert(&mut self, handle: u64, rx: CursorReceiver) {
3177 if self.entries.len() >= self.cap {
3178 self.entries.shift_remove_index(0);
3179 }
3180 self.entries.insert(handle, Arc::new(Mutex::new(rx)));
3181 }
3182
3183 pub(crate) fn touch_get(&mut self, handle: u64) -> Option<Arc<Mutex<CursorReceiver>>> {
3184 let idx = self.entries.get_index_of(&handle)?;
3185 self.entries.move_index(idx, self.entries.len() - 1);
3186 self.entries.get(&handle).cloned()
3187 }
3188
3189 pub(crate) fn remove(&mut self, handle: u64) {
3190 self.entries.shift_remove(&handle);
3191 }
3192}
3193
3194fn cursor_registry() -> &'static Mutex<CursorRegistry> {
3195 static REGISTRY: OnceLock<Mutex<CursorRegistry>> = OnceLock::new();
3196 REGISTRY.get_or_init(|| Mutex::new(CursorRegistry::with_capacity(MAX_CURSOR_HANDLES)))
3197}
3198
/// Issues a fresh cursor handle; handles start at 1 and increase by one.
fn next_cursor_handle() -> u64 {
    static NEXT: AtomicU64 = AtomicU64::new(1);
    let issued = NEXT.fetch_add(1, Ordering::SeqCst);
    issued
}
3203
3204fn sqlite_cursor_producer(
3210 conn_arc: Arc<Mutex<SqlConn>>,
3211 stmt_str: String,
3212 params: Vec<SqlParamValue>,
3213 sender: std::sync::mpsc::SyncSender<Result<Value, String>>,
3214) {
3215 let mut conn_guard = match conn_arc.lock() {
3216 Ok(g) => g,
3217 Err(p) => p.into_inner(),
3218 };
3219 let SqlConn::Sqlite(c) = &mut *conn_guard else {
3220 let _ = sender.send(Err("sqlite_cursor_producer called on non-sqlite conn".into()));
3221 return;
3222 };
3223 let mut stmt = match c.prepare(&stmt_str) {
3224 Ok(s) => s,
3225 Err(e) => { let _ = sender.send(Err(format!("prepare: {e}"))); return; }
3226 };
3227 let column_count = stmt.column_count();
3228 let column_names: Vec<String> = (0..column_count)
3229 .map(|i| stmt.column_name(i).unwrap_or("").to_string())
3230 .collect();
3231 let bound = sqlite_params(¶ms);
3232 let bind: Vec<&dyn rusqlite::ToSql> =
3233 bound.iter().map(|p| p as &dyn rusqlite::ToSql).collect();
3234 let mut rows = match stmt.query(rusqlite::params_from_iter(bind.iter())) {
3235 Ok(r) => r,
3236 Err(e) => { let _ = sender.send(Err(format!("query: {e}"))); return; }
3237 };
3238 loop {
3239 match rows.next() {
3240 Ok(None) => break,
3241 Err(e) => {
3242 let _ = sender.send(Err(format!("row: {e}")));
3243 break;
3244 }
3245 Ok(Some(row)) => {
3246 let mut rec = indexmap::IndexMap::new();
3247 for (i, name) in column_names.iter().enumerate() {
3248 let val = match row.get_ref(i) {
3249 Ok(vr) => sql_value_ref_to_lex(vr),
3250 Err(_) => Value::Unit,
3251 };
3252 rec.insert(name.clone(), val);
3253 }
3254 if sender.send(Ok(Value::Record(rec))).is_err() {
3255 break;
3256 }
3257 }
3258 }
3259 }
3260}
3261
3262fn pg_cursor_producer(
3266 conn_arc: Arc<Mutex<SqlConn>>,
3267 stmt_str: String,
3268 params: Vec<SqlParamValue>,
3269 sender: std::sync::mpsc::SyncSender<Result<Value, String>>,
3270) {
3271 let mut conn_guard = match conn_arc.lock() {
3272 Ok(g) => g,
3273 Err(p) => p.into_inner(),
3274 };
3275 let SqlConn::Postgres(c) = &mut *conn_guard else {
3276 let _ = sender.send(Err("pg_cursor_producer called on non-postgres conn".into()));
3277 return;
3278 };
3279 let pg = pg_param_refs(¶ms);
3280 let refs: Vec<&(dyn postgres::types::ToSql + Sync)> =
3281 pg.iter().map(|b| b.as_ref()).collect();
3282 let mut tx = match c.transaction() {
3283 Ok(t) => t,
3284 Err(e) => { let _ = sender.send(Err(format!("begin: {e}"))); return; }
3285 };
3286 let cur_name = format!("__lex_cur_{}", next_cursor_handle());
3289 if let Err(e) = tx.execute(
3290 &format!("DECLARE \"{cur_name}\" NO SCROLL CURSOR FOR {stmt_str}"),
3291 &refs,
3292 ) {
3293 let _ = sender.send(Err(format!("declare: {e}")));
3294 return;
3295 }
3296 let fetch_sql = format!("FETCH 64 FROM \"{cur_name}\"");
3297 'outer: loop {
3298 let batch = match tx.query(&fetch_sql, &[]) {
3299 Ok(r) => r,
3300 Err(e) => { let _ = sender.send(Err(format!("fetch: {e}"))); break; }
3301 };
3302 if batch.is_empty() {
3303 break;
3304 }
3305 for row in batch.iter() {
3306 let rec = pg_row_to_lex_record(row);
3307 if sender.send(Ok(Value::Record(rec))).is_err() {
3308 break 'outer;
3309 }
3310 }
3311 }
3312 let _ = tx.execute(&format!("CLOSE \"{cur_name}\""), &[]);
3313 let _ = tx.commit();
3314}
3315
/// Owned value bound to a SQL statement placeholder.
///
/// `pg_cursor_producer` receives these as a `Vec<SqlParamValue>` and hands
/// them to `pg_param_refs` before executing the statement.
#[derive(Debug, Clone)]
enum SqlParamValue {
    /// Owned string value.
    Text(String),
    /// Signed 64-bit integer value.
    Integer(i64),
    /// 64-bit floating-point value.
    Real(f64),
    /// Boolean value.
    Bool(bool),
    /// No payload; presumably bound as SQL `NULL` — confirm against the
    /// parameter-conversion helpers.
    Null,
}
3325
/// A live database connection for one of the two backends handled here.
///
/// Backend-specific code matches on the variant; for example
/// `pg_cursor_producer` reports an error when handed anything but `Postgres`.
pub(crate) enum SqlConn {
    /// Connection opened through `rusqlite`.
    Sqlite(rusqlite::Connection),
    /// Client opened through the `postgres` crate.
    Postgres(postgres::Client),
}
3331
/// A connection behind `Arc<Mutex<..>>`: the form `SqlRegistry` stores and
/// hands out as clones, and the form `pg_cursor_producer` takes.
type SharedConn = Arc<Mutex<SqlConn>>;
3333
/// Bounded registry mapping `u64` handles to shared SQL connections.
///
/// Eviction is least-recently-used: `insert` appends, `touch_get` moves the
/// looked-up entry to the end, and an `insert` at capacity removes the entry
/// at index 0.
pub(crate) struct SqlRegistry {
    /// Handle → connection, ordered from least to most recently used.
    entries: indexmap::IndexMap<u64, SharedConn>,
    /// Entry count at which `insert` evicts before adding.
    cap: usize,
}
3338
3339impl SqlRegistry {
3340 pub(crate) fn with_capacity(cap: usize) -> Self {
3341 Self { entries: indexmap::IndexMap::new(), cap }
3342 }
3343
3344 pub(crate) fn insert(&mut self, handle: u64, conn: SqlConn) {
3345 if self.entries.len() >= self.cap {
3346 self.entries.shift_remove_index(0);
3347 }
3348 self.entries.insert(handle, Arc::new(Mutex::new(conn)));
3349 }
3350
3351 pub(crate) fn touch_get(&mut self, handle: u64) -> Option<SharedConn> {
3355 let idx = self.entries.get_index_of(&handle)?;
3356 self.entries.move_index(idx, self.entries.len() - 1);
3357 self.entries.get(&handle).cloned()
3358 }
3359
3360 pub(crate) fn remove(&mut self, handle: u64) {
3361 self.entries.shift_remove(&handle);
3362 }
3363
3364 #[cfg(test)]
3365 pub(crate) fn len(&self) -> usize { self.entries.len() }
3366}
3367
/// Returns the next SQL connection handle.
///
/// Handles come from a process-wide atomic counter that starts at 1 and is
/// incremented on every call.
fn next_sql_handle() -> u64 {
    static NEXT_HANDLE: AtomicU64 = AtomicU64::new(1);
    // `fetch_add` yields the value from before the increment.
    NEXT_HANDLE.fetch_add(1, Ordering::SeqCst)
}
3372
#[cfg(test)]
mod sql_registry_tests {
    use super::{SqlConn, SqlRegistry};

    /// Builds a throwaway in-memory SQLite connection to store in a registry.
    fn fresh() -> SqlConn {
        let conn = rusqlite::Connection::open_in_memory().expect("open in-memory sqlite");
        SqlConn::Sqlite(conn)
    }

    /// A registered handle can be fetched; an unregistered one cannot.
    #[test]
    fn insert_and_get_round_trip() {
        let mut registry = SqlRegistry::with_capacity(4);
        registry.insert(1, fresh());

        assert!(registry.touch_get(1).is_some());
        assert!(registry.touch_get(2).is_none());
    }

    /// Overflowing the cap evicts the least recently used handle.
    #[test]
    fn cap_evicts_lru_on_overflow() {
        let mut registry = SqlRegistry::with_capacity(2);
        registry.insert(1, fresh());
        registry.insert(2, fresh());

        // Touch 1 so that 2 becomes the eviction candidate.
        drop(registry.touch_get(1));
        registry.insert(3, fresh());

        assert!(registry.touch_get(1).is_some(), "1 was MRU, should survive");
        assert!(registry.touch_get(2).is_none(), "2 was LRU, should be evicted");
        assert!(registry.touch_get(3).is_some(), "3 just inserted");
        assert_eq!(registry.len(), 2);
    }

    /// `remove` deletes the entry and shrinks the registry.
    #[test]
    fn remove_drops_entry() {
        let mut registry = SqlRegistry::with_capacity(4);
        registry.insert(1, fresh());
        registry.remove(1);

        assert!(registry.touch_get(1).is_none());
        assert_eq!(registry.len(), 0);
    }

    /// Inserting three times the cap never lets the registry exceed it.
    #[test]
    fn many_inserts_stay_bounded_at_cap() {
        let cap = 8;
        let mut registry = SqlRegistry::with_capacity(cap);
        let total = cap as u64 * 3;

        for handle in 0..total {
            registry.insert(handle, fresh());
            assert!(registry.len() <= cap);
        }
        assert_eq!(registry.len(), cap);
    }
}
3422
#[cfg(test)]
mod kv_registry_tests {
    use super::KvRegistry;

    /// Opens a sled database in a uniquely named directory under the system
    /// temp dir.
    ///
    /// The directory name combines the process id, the caller's `tag`, and a
    /// nanosecond timestamp so that each call gets its own path. The database
    /// is opened with `temporary(true)` so sled removes the directory when the
    /// `Db` is dropped, instead of leaving one directory behind per call on
    /// every test run.
    fn fresh_db(tag: &str) -> sled::Db {
        let dir = std::env::temp_dir().join(format!(
            "lex-kv-reg-{}-{}-{}",
            std::process::id(),
            tag,
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_nanos()
        ));
        sled::Config::new()
            .path(&dir)
            .temporary(true)
            .open()
            .expect("sled open")
    }

    /// A registered handle can be fetched; an unregistered one cannot.
    #[test]
    fn insert_and_get_round_trip() {
        let mut r = KvRegistry::with_capacity(4);
        r.insert(1, fresh_db("a"));
        assert!(r.touch_get(1).is_some());
        assert!(r.touch_get(2).is_none());
    }

    /// Overflowing the cap evicts the least recently used handle.
    #[test]
    fn cap_evicts_lru_on_overflow() {
        let mut r = KvRegistry::with_capacity(2);
        r.insert(1, fresh_db("c1"));
        r.insert(2, fresh_db("c2"));
        // Touch 1 so that 2 becomes the eviction candidate.
        let _ = r.touch_get(1);
        r.insert(3, fresh_db("c3"));
        assert!(r.touch_get(1).is_some(), "1 was MRU, should survive");
        assert!(r.touch_get(2).is_none(), "2 was LRU, should be evicted");
        assert!(r.touch_get(3).is_some(), "3 just inserted, should survive");
        assert_eq!(r.len(), 2);
    }

    /// Without any lookups in between, the oldest insert is evicted first.
    #[test]
    fn cap_with_no_touches_evicts_in_insertion_order() {
        let mut r = KvRegistry::with_capacity(2);
        r.insert(10, fresh_db("f1"));
        r.insert(20, fresh_db("f2"));
        r.insert(30, fresh_db("f3"));
        assert!(r.touch_get(10).is_none());
        assert!(r.touch_get(20).is_some());
        assert!(r.touch_get(30).is_some());
    }

    /// `remove` deletes the entry and shrinks the registry.
    #[test]
    fn remove_drops_entry() {
        let mut r = KvRegistry::with_capacity(4);
        r.insert(1, fresh_db("r1"));
        r.remove(1);
        assert!(r.touch_get(1).is_none());
        assert_eq!(r.len(), 0);
    }

    /// Removing a handle that was never registered leaves other entries alone.
    #[test]
    fn remove_unknown_handle_is_noop() {
        let mut r = KvRegistry::with_capacity(4);
        r.insert(1, fresh_db("u1"));
        r.remove(999);
        assert!(r.touch_get(1).is_some());
    }

    /// Inserting three times the cap never lets the registry exceed it.
    #[test]
    fn many_inserts_stay_bounded_at_cap() {
        let cap = 8;
        let mut r = KvRegistry::with_capacity(cap);
        for i in 0..(cap as u64 * 3) {
            r.insert(i, fresh_db(&format!("b{i}")));
            assert!(r.len() <= cap);
        }
        assert_eq!(r.len(), cap);
    }
}