1use lex_bytecode::vm::{EffectHandler, Vm};
8use lex_bytecode::{Program, Value};
9use std::path::PathBuf;
10use std::sync::atomic::{AtomicU64, Ordering};
11use std::sync::{Mutex, OnceLock};
12use std::sync::Arc;
13use std::time::{SystemTime, UNIX_EPOCH};
14
15use crate::builtins::try_pure_builtin;
16use crate::policy::Policy;
17
/// Destination for lines produced by the `io.print` effect.
///
/// Implementors must be `Send`.
pub trait IoSink: Send {
    /// Emits `s` as a single line of output.
    fn print_line(&mut self, s: &str);
}
23
24pub struct StdoutSink;
25impl IoSink for StdoutSink {
26 fn print_line(&mut self, s: &str) {
27 println!("{s}");
28 }
29}
30
31#[derive(Default)]
32pub struct CapturedSink { pub lines: Vec<String> }
33impl IoSink for CapturedSink {
34 fn print_line(&mut self, s: &str) { self.lines.push(s.to_string()); }
35}
36
/// Effect handler that checks each effect against a [`Policy`] before
/// performing it (I/O, filesystem, network, processes, storage, ...).
pub struct DefaultHandler {
    /// Allow-lists and limits consulted by the `ensure_*` checks.
    policy: Policy,
    /// Receives every line emitted by `io.print`.
    pub sink: Box<dyn IoSink>,
    /// When set, `io.read` paths are joined under this directory
    /// (after stripping leading `/`) instead of being used verbatim.
    pub read_root: Option<PathBuf>,
    /// Budget still available; decremented atomically by `note_call_budget`.
    /// Starts at the policy budget, or `u64::MAX` when no budget is set.
    pub budget_remaining: Arc<AtomicU64>,
    /// Budget ceiling copied from the policy; `None` disables budget checks.
    pub budget_ceiling: Option<u64>,
    /// Program used to run handlers for `net.serve`, `net.serve_tls` and
    /// `net.serve_ws`; those effects fail when this is `None`.
    pub program: Option<Arc<Program>>,
    /// Registry used by `chat.broadcast` / `chat.send`; those effects fail
    /// when this is `None`.
    pub chat_registry: Option<Arc<crate::ws::ChatRegistry>>,
    /// MCP client cache, created with capacity 16 in `new`.
    /// NOTE(review): presumably consumed by `dispatch_call_mcp` — confirm.
    pub mcp_clients: crate::mcp_client::McpClientCache,
}
69
70impl DefaultHandler {
71 pub fn new(policy: Policy) -> Self {
72 let ceiling = policy.budget;
76 let initial = ceiling.unwrap_or(u64::MAX);
77 Self {
78 policy,
79 sink: Box::new(StdoutSink),
80 read_root: None,
81 budget_remaining: Arc::new(AtomicU64::new(initial)),
82 budget_ceiling: ceiling,
83 program: None,
84 chat_registry: None,
85 mcp_clients: crate::mcp_client::McpClientCache::with_capacity(16),
86 }
87 }
88
89 pub fn with_program(mut self, program: Arc<Program>) -> Self {
90 self.program = Some(program); self
91 }
92
93 pub fn with_chat_registry(mut self, registry: Arc<crate::ws::ChatRegistry>) -> Self {
94 self.chat_registry = Some(registry); self
95 }
96
97 pub fn with_sink(mut self, sink: Box<dyn IoSink>) -> Self {
98 self.sink = sink; self
99 }
100
101 pub fn with_read_root(mut self, root: PathBuf) -> Self {
102 self.read_root = Some(root); self
103 }
104
105 fn ensure_kind_allowed(&self, kind: &str) -> Result<(), String> {
106 if self.policy.allow_effects.contains(kind) {
107 Ok(())
108 } else {
109 Err(format!("effect `{kind}` not in --allow-effects"))
110 }
111 }
112
113 fn resolve_read_path(&self, p: &str) -> PathBuf {
114 match &self.read_root {
115 Some(root) => root.join(p.trim_start_matches('/')),
116 None => PathBuf::from(p),
117 }
118 }
119
120 fn dispatch_log(&mut self, op: &str, args: Vec<Value>) -> Result<Value, String> {
121 match op {
122 "debug" | "info" | "warn" | "error" => {
123 let msg = expect_str(args.first())?;
124 let level = match op {
125 "debug" => LogLevel::Debug,
126 "info" => LogLevel::Info,
127 "warn" => LogLevel::Warn,
128 _ => LogLevel::Error,
129 };
130 emit_log(level, msg);
131 Ok(Value::Unit)
132 }
133 "set_level" => {
134 let s = expect_str(args.first())?;
135 match parse_log_level(s) {
136 Some(l) => {
137 log_state().lock().unwrap().level = l;
138 Ok(ok(Value::Unit))
139 }
140 None => Ok(err(Value::Str(format!(
141 "log.set_level: unknown level `{s}`; expected debug|info|warn|error")))),
142 }
143 }
144 "set_format" => {
145 let s = expect_str(args.first())?;
146 let fmt = match s {
147 "text" => LogFormat::Text,
148 "json" => LogFormat::Json,
149 other => return Ok(err(Value::Str(format!(
150 "log.set_format: unknown format `{other}`; expected text|json")))),
151 };
152 log_state().lock().unwrap().format = fmt;
153 Ok(ok(Value::Unit))
154 }
155 "set_sink" => {
156 let path = expect_str(args.first())?;
157 if path == "-" {
158 log_state().lock().unwrap().sink = LogSink::Stderr;
159 return Ok(ok(Value::Unit));
160 }
161 if let Err(e) = self.ensure_fs_write_path(path) {
162 return Ok(err(Value::Str(e)));
163 }
164 match std::fs::OpenOptions::new()
165 .create(true).append(true).open(path)
166 {
167 Ok(f) => {
168 log_state().lock().unwrap().sink = LogSink::File(std::sync::Arc::new(Mutex::new(f)));
169 Ok(ok(Value::Unit))
170 }
171 Err(e) => Ok(err(Value::Str(format!("log.set_sink `{path}`: {e}")))),
172 }
173 }
174 other => Err(format!("unsupported log.{other}")),
175 }
176 }
177
    /// Executes a `process.*` operation.
    ///
    /// * `spawn(cmd, args, opts)` — starts a child with piped stdio, registers
    ///   it, and returns `ok(handle)`.
    /// * `read_stdout_line` / `read_stderr_line` — see `read_line_op`.
    /// * `wait(handle)` — blocks until exit, unregisters the handle, and
    ///   returns a record with `code` and `signaled`.
    /// * `kill(handle, signal)` — kills the child.
    /// * `run(cmd, args)` — runs to completion and returns
    ///   `stdout` / `stderr` / `exit_code`.
    ///
    /// Malformed arguments, unknown handles and unknown ops are hard errors
    /// (`Err`); allow-list denials and OS spawn/kill failures are returned as
    /// `err(...)` values.
    fn dispatch_process(&mut self, op: &str, args: Vec<Value>) -> Result<Value, String> {
        match op {
            "spawn" => {
                let cmd = expect_str(args.first())?.to_string();
                let raw_args = match args.get(1) {
                    Some(Value::List(items)) => items.clone(),
                    _ => return Err("process.spawn: args must be List[Str]".into()),
                };
                // Every argument must be a string; the first non-string aborts.
                let str_args: Result<Vec<String>, String> = raw_args.iter().map(|v| match v {
                    Value::Str(s) => Ok(s.clone()),
                    other => Err(format!("process.spawn: arg must be Str, got {other:?}")),
                }).collect();
                let str_args = str_args?;
                let opts = match args.get(2) {
                    Some(Value::Record(r)) => r.clone(),
                    _ => return Err("process.spawn: missing or invalid opts record".into()),
                };

                // An empty allow-list means "no restriction". Otherwise the
                // command's file name (not its full path) must be listed.
                if !self.policy.allow_proc.is_empty() {
                    let basename = std::path::Path::new(&cmd)
                        .file_name()
                        .and_then(|s| s.to_str())
                        .unwrap_or(&cmd);
                    if !self.policy.allow_proc.iter().any(|a| a == basename) {
                        return Ok(err(Value::Str(format!(
                            "process.spawn: `{cmd}` not in --allow-proc {:?}",
                            self.policy.allow_proc
                        ))));
                    }
                }

                let mut command = std::process::Command::new(&cmd);
                command.args(&str_args);
                command.stdin(std::process::Stdio::piped());
                command.stdout(std::process::Stdio::piped());
                command.stderr(std::process::Stdio::piped());

                // opts.cwd: only a `Some(Str)` variant changes the working directory.
                if let Some(Value::Variant { name, args: vargs }) = opts.get("cwd") {
                    if name == "Some" {
                        if let Some(Value::Str(s)) = vargs.first() {
                            command.current_dir(s);
                        }
                    }
                }
                // opts.env: entries that are not Str -> Str are silently skipped.
                if let Some(Value::Map(env)) = opts.get("env") {
                    for (k, v) in env {
                        if let (lex_bytecode::MapKey::Str(ks), Value::Str(vs)) = (k, v) {
                            command.env(ks, vs);
                        }
                    }
                }

                // opts.stdin: only `Some(Bytes)` yields a payload.
                let stdin_payload: Option<Vec<u8>> = match opts.get("stdin") {
                    Some(Value::Variant { name, args: vargs }) if name == "Some" => {
                        match vargs.first() {
                            Some(Value::Bytes(b)) => Some(b.clone()),
                            _ => None,
                        }
                    }
                    _ => None,
                };

                let mut child = match command.spawn() {
                    Ok(c) => c,
                    Err(e) => return Ok(err(Value::Str(format!("process.spawn `{cmd}`: {e}")))),
                };

                if let Some(payload) = stdin_payload {
                    if let Some(mut stdin) = child.stdin.take() {
                        use std::io::Write;
                        // Best effort: a write failure is ignored. `stdin` is
                        // dropped at the end of this scope, closing the pipe.
                        let _ = stdin.write_all(&payload);
                    }
                }
                // Without a payload the stdin pipe stays attached to `child`.

                let stdout = child.stdout.take().map(std::io::BufReader::new);
                let stderr = child.stderr.take().map(std::io::BufReader::new);
                let handle = next_process_handle();
                process_registry().lock().unwrap().insert(handle, ProcessState {
                    child,
                    stdout,
                    stderr,
                });
                Ok(ok(Value::Int(handle as i64)))
            }
            "read_stdout_line" => Self::read_line_op(args, true),
            "read_stderr_line" => Self::read_line_op(args, false),
            "wait" => {
                let h = expect_process_handle(args.first())?;
                // The registry guard is a temporary dropped at the end of this
                // statement, so the registry is not locked during the wait.
                let arc = process_registry().lock().unwrap()
                    .touch_get(h)
                    .ok_or_else(|| "process.wait: closed or unknown ProcessHandle".to_string())?;
                let status = {
                    let mut state = arc.lock().unwrap();
                    state.child.wait().map_err(|e| format!("process.wait: {e}"))?
                };
                // The handle is unregistered once the child has exited.
                process_registry().lock().unwrap().remove(h);
                let mut rec = indexmap::IndexMap::new();
                // `code()` is `None` when there is no exit code; reported as -1.
                rec.insert("code".into(), Value::Int(status.code().unwrap_or(-1) as i64));
                #[cfg(unix)]
                {
                    use std::os::unix::process::ExitStatusExt;
                    rec.insert("signaled".into(), Value::Bool(status.signal().is_some()));
                }
                #[cfg(not(unix))]
                {
                    rec.insert("signaled".into(), Value::Bool(false));
                }
                Ok(Value::Record(rec))
            }
            "kill" => {
                let h = expect_process_handle(args.first())?;
                // The signal argument must be a string but is otherwise unused:
                // `Child::kill` is always what gets called.
                let _signal = expect_str(args.get(1))?;
                let arc = process_registry().lock().unwrap()
                    .touch_get(h)
                    .ok_or_else(|| "process.kill: closed or unknown ProcessHandle".to_string())?;
                let mut state = arc.lock().unwrap();
                match state.child.kill() {
                    Ok(_) => Ok(ok(Value::Unit)),
                    Err(e) => Ok(err(Value::Str(format!("process.kill: {e}")))),
                }
            }
            "run" => {
                let cmd = expect_str(args.first())?.to_string();
                let raw_args = match args.get(1) {
                    Some(Value::List(items)) => items.clone(),
                    _ => return Err("process.run: args must be List[Str]".into()),
                };
                let str_args: Result<Vec<String>, String> = raw_args.iter().map(|v| match v {
                    Value::Str(s) => Ok(s.clone()),
                    other => Err(format!("process.run: arg must be Str, got {other:?}")),
                }).collect();
                let str_args = str_args?;
                // Same basename allow-list rule as `spawn`.
                if !self.policy.allow_proc.is_empty() {
                    let basename = std::path::Path::new(&cmd)
                        .file_name()
                        .and_then(|s| s.to_str())
                        .unwrap_or(&cmd);
                    if !self.policy.allow_proc.iter().any(|a| a == basename) {
                        return Ok(err(Value::Str(format!(
                            "process.run: `{cmd}` not in --allow-proc {:?}",
                            self.policy.allow_proc
                        ))));
                    }
                }
                // Blocks until the child exits; output is decoded lossily as UTF-8.
                match std::process::Command::new(&cmd).args(&str_args).output() {
                    Ok(o) => {
                        let mut rec = indexmap::IndexMap::new();
                        rec.insert("stdout".into(), Value::Str(
                            String::from_utf8_lossy(&o.stdout).to_string()));
                        rec.insert("stderr".into(), Value::Str(
                            String::from_utf8_lossy(&o.stderr).to_string()));
                        rec.insert("exit_code".into(), Value::Int(
                            o.status.code().unwrap_or(-1) as i64));
                        Ok(ok(Value::Record(rec)))
                    }
                    Err(e) => Ok(err(Value::Str(format!("process.run `{cmd}`: {e}")))),
                }
            }
            other => Err(format!("unsupported process.{other}")),
        }
    }
349
350 fn read_line_op(args: Vec<Value>, is_stdout: bool) -> Result<Value, String> {
356 let h = expect_process_handle(args.first())?;
357 let arc = process_registry().lock().unwrap()
358 .touch_get(h)
359 .ok_or_else(|| format!(
360 "process.read_{}_line: closed or unknown ProcessHandle",
361 if is_stdout { "stdout" } else { "stderr" }))?;
362 let mut state = arc.lock().unwrap();
363 let reader_opt = if is_stdout {
364 state.stdout.as_mut().map(|r| -> &mut dyn std::io::BufRead { r })
365 } else {
366 state.stderr.as_mut().map(|r| -> &mut dyn std::io::BufRead { r })
367 };
368 let reader = match reader_opt {
369 Some(r) => r,
370 None => return Ok(none()),
371 };
372 let mut line = String::new();
373 match reader.read_line(&mut line) {
374 Ok(0) => Ok(none()),
375 Ok(_) => {
376 if line.ends_with('\n') { line.pop(); }
377 if line.ends_with('\r') { line.pop(); }
378 Ok(some(Value::Str(line)))
379 }
380 Err(e) => Err(format!("process.read_*_line: {e}")),
381 }
382 }
383
384 fn dispatch_fs(&mut self, op: &str, args: Vec<Value>) -> Result<Value, String> {
385 match op {
386 "exists" => {
387 let path = expect_str(args.first())?.to_string();
388 if let Err(e) = self.ensure_fs_walk_path(&path) {
389 return Ok(err(Value::Str(e)));
390 }
391 Ok(Value::Bool(std::path::Path::new(&path).exists()))
392 }
393 "is_file" => {
394 let path = expect_str(args.first())?.to_string();
395 if let Err(e) = self.ensure_fs_walk_path(&path) {
396 return Ok(err(Value::Str(e)));
397 }
398 Ok(Value::Bool(std::path::Path::new(&path).is_file()))
399 }
400 "is_dir" => {
401 let path = expect_str(args.first())?.to_string();
402 if let Err(e) = self.ensure_fs_walk_path(&path) {
403 return Ok(err(Value::Str(e)));
404 }
405 Ok(Value::Bool(std::path::Path::new(&path).is_dir()))
406 }
407 "stat" => {
408 let path = expect_str(args.first())?.to_string();
409 if let Err(e) = self.ensure_fs_walk_path(&path) {
410 return Ok(err(Value::Str(e)));
411 }
412 match std::fs::metadata(&path) {
413 Ok(md) => {
414 let mtime = md.modified()
415 .ok()
416 .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
417 .map(|d| d.as_secs() as i64)
418 .unwrap_or(0);
419 let mut rec = indexmap::IndexMap::new();
420 rec.insert("size".into(), Value::Int(md.len() as i64));
421 rec.insert("mtime".into(), Value::Int(mtime));
422 rec.insert("is_dir".into(), Value::Bool(md.is_dir()));
423 rec.insert("is_file".into(), Value::Bool(md.is_file()));
424 Ok(ok(Value::Record(rec)))
425 }
426 Err(e) => Ok(err(Value::Str(format!("fs.stat `{path}`: {e}")))),
427 }
428 }
429 "list_dir" => {
430 let path = expect_str(args.first())?.to_string();
431 if let Err(e) = self.ensure_fs_walk_path(&path) {
432 return Ok(err(Value::Str(e)));
433 }
434 match std::fs::read_dir(&path) {
435 Ok(rd) => {
436 let mut entries: Vec<Value> = Vec::new();
437 for ent in rd {
438 match ent {
439 Ok(e) => {
440 let p = e.path();
441 entries.push(Value::Str(p.to_string_lossy().into_owned()));
442 }
443 Err(e) => return Ok(err(Value::Str(format!("fs.list_dir: {e}")))),
444 }
445 }
446 Ok(ok(Value::List(entries)))
447 }
448 Err(e) => Ok(err(Value::Str(format!("fs.list_dir `{path}`: {e}")))),
449 }
450 }
451 "walk" => {
452 let path = expect_str(args.first())?.to_string();
453 if let Err(e) = self.ensure_fs_walk_path(&path) {
454 return Ok(err(Value::Str(e)));
455 }
456 let mut paths: Vec<Value> = Vec::new();
457 for ent in walkdir::WalkDir::new(&path) {
458 match ent {
459 Ok(e) => paths.push(Value::Str(
460 e.path().to_string_lossy().into_owned())),
461 Err(e) => return Ok(err(Value::Str(format!("fs.walk: {e}")))),
462 }
463 }
464 Ok(ok(Value::List(paths)))
465 }
466 "glob" => {
467 let pattern = expect_str(args.first())?.to_string();
468 let entries = match glob::glob(&pattern) {
473 Ok(e) => e,
474 Err(e) => return Ok(err(Value::Str(format!("fs.glob: {e}")))),
475 };
476 let mut paths: Vec<Value> = Vec::new();
477 for ent in entries {
478 match ent {
479 Ok(p) => {
480 let s = p.to_string_lossy().into_owned();
481 if self.policy.allow_fs_read.is_empty()
482 || self.policy.allow_fs_read.iter().any(|root| p.starts_with(root))
483 {
484 paths.push(Value::Str(s));
485 }
486 }
487 Err(e) => return Ok(err(Value::Str(format!("fs.glob: {e}")))),
488 }
489 }
490 Ok(ok(Value::List(paths)))
491 }
492 "mkdir_p" => {
493 let path = expect_str(args.first())?.to_string();
494 if let Err(e) = self.ensure_fs_write_path(&path) {
495 return Ok(err(Value::Str(e)));
496 }
497 match std::fs::create_dir_all(&path) {
498 Ok(_) => Ok(ok(Value::Unit)),
499 Err(e) => Ok(err(Value::Str(format!("fs.mkdir_p `{path}`: {e}")))),
500 }
501 }
502 "remove" => {
503 let path = expect_str(args.first())?.to_string();
504 if let Err(e) = self.ensure_fs_write_path(&path) {
505 return Ok(err(Value::Str(e)));
506 }
507 let p = std::path::Path::new(&path);
508 let result = if p.is_dir() {
509 std::fs::remove_dir_all(p)
510 } else {
511 std::fs::remove_file(p)
512 };
513 match result {
514 Ok(_) => Ok(ok(Value::Unit)),
515 Err(e) => Ok(err(Value::Str(format!("fs.remove `{path}`: {e}")))),
516 }
517 }
518 "copy" => {
519 let src = expect_str(args.first())?.to_string();
520 let dst = expect_str(args.get(1))?.to_string();
521 if let Err(e) = self.ensure_fs_walk_path(&src) {
522 return Ok(err(Value::Str(e)));
523 }
524 if let Err(e) = self.ensure_fs_write_path(&dst) {
525 return Ok(err(Value::Str(e)));
526 }
527 match std::fs::copy(&src, &dst) {
528 Ok(_) => Ok(ok(Value::Unit)),
529 Err(e) => Ok(err(Value::Str(format!("fs.copy {src} -> {dst}: {e}")))),
530 }
531 }
532 other => Err(format!("unsupported fs.{other}")),
533 }
534 }
535
536 fn ensure_fs_walk_path(&self, path: &str) -> Result<(), String> {
541 if self.policy.allow_fs_read.is_empty() {
542 return Ok(());
543 }
544 let p = std::path::Path::new(path);
545 if self.policy.allow_fs_read.iter().any(|a| p.starts_with(a)) {
546 Ok(())
547 } else {
548 Err(format!("fs path `{path}` outside --allow-fs-read"))
549 }
550 }
551
552 fn ensure_fs_write_path(&self, path: &str) -> Result<(), String> {
555 if self.policy.allow_fs_write.is_empty() {
556 return Ok(());
557 }
558 let p = std::path::Path::new(path);
559 if self.policy.allow_fs_write.iter().any(|a| p.starts_with(a)) {
560 Ok(())
561 } else {
562 Err(format!("fs path `{path}` outside --allow-fs-write"))
563 }
564 }
565
566 fn ensure_host_allowed(&self, url: &str) -> Result<(), String> {
570 if self.policy.allow_net_host.is_empty() { return Ok(()); }
571 let host = extract_host(url).unwrap_or("");
572 if self.policy.allow_net_host.iter().any(|h| host == h) {
573 Ok(())
574 } else {
575 Err(format!(
576 "net call to host `{host}` not in --allow-net-host {:?}",
577 self.policy.allow_net_host,
578 ))
579 }
580 }
581}
582
/// Extracts the host component of an `http://` or `https://` URL.
///
/// Returns `None` for any other scheme or for a malformed bracketed IPv6
/// literal. The returned slice excludes userinfo (`user:pw@`) and the port.
/// Bracketed IPv6 literals are returned with their brackets, e.g. `[::1]`.
///
/// The authority is cut at the first `/`, `?`, `#` or `\`, and userinfo is
/// dropped, so that a URL such as `http://allowed.com:80@evil.com/` yields
/// `evil.com` (the host a client would actually contact) rather than
/// `allowed.com`. `\` is treated as a terminator because some URL parsers
/// treat it like `/` for HTTP(S) URLs.
fn extract_host(url: &str) -> Option<&str> {
    let rest = url
        .strip_prefix("http://")
        .or_else(|| url.strip_prefix("https://"))?;
    let authority = rest
        .find(['/', '?', '#', '\\'])
        .map_or(rest, |end| &rest[..end]);
    // Everything up to the last '@' is userinfo, never part of the host.
    let host_port = authority
        .rsplit_once('@')
        .map_or(authority, |(_, hp)| hp);
    if host_port.starts_with('[') {
        // IPv6 literal: the colons inside the brackets are not port separators.
        let close = host_port.find(']')?;
        let after = &host_port[close + 1..];
        if !(after.is_empty() || after.starts_with(':')) {
            return None;
        }
        return Some(&host_port[..=close]);
    }
    Some(host_port.rsplit_once(':').map_or(host_port, |(h, _)| h))
}
594
595impl EffectHandler for DefaultHandler {
596 fn note_call_budget(&mut self, cost: u64) -> Result<(), String> {
601 let Some(ceiling) = self.budget_ceiling else { return Ok(()); };
604 loop {
610 let cur = self.budget_remaining.load(Ordering::SeqCst);
611 if cost > cur {
612 let used = ceiling.saturating_sub(cur);
613 return Err(format!(
614 "budget exceeded: requested {cost}, used so far {used}, ceiling {ceiling}"));
615 }
616 let next = cur - cost;
617 if self.budget_remaining.compare_exchange(cur, next,
620 Ordering::SeqCst, Ordering::SeqCst).is_ok() {
621 return Ok(());
622 }
623 }
624 }
625
626 fn dispatch(&mut self, kind: &str, op: &str, args: Vec<Value>) -> Result<Value, String> {
627 if let Some(r) = try_pure_builtin(kind, op, &args) {
631 return r;
632 }
633 if kind == "process" {
637 self.ensure_kind_allowed("proc")?;
638 return self.dispatch_process(op, args);
639 }
640 if kind == "log" {
641 let effect_kind = match op {
644 "debug" | "info" | "warn" | "error" => "log",
645 "set_level" | "set_format" => "io",
646 "set_sink" => {
647 self.ensure_kind_allowed("io")?;
648 self.ensure_kind_allowed("fs_write")?;
649 return self.dispatch_log(op, args);
650 }
651 other => return Err(format!("unsupported log.{other}")),
652 };
653 self.ensure_kind_allowed(effect_kind)?;
654 return self.dispatch_log(op, args);
655 }
656 if kind == "fs" {
657 let effect_kind = match op {
658 "exists" | "is_file" | "is_dir" | "stat"
659 | "list_dir" | "walk" | "glob" => "fs_walk",
660 "mkdir_p" | "remove" => "fs_write",
661 "copy" => {
662 self.ensure_kind_allowed("fs_walk")?;
663 self.ensure_kind_allowed("fs_write")?;
664 return self.dispatch_fs(op, args);
665 }
666 other => return Err(format!("unsupported fs.{other}")),
667 };
668 self.ensure_kind_allowed(effect_kind)?;
669 return self.dispatch_fs(op, args);
670 }
671 if kind == "datetime" && op == "now" {
678 self.ensure_kind_allowed("time")?;
679 let now = chrono::Utc::now();
680 let nanos = now.timestamp_nanos_opt().unwrap_or(i64::MAX);
681 return Ok(Value::Int(nanos));
682 }
683 if kind == "crypto" && op == "random" {
684 self.ensure_kind_allowed("random")?;
685 let n = expect_int(args.first())?;
686 if !(0..=1_048_576).contains(&n) {
687 return Err("crypto.random: n must be in 0..=1048576".into());
688 }
689 use rand::{rngs::OsRng, TryRngCore};
690 let mut buf = vec![0u8; n as usize];
691 OsRng.try_fill_bytes(&mut buf)
692 .map_err(|e| format!("crypto.random: OS RNG: {e}"))?;
693 return Ok(Value::Bytes(buf));
694 }
695 if kind == "agent" {
708 let effect_kind = match op {
709 "local_complete" => "llm_local",
710 "cloud_complete" => "llm_cloud",
711 "send_a2a" => "a2a",
712 "call_mcp" => "mcp",
713 other => return Err(format!("unsupported agent.{other}")),
714 };
715 self.ensure_kind_allowed(effect_kind)?;
716 return match op {
724 "call_mcp" => Ok(self.dispatch_call_mcp(args)),
725 "local_complete" => Ok(dispatch_llm_local(args)),
726 "cloud_complete" => Ok(dispatch_llm_cloud(args)),
727 _ => Ok(ok(Value::Str(format!("<{effect_kind} stub>")))),
728 };
729 }
730 if kind == "http" && matches!(op, "send" | "get" | "post") {
731 self.ensure_kind_allowed("net")?;
732 return match op {
733 "send" => {
734 let req = expect_record(args.first())?;
735 Ok(http_send_record(self, req))
736 }
737 "get" => {
738 let url = expect_str(args.first())?.to_string();
739 self.ensure_host_allowed(&url)?;
740 Ok(http_send_simple("GET", &url, None, "", None))
741 }
742 "post" => {
743 let url = expect_str(args.first())?.to_string();
744 let body = expect_bytes(args.get(1))?.clone();
745 let content_type = expect_str(args.get(2))?.to_string();
746 self.ensure_host_allowed(&url)?;
747 Ok(http_send_simple("POST", &url, Some(body), &content_type, None))
748 }
749 _ => unreachable!(),
750 };
751 }
752 self.ensure_kind_allowed(kind)?;
753 match (kind, op) {
754 ("io", "print") => {
755 let line = expect_str(args.first())?;
756 self.sink.print_line(line);
757 Ok(Value::Unit)
758 }
759 ("io", "read") => {
760 let path = expect_str(args.first())?.to_string();
761 let resolved = self.resolve_read_path(&path);
762 if !self.policy.allow_fs_read.is_empty()
769 && !self.policy.allow_fs_read.iter().any(|a| resolved.starts_with(a))
770 {
771 return Err(format!("read of `{path}` outside --allow-fs-read"));
772 }
773 match std::fs::read_to_string(&resolved) {
774 Ok(s) => Ok(ok(Value::Str(s))),
775 Err(e) => Ok(err(Value::Str(format!("{e}")))),
776 }
777 }
778 ("io", "write") => {
779 let path = expect_str(args.first())?.to_string();
780 let contents = expect_str(args.get(1))?.to_string();
781 if !self.policy.allow_fs_write.is_empty() {
783 let p = std::path::Path::new(&path);
784 if !self.policy.allow_fs_write.iter().any(|a| p.starts_with(a)) {
785 return Err(format!("write to `{path}` outside --allow-fs-write"));
786 }
787 }
788 match std::fs::write(&path, contents) {
789 Ok(_) => Ok(ok(Value::Unit)),
790 Err(e) => Ok(err(Value::Str(format!("{e}")))),
791 }
792 }
793 ("time", "now") => {
794 let secs = SystemTime::now().duration_since(UNIX_EPOCH)
795 .map_err(|e| format!("time: {e}"))?.as_secs();
796 Ok(Value::Int(secs as i64))
797 }
798 ("time", "sleep_ms") => {
799 let n = expect_int(args.first())?;
807 if n > 0 {
808 let ms = (n as u64).min(60_000);
809 std::thread::sleep(std::time::Duration::from_millis(ms));
810 }
811 Ok(Value::Unit)
812 }
813 ("rand", "int_in") => {
814 let lo = expect_int(args.first())?;
816 let hi = expect_int(args.get(1))?;
817 Ok(Value::Int((lo + hi) / 2))
818 }
819 ("env", "get") => {
824 let name = expect_str(args.first())?;
825 Ok(match std::env::var(name) {
826 Ok(v) => Value::Variant {
827 name: "Some".into(),
828 args: vec![Value::Str(v)],
829 },
830 Err(_) => Value::Variant { name: "None".into(), args: Vec::new() },
831 })
832 }
833 ("budget", _) => {
834 Ok(Value::Unit)
837 }
838 ("net", "get") => {
839 let url = expect_str(args.first())?.to_string();
840 self.ensure_host_allowed(&url)?;
841 Ok(http_request("GET", &url, None))
842 }
843 ("net", "post") => {
844 let url = expect_str(args.first())?.to_string();
845 let body = expect_str(args.get(1))?.to_string();
846 self.ensure_host_allowed(&url)?;
847 Ok(http_request("POST", &url, Some(&body)))
848 }
849 ("net", "serve") => {
850 let port = match args.first() {
851 Some(Value::Int(n)) if (0..=65535).contains(n) => *n as u16,
852 _ => return Err("net.serve(port, handler): port must be Int 0..=65535".into()),
853 };
854 let handler_name = expect_str(args.get(1))?.to_string();
855 let program = self.program.clone()
856 .ok_or_else(|| "net.serve requires a Program reference; use DefaultHandler::with_program".to_string())?;
857 let policy = self.policy.clone();
858 serve_http(port, handler_name, program, policy, None)
859 }
860 ("net", "serve_tls") => {
861 let port = match args.first() {
862 Some(Value::Int(n)) if (0..=65535).contains(n) => *n as u16,
863 _ => return Err("net.serve_tls(port, cert, key, handler): port must be Int 0..=65535".into()),
864 };
865 let cert_path = expect_str(args.get(1))?.to_string();
866 let key_path = expect_str(args.get(2))?.to_string();
867 let handler_name = expect_str(args.get(3))?.to_string();
868 let program = self.program.clone()
869 .ok_or_else(|| "net.serve_tls requires a Program reference".to_string())?;
870 let policy = self.policy.clone();
871 let cert = std::fs::read(&cert_path)
872 .map_err(|e| format!("net.serve_tls: read cert {cert_path}: {e}"))?;
873 let key = std::fs::read(&key_path)
874 .map_err(|e| format!("net.serve_tls: read key {key_path}: {e}"))?;
875 serve_http(port, handler_name, program, policy, Some(TlsConfig { cert, key }))
876 }
877 ("net", "serve_ws") => {
878 let port = match args.first() {
879 Some(Value::Int(n)) if (0..=65535).contains(n) => *n as u16,
880 _ => return Err("net.serve_ws(port, on_message): port must be Int 0..=65535".into()),
881 };
882 let handler_name = expect_str(args.get(1))?.to_string();
883 let program = self.program.clone()
884 .ok_or_else(|| "net.serve_ws requires a Program reference".to_string())?;
885 let policy = self.policy.clone();
886 let registry = Arc::new(crate::ws::ChatRegistry::default());
887 crate::ws::serve_ws(port, handler_name, program, policy, registry)
888 }
889 ("chat", "broadcast") => {
890 let registry = self.chat_registry.as_ref()
891 .ok_or_else(|| "chat.broadcast called outside a net.serve_ws handler".to_string())?;
892 let room = expect_str(args.first())?;
893 let body = expect_str(args.get(1))?;
894 crate::ws::chat_broadcast(registry, room, body);
895 Ok(Value::Unit)
896 }
897 ("chat", "send") => {
898 let registry = self.chat_registry.as_ref()
899 .ok_or_else(|| "chat.send called outside a net.serve_ws handler".to_string())?;
900 let conn_id = match args.first() {
901 Some(Value::Int(n)) if *n >= 0 => *n as u64,
902 _ => return Err("chat.send: conn_id must be non-negative Int".into()),
903 };
904 let body = expect_str(args.get(1))?;
905 Ok(Value::Bool(crate::ws::chat_send(registry, conn_id, body)))
906 }
907 ("kv", "open") => {
908 let path = expect_str(args.first())?.to_string();
909 if !self.policy.allow_fs_write.is_empty() {
913 let p = std::path::Path::new(&path);
914 if !self.policy.allow_fs_write.iter().any(|a| p.starts_with(a)) {
915 return Ok(err(Value::Str(format!(
916 "kv.open: `{path}` outside --allow-fs-write"))));
917 }
918 }
919 match sled::open(&path) {
920 Ok(db) => {
921 let handle = next_kv_handle();
922 kv_registry().lock().unwrap().insert(handle, db);
923 Ok(ok(Value::Int(handle as i64)))
924 }
925 Err(e) => Ok(err(Value::Str(format!("kv.open: {e}")))),
926 }
927 }
928 ("kv", "close") => {
929 let h = expect_kv_handle(args.first())?;
930 kv_registry().lock().unwrap().remove(h);
931 Ok(Value::Unit)
932 }
933 ("kv", "get") => {
934 let h = expect_kv_handle(args.first())?;
935 let key = expect_str(args.get(1))?;
936 let mut reg = kv_registry().lock().unwrap();
937 let db = reg.touch_get(h).ok_or_else(|| "kv.get: closed or unknown Kv handle".to_string())?;
938 match db.get(key.as_bytes()) {
939 Ok(Some(ivec)) => Ok(some(Value::Bytes(ivec.to_vec()))),
940 Ok(None) => Ok(none()),
941 Err(e) => Err(format!("kv.get: {e}")),
942 }
943 }
944 ("kv", "put") => {
945 let h = expect_kv_handle(args.first())?;
946 let key = expect_str(args.get(1))?.to_string();
947 let val = expect_bytes(args.get(2))?.clone();
948 let mut reg = kv_registry().lock().unwrap();
949 let db = reg.touch_get(h).ok_or_else(|| "kv.put: closed or unknown Kv handle".to_string())?;
950 match db.insert(key.as_bytes(), val) {
951 Ok(_) => Ok(ok(Value::Unit)),
952 Err(e) => Ok(err(Value::Str(format!("kv.put: {e}")))),
953 }
954 }
955 ("kv", "delete") => {
956 let h = expect_kv_handle(args.first())?;
957 let key = expect_str(args.get(1))?;
958 let mut reg = kv_registry().lock().unwrap();
959 let db = reg.touch_get(h).ok_or_else(|| "kv.delete: closed or unknown Kv handle".to_string())?;
960 match db.remove(key.as_bytes()) {
961 Ok(_) => Ok(ok(Value::Unit)),
962 Err(e) => Ok(err(Value::Str(format!("kv.delete: {e}")))),
963 }
964 }
965 ("kv", "contains") => {
966 let h = expect_kv_handle(args.first())?;
967 let key = expect_str(args.get(1))?;
968 let mut reg = kv_registry().lock().unwrap();
969 let db = reg.touch_get(h).ok_or_else(|| "kv.contains: closed or unknown Kv handle".to_string())?;
970 match db.contains_key(key.as_bytes()) {
971 Ok(present) => Ok(Value::Bool(present)),
972 Err(e) => Err(format!("kv.contains: {e}")),
973 }
974 }
975 ("kv", "list_prefix") => {
976 let h = expect_kv_handle(args.first())?;
977 let prefix = expect_str(args.get(1))?;
978 let mut reg = kv_registry().lock().unwrap();
979 let db = reg.touch_get(h).ok_or_else(|| "kv.list_prefix: closed or unknown Kv handle".to_string())?;
980 let mut keys: Vec<Value> = Vec::new();
981 for kv in db.scan_prefix(prefix.as_bytes()) {
982 let (k, _) = kv.map_err(|e| format!("kv.list_prefix: {e}"))?;
983 let s = String::from_utf8_lossy(&k).to_string();
984 keys.push(Value::Str(s));
985 }
986 Ok(Value::List(keys))
987 }
988 ("sql", "open") => {
989 let path = expect_str(args.first())?.to_string();
990 if path != ":memory:" && !self.policy.allow_fs_write.is_empty() {
994 let p = std::path::Path::new(&path);
995 if !self.policy.allow_fs_write.iter().any(|a| p.starts_with(a)) {
996 return Ok(err(Value::Str(format!(
997 "sql.open: `{path}` outside --allow-fs-write"))));
998 }
999 }
1000 match rusqlite::Connection::open(&path) {
1001 Ok(conn) => {
1002 let handle = next_sql_handle();
1003 sql_registry().lock().unwrap().insert(handle, conn);
1004 Ok(ok(Value::Int(handle as i64)))
1005 }
1006 Err(e) => Ok(err(Value::Str(format!("sql.open: {e}")))),
1007 }
1008 }
1009 ("sql", "close") => {
1010 let h = expect_sql_handle(args.first())?;
1011 sql_registry().lock().unwrap().remove(h);
1012 Ok(Value::Unit)
1013 }
1014 ("sql", "exec") => {
1015 let h = expect_sql_handle(args.first())?;
1016 let stmt = expect_str(args.get(1))?.to_string();
1017 let params = expect_str_list(args.get(2))?;
1018 let arc = sql_registry().lock().unwrap()
1019 .touch_get(h)
1020 .ok_or_else(|| "sql.exec: closed or unknown Db handle".to_string())?;
1021 let conn = arc.lock().unwrap();
1022 let bind: Vec<&dyn rusqlite::ToSql> = params.iter()
1023 .map(|s| s as &dyn rusqlite::ToSql)
1024 .collect();
1025 match conn.execute(&stmt, rusqlite::params_from_iter(bind.iter())) {
1026 Ok(n) => Ok(ok(Value::Int(n as i64))),
1027 Err(e) => Ok(err(Value::Str(format!("sql.exec: {e}")))),
1028 }
1029 }
1030 ("sql", "query") => {
1031 let h = expect_sql_handle(args.first())?;
1032 let stmt_str = expect_str(args.get(1))?.to_string();
1033 let params = expect_str_list(args.get(2))?;
1034 let arc = sql_registry().lock().unwrap()
1035 .touch_get(h)
1036 .ok_or_else(|| "sql.query: closed or unknown Db handle".to_string())?;
1037 let conn = arc.lock().unwrap();
1038 Ok(sql_run_query(&conn, &stmt_str, ¶ms))
1039 }
1040 ("proc", "spawn") => {
1041 let cmd = expect_str(args.first())?.to_string();
1055 let raw_args = match args.get(1) {
1056 Some(Value::List(items)) => items,
1057 Some(other) => return Err(format!(
1058 "proc.spawn: args must be List[Str], got {other:?}")),
1059 None => return Err("proc.spawn: missing args list".into()),
1060 };
1061 let str_args: Vec<String> = raw_args.iter().map(|v| match v {
1062 Value::Str(s) => Ok(s.clone()),
1063 other => Err(format!("proc.spawn: arg must be Str, got {other:?}")),
1064 }).collect::<Result<Vec<_>, _>>()?;
1065
1066 if !self.policy.allow_proc.is_empty() {
1070 let basename = std::path::Path::new(&cmd)
1071 .file_name()
1072 .and_then(|s| s.to_str())
1073 .unwrap_or(&cmd);
1074 if !self.policy.allow_proc.iter().any(|a| a == basename) {
1075 return Ok(err(Value::Str(format!(
1076 "proc.spawn: `{cmd}` not in --allow-proc {:?}",
1077 self.policy.allow_proc
1078 ))));
1079 }
1080 }
1081
1082 if str_args.len() > 1024 {
1085 return Ok(err(Value::Str(
1086 "proc.spawn: arg-count exceeds 1024".into())));
1087 }
1088 if str_args.iter().any(|a| a.len() > 65_536) {
1089 return Ok(err(Value::Str(
1090 "proc.spawn: per-arg length exceeds 64 KiB".into())));
1091 }
1092
1093 let output = std::process::Command::new(&cmd)
1094 .args(&str_args)
1095 .output();
1096 match output {
1097 Ok(o) => {
1098 let mut rec = indexmap::IndexMap::new();
1099 rec.insert("stdout".into(), Value::Str(
1100 String::from_utf8_lossy(&o.stdout).to_string()));
1101 rec.insert("stderr".into(), Value::Str(
1102 String::from_utf8_lossy(&o.stderr).to_string()));
1103 rec.insert("exit_code".into(), Value::Int(
1104 o.status.code().unwrap_or(-1) as i64));
1105 Ok(ok(Value::Record(rec)))
1106 }
1107 Err(e) => Ok(err(Value::Str(format!("spawn `{cmd}`: {e}")))),
1108 }
1109 }
1110 other => Err(format!("unsupported effect {}.{}", other.0, other.1)),
1111 }
1112 }
1113}
1114
/// TLS material for the HTTPS variant of the built-in server.
///
/// Both fields are handed over unchanged to `tiny_http::SslConfig`
/// (`certificate` and `private_key` respectively) by `serve_http`.
pub struct TlsConfig {
    /// Certificate bytes, forwarded as `SslConfig::certificate`.
    pub cert: Vec<u8>,
    /// Private-key bytes, forwarded as `SslConfig::private_key`.
    pub key: Vec<u8>,
}
1128
1129fn serve_http(
1130 port: u16,
1131 handler_name: String,
1132 program: Arc<Program>,
1133 policy: Policy,
1134 tls: Option<TlsConfig>,
1135) -> Result<Value, String> {
1136 let (server, scheme) = match tls {
1137 None => (
1138 tiny_http::Server::http(("127.0.0.1", port))
1139 .map_err(|e| format!("net.serve bind {port}: {e}"))?,
1140 "http",
1141 ),
1142 Some(cfg) => {
1143 let ssl = tiny_http::SslConfig {
1144 certificate: cfg.cert,
1145 private_key: cfg.key,
1146 };
1147 (
1148 tiny_http::Server::https(("127.0.0.1", port), ssl)
1149 .map_err(|e| format!("net.serve_tls bind {port}: {e}"))?,
1150 "https",
1151 )
1152 }
1153 };
1154 eprintln!("net.serve: listening on {scheme}://127.0.0.1:{port}");
1155 for req in server.incoming_requests() {
1161 let program = Arc::clone(&program);
1162 let policy = policy.clone();
1163 let handler_name = handler_name.clone();
1164 std::thread::spawn(move || handle_request(req, program, policy, handler_name));
1165 }
1166 Ok(Value::Unit)
1167}
1168
1169fn handle_request(
1170 mut req: tiny_http::Request,
1171 program: Arc<Program>,
1172 policy: Policy,
1173 handler_name: String,
1174) {
1175 let lex_req = build_request_value(&mut req);
1176 let handler = DefaultHandler::new(policy).with_program(Arc::clone(&program));
1177 let mut vm = Vm::with_handler(&program, Box::new(handler));
1178 match vm.call(&handler_name, vec![lex_req]) {
1179 Ok(resp) => {
1180 let (status, body) = unpack_response(&resp);
1181 let response = tiny_http::Response::from_string(body).with_status_code(status);
1182 let _ = req.respond(response);
1183 }
1184 Err(e) => {
1185 let response = tiny_http::Response::from_string(format!("internal error: {e}"))
1186 .with_status_code(500);
1187 let _ = req.respond(response);
1188 }
1189 }
1190}
1191
1192fn build_request_value(req: &mut tiny_http::Request) -> Value {
1193 let method = format!("{:?}", req.method()).to_uppercase();
1194 let url = req.url().to_string();
1195 let (path, query) = match url.split_once('?') {
1196 Some((p, q)) => (p.to_string(), q.to_string()),
1197 None => (url, String::new()),
1198 };
1199 let mut body = String::new();
1200 let _ = req.as_reader().read_to_string(&mut body);
1201 let mut rec = indexmap::IndexMap::new();
1202 rec.insert("method".into(), Value::Str(method));
1203 rec.insert("path".into(), Value::Str(path));
1204 rec.insert("query".into(), Value::Str(query));
1205 rec.insert("body".into(), Value::Str(body));
1206 Value::Record(rec)
1207}
1208
1209fn unpack_response(v: &Value) -> (u16, String) {
1210 if let Value::Record(rec) = v {
1211 let status = rec.get("status").and_then(|s| match s {
1212 Value::Int(n) => Some(*n as u16),
1213 _ => None,
1214 }).unwrap_or(200);
1215 let body = rec.get("body").and_then(|b| match b {
1216 Value::Str(s) => Some(s.clone()),
1217 _ => None,
1218 }).unwrap_or_default();
1219 return (status, body);
1220 }
1221 (500, format!("handler returned non-record: {v:?}"))
1222}
1223
1224fn http_request(method: &str, url: &str, body: Option<&str>) -> Value {
1230 use std::time::Duration;
1231 let agent: ureq::Agent = ureq::Agent::config_builder()
1236 .timeout_connect(Some(Duration::from_secs(10)))
1237 .timeout_recv_body(Some(Duration::from_secs(30)))
1238 .timeout_send_body(Some(Duration::from_secs(10)))
1239 .http_status_as_error(false)
1240 .build()
1241 .into();
1242 let resp = match (method, body) {
1243 ("GET", _) => agent.get(url).call(),
1244 ("POST", Some(b)) => agent.post(url).send(b),
1245 ("POST", None) => agent.post(url).send(""),
1246 (m, _) => return err_value(format!("unsupported method: {m}")),
1247 };
1248 match resp {
1249 Ok(mut r) => {
1250 let status = r.status().as_u16();
1251 let body = r.body_mut().read_to_string().unwrap_or_default();
1252 if (200..300).contains(&status) {
1253 Value::Variant { name: "Ok".into(), args: vec![Value::Str(body)] }
1254 } else {
1255 err_value(format!("status {status}: {body}"))
1256 }
1257 }
1258 Err(e) => err_value(format!("transport: {e}")),
1259 }
1260}
1261
1262fn http_agent(timeout_ms: Option<u64>) -> ureq::Agent {
1267 use std::time::Duration;
1268 let mut b = ureq::Agent::config_builder()
1269 .timeout_connect(Some(Duration::from_secs(10)))
1270 .timeout_recv_body(Some(Duration::from_secs(30)))
1271 .timeout_send_body(Some(Duration::from_secs(10)))
1272 .http_status_as_error(false);
1273 if let Some(ms) = timeout_ms {
1274 let d = Duration::from_millis(ms);
1275 b = b.timeout_global(Some(d));
1276 }
1277 b.build().into()
1278}
1279
1280fn http_error_value(e: ureq::Error) -> Value {
1284 let (ctor, payload): (&str, Option<String>) = match &e {
1285 ureq::Error::Timeout(_) => ("TimeoutError", None),
1286 ureq::Error::Tls(s) => ("TlsError", Some((*s).into())),
1287 ureq::Error::Pem(p) => ("TlsError", Some(format!("{p}"))),
1288 ureq::Error::Rustls(r) => ("TlsError", Some(format!("{r}"))),
1289 _ => ("NetworkError", Some(format!("{e}"))),
1290 };
1291 let args = match payload { Some(s) => vec![Value::Str(s)], None => vec![] };
1292 let inner = Value::Variant { name: ctor.into(), args };
1293 Value::Variant { name: "Err".into(), args: vec![inner] }
1294}
1295
1296fn http_decode_err(msg: String) -> Value {
1297 let inner = Value::Variant {
1298 name: "DecodeError".into(),
1299 args: vec![Value::Str(msg)],
1300 };
1301 Value::Variant { name: "Err".into(), args: vec![inner] }
1302}
1303
1304fn http_send_simple(
1309 method: &str,
1310 url: &str,
1311 body: Option<Vec<u8>>,
1312 content_type: &str,
1313 timeout_ms: Option<u64>,
1314) -> Value {
1315 http_send_full(method, url, body, content_type, &[], timeout_ms)
1316}
1317
1318fn http_send_full(
1319 method: &str,
1320 url: &str,
1321 body: Option<Vec<u8>>,
1322 content_type: &str,
1323 headers: &[(String, String)],
1324 timeout_ms: Option<u64>,
1325) -> Value {
1326 let agent = http_agent(timeout_ms);
1327 let resp = match method {
1328 "GET" => {
1329 let mut req = agent.get(url);
1330 if !content_type.is_empty() { req = req.header("content-type", content_type); }
1331 for (k, v) in headers { req = req.header(k.as_str(), v.as_str()); }
1332 req.call()
1333 }
1334 "POST" => {
1335 let body = body.unwrap_or_default();
1336 let mut req = agent.post(url);
1337 if !content_type.is_empty() { req = req.header("content-type", content_type); }
1338 for (k, v) in headers { req = req.header(k.as_str(), v.as_str()); }
1339 req.send(&body[..])
1340 }
1341 m => {
1342 return http_decode_err(format!("unsupported method: {m}"));
1346 }
1347 };
1348 match resp {
1349 Ok(mut r) => {
1350 let status = r.status().as_u16() as i64;
1351 let headers_map = collect_response_headers(r.headers());
1352 let body_bytes = match r.body_mut().with_config().limit(10 * 1024 * 1024).read_to_vec() {
1353 Ok(b) => b,
1354 Err(e) => return http_decode_err(format!("body read: {e}")),
1355 };
1356 let mut rec = indexmap::IndexMap::new();
1357 rec.insert("status".into(), Value::Int(status));
1358 rec.insert("headers".into(), Value::Map(headers_map));
1359 rec.insert("body".into(), Value::Bytes(body_bytes));
1360 Value::Variant { name: "Ok".into(), args: vec![Value::Record(rec)] }
1361 }
1362 Err(e) => http_error_value(e),
1363 }
1364}
1365
1366fn collect_response_headers(
1367 headers: &ureq::http::HeaderMap,
1368) -> std::collections::BTreeMap<lex_bytecode::MapKey, Value> {
1369 let mut out = std::collections::BTreeMap::new();
1370 for (name, value) in headers.iter() {
1371 let v = value.to_str().unwrap_or("").to_string();
1372 out.insert(lex_bytecode::MapKey::Str(name.as_str().to_string()), Value::Str(v));
1373 }
1374 out
1375}
1376
1377fn http_send_record(handler: &DefaultHandler, req: &indexmap::IndexMap<String, Value>) -> Value {
1381 let method = match req.get("method") {
1382 Some(Value::Str(s)) => s.clone(),
1383 _ => return http_decode_err("HttpRequest.method must be Str".into()),
1384 };
1385 let url = match req.get("url") {
1386 Some(Value::Str(s)) => s.clone(),
1387 _ => return http_decode_err("HttpRequest.url must be Str".into()),
1388 };
1389 if let Err(e) = handler.ensure_host_allowed(&url) {
1390 return http_decode_err(e);
1391 }
1392 let body = match req.get("body") {
1393 Some(Value::Variant { name, args }) if name == "None" => None,
1394 Some(Value::Variant { name, args }) if name == "Some" => match args.as_slice() {
1395 [Value::Bytes(b)] => Some(b.clone()),
1396 _ => return http_decode_err("HttpRequest.body Some payload must be Bytes".into()),
1397 },
1398 _ => return http_decode_err("HttpRequest.body must be Option[Bytes]".into()),
1399 };
1400 let timeout_ms = match req.get("timeout_ms") {
1401 Some(Value::Variant { name, .. }) if name == "None" => None,
1402 Some(Value::Variant { name, args }) if name == "Some" => match args.as_slice() {
1403 [Value::Int(n)] if *n >= 0 => Some(*n as u64),
1404 _ => return http_decode_err(
1405 "HttpRequest.timeout_ms Some payload must be a non-negative Int".into()),
1406 },
1407 _ => return http_decode_err("HttpRequest.timeout_ms must be Option[Int]".into()),
1408 };
1409 let headers: Vec<(String, String)> = match req.get("headers") {
1410 Some(Value::Map(m)) => m.iter().filter_map(|(k, v)| {
1411 let kk = match k { lex_bytecode::MapKey::Str(s) => s.clone(), _ => return None };
1412 let vv = match v { Value::Str(s) => s.clone(), _ => return None };
1413 Some((kk, vv))
1414 }).collect(),
1415 _ => return http_decode_err("HttpRequest.headers must be Map[Str, Str]".into()),
1416 };
1417 http_send_full(&method, &url, body, "", &headers, timeout_ms)
1418}
1419
1420fn expect_record(v: Option<&Value>) -> Result<&indexmap::IndexMap<String, Value>, String> {
1421 match v {
1422 Some(Value::Record(r)) => Ok(r),
1423 Some(other) => Err(format!("expected Record, got {other:?}")),
1424 None => Err("missing Record argument".into()),
1425 }
1426}
1427
1428fn err_value(msg: String) -> Value {
1429 Value::Variant { name: "Err".into(), args: vec![Value::Str(msg)] }
1430}
1431
1432fn expect_str(v: Option<&Value>) -> Result<&str, String> {
1433 match v {
1434 Some(Value::Str(s)) => Ok(s),
1435 Some(other) => Err(format!("expected Str arg, got {other:?}")),
1436 None => Err("missing argument".into()),
1437 }
1438}
1439
1440fn expect_int(v: Option<&Value>) -> Result<i64, String> {
1441 match v {
1442 Some(Value::Int(n)) => Ok(*n),
1443 Some(other) => Err(format!("expected Int arg, got {other:?}")),
1444 None => Err("missing argument".into()),
1445 }
1446}
1447
1448fn ok(v: Value) -> Value {
1449 Value::Variant { name: "Ok".into(), args: vec![v] }
1450}
1451fn err(v: Value) -> Value {
1452 Value::Variant { name: "Err".into(), args: vec![v] }
1453}
1454
1455impl DefaultHandler {
1456 fn dispatch_call_mcp(&mut self, args: Vec<Value>) -> Value {
1462 let server = match args.first() {
1463 Some(Value::Str(s)) => s.clone(),
1464 _ => return err(Value::Str(
1465 "agent.call_mcp(server, tool, args_json): server must be Str".into())),
1466 };
1467 let tool = match args.get(1) {
1468 Some(Value::Str(s)) => s.clone(),
1469 _ => return err(Value::Str(
1470 "agent.call_mcp(server, tool, args_json): tool must be Str".into())),
1471 };
1472 let args_json = match args.get(2) {
1473 Some(Value::Str(s)) => s.clone(),
1474 _ => return err(Value::Str(
1475 "agent.call_mcp(server, tool, args_json): args_json must be Str".into())),
1476 };
1477 let parsed: serde_json::Value = match serde_json::from_str(&args_json) {
1478 Ok(v) => v,
1479 Err(e) => return err(Value::Str(format!(
1480 "agent.call_mcp: args_json is not valid JSON: {e}"))),
1481 };
1482 match self.mcp_clients.call(&server, &tool, parsed) {
1483 Ok(result) => ok(Value::Str(
1484 serde_json::to_string(&result).unwrap_or_else(|_| "null".into()))),
1485 Err(e) => err(Value::Str(e)),
1486 }
1487 }
1488}
1489
1490fn dispatch_llm_local(args: Vec<Value>) -> Value {
1495 let prompt = match args.first() {
1496 Some(Value::Str(s)) => s.clone(),
1497 _ => return err(Value::Str(
1498 "agent.local_complete(prompt): prompt must be Str".into())),
1499 };
1500 match crate::llm::local_complete(&prompt) {
1501 Ok(text) => ok(Value::Str(text)),
1502 Err(e) => err(Value::Str(e)),
1503 }
1504}
1505
1506fn dispatch_llm_cloud(args: Vec<Value>) -> Value {
1513 let prompt = match args.first() {
1514 Some(Value::Str(s)) => s.clone(),
1515 _ => return err(Value::Str(
1516 "agent.cloud_complete(prompt): prompt must be Str".into())),
1517 };
1518 match crate::llm::cloud_complete(&prompt) {
1519 Ok(text) => ok(Value::Str(text)),
1520 Err(e) => err(Value::Str(e)),
1521 }
1522}
1523
1524fn some(v: Value) -> Value {
1525 Value::Variant { name: "Some".into(), args: vec![v] }
1526}
1527fn none() -> Value {
1528 Value::Variant { name: "None".into(), args: vec![] }
1529}
1530
1531fn expect_bytes(v: Option<&Value>) -> Result<&Vec<u8>, String> {
1532 match v {
1533 Some(Value::Bytes(b)) => Ok(b),
1534 Some(other) => Err(format!("expected Bytes arg, got {other:?}")),
1535 None => Err("missing argument".into()),
1536 }
1537}
1538
1539fn expect_kv_handle(v: Option<&Value>) -> Result<u64, String> {
1540 match v {
1541 Some(Value::Int(n)) if *n >= 0 => Ok(*n as u64),
1542 Some(other) => Err(format!("expected Kv handle (Int), got {other:?}")),
1543 None => Err("missing Kv argument".into()),
1544 }
1545}
1546
1547fn expect_sql_handle(v: Option<&Value>) -> Result<u64, String> {
1548 match v {
1549 Some(Value::Int(n)) if *n >= 0 => Ok(*n as u64),
1550 Some(other) => Err(format!("expected Db handle (Int), got {other:?}")),
1551 None => Err("missing Db argument".into()),
1552 }
1553}
1554
1555fn expect_str_list(v: Option<&Value>) -> Result<Vec<String>, String> {
1556 match v {
1557 Some(Value::List(items)) => items.iter().map(|x| match x {
1558 Value::Str(s) => Ok(s.clone()),
1559 other => Err(format!("expected List[Str] element, got {other:?}")),
1560 }).collect(),
1561 Some(other) => Err(format!("expected List[Str], got {other:?}")),
1562 None => Err("missing List[Str] argument".into()),
1563 }
1564}
1565
1566fn sql_run_query(
1572 conn: &rusqlite::Connection,
1573 stmt_str: &str,
1574 params: &[String],
1575) -> Value {
1576 let mut stmt = match conn.prepare(stmt_str) {
1577 Ok(s) => s,
1578 Err(e) => return err(Value::Str(format!("sql.query: {e}"))),
1579 };
1580 let column_count = stmt.column_count();
1581 let column_names: Vec<String> = (0..column_count)
1582 .map(|i| stmt.column_name(i).unwrap_or("").to_string())
1583 .collect();
1584 let bind: Vec<&dyn rusqlite::ToSql> = params.iter()
1585 .map(|s| s as &dyn rusqlite::ToSql)
1586 .collect();
1587 let mut rows = match stmt.query(rusqlite::params_from_iter(bind.iter())) {
1588 Ok(r) => r,
1589 Err(e) => return err(Value::Str(format!("sql.query: {e}"))),
1590 };
1591 let mut out: Vec<Value> = Vec::new();
1592 loop {
1593 let row = match rows.next() {
1594 Ok(Some(r)) => r,
1595 Ok(None) => break,
1596 Err(e) => return err(Value::Str(format!("sql.query: {e}"))),
1597 };
1598 let mut rec = indexmap::IndexMap::new();
1599 for (i, name) in column_names.iter().enumerate() {
1600 let cell = match row.get_ref(i) {
1601 Ok(c) => sql_value_ref_to_lex(c),
1602 Err(e) => return err(Value::Str(format!("sql.query: column {i}: {e}"))),
1603 };
1604 rec.insert(name.clone(), cell);
1605 }
1606 out.push(Value::Record(rec));
1607 }
1608 ok(Value::List(out))
1609}
1610
1611fn sql_value_ref_to_lex(v: rusqlite::types::ValueRef<'_>) -> Value {
1612 use rusqlite::types::ValueRef;
1613 match v {
1614 ValueRef::Null => Value::Unit,
1615 ValueRef::Integer(n) => Value::Int(n),
1616 ValueRef::Real(f) => Value::Float(f),
1617 ValueRef::Text(s) => Value::Str(String::from_utf8_lossy(s).into_owned()),
1618 ValueRef::Blob(b) => Value::Bytes(b.to_vec()),
1619 }
1620}
1621
/// Log severity.
///
/// The derived ordering follows declaration order
/// (`Debug < Info < Warn < Error`); do not reorder the variants.
#[derive(Clone, Copy, PartialEq, PartialOrd)]
enum LogLevel {
    Debug,
    Info,
    Warn,
    Error,
}
1626
/// Output encoding of a log line: plain text or a single-line JSON object.
#[derive(Clone, Copy, PartialEq)]
enum LogFormat {
    Text,
    Json,
}
1629
/// Destination for log lines.
///
/// Cloning the `File` variant clones the `Arc`, so all clones share the
/// same mutex-guarded file handle.
#[derive(Clone)]
enum LogSink {
    /// Write to the process's standard error stream.
    Stderr,
    /// Write to a shared, mutex-guarded file.
    File(std::sync::Arc<Mutex<std::fs::File>>),
}
1635
1636struct LogState {
1637 level: LogLevel,
1638 format: LogFormat,
1639 sink: LogSink,
1640}
1641
1642fn log_state() -> &'static Mutex<LogState> {
1643 static STATE: OnceLock<Mutex<LogState>> = OnceLock::new();
1644 STATE.get_or_init(|| Mutex::new(LogState {
1645 level: LogLevel::Info,
1646 format: LogFormat::Text,
1647 sink: LogSink::Stderr,
1648 }))
1649}
1650
1651fn parse_log_level(s: &str) -> Option<LogLevel> {
1652 match s {
1653 "debug" => Some(LogLevel::Debug),
1654 "info" => Some(LogLevel::Info),
1655 "warn" => Some(LogLevel::Warn),
1656 "error" => Some(LogLevel::Error),
1657 _ => None,
1658 }
1659}
1660
1661fn level_label(l: LogLevel) -> &'static str {
1662 match l {
1663 LogLevel::Debug => "debug",
1664 LogLevel::Info => "info",
1665 LogLevel::Warn => "warn",
1666 LogLevel::Error => "error",
1667 }
1668}
1669
1670fn emit_log(level: LogLevel, msg: &str) {
1671 let state = log_state().lock().unwrap();
1672 if level < state.level {
1673 return;
1674 }
1675 let ts = chrono::Utc::now().to_rfc3339();
1676 let line = match state.format {
1677 LogFormat::Text => format!("[{}] {}: {}\n", ts, level_label(level), msg),
1678 LogFormat::Json => {
1679 let escaped = msg
1683 .replace('\\', "\\\\")
1684 .replace('"', "\\\"")
1685 .replace('\n', "\\n")
1686 .replace('\r', "\\r");
1687 format!(
1688 "{{\"ts\":\"{ts}\",\"level\":\"{}\",\"msg\":\"{escaped}\"}}\n",
1689 level_label(level),
1690 )
1691 }
1692 };
1693 let sink = state.sink.clone();
1694 drop(state);
1695 match sink {
1696 LogSink::Stderr => {
1697 use std::io::Write;
1698 let _ = std::io::stderr().write_all(line.as_bytes());
1699 }
1700 LogSink::File(f) => {
1701 use std::io::Write;
1702 if let Ok(mut g) = f.lock() {
1703 let _ = g.write_all(line.as_bytes());
1704 }
1705 }
1706 }
1707}
1708
/// A spawned child process together with optional buffered readers over its
/// standard output and standard error pipes.
pub(crate) struct ProcessState {
    /// Handle to the child process.
    child: std::process::Child,
    /// Buffered reader over the child's stdout, when one is held.
    stdout: Option<std::io::BufReader<std::process::ChildStdout>>,
    /// Buffered reader over the child's stderr, when one is held.
    stderr: Option<std::io::BufReader<std::process::ChildStderr>>,
}
1714
1715fn process_registry() -> &'static Mutex<ProcessRegistry> {
1729 static REGISTRY: OnceLock<Mutex<ProcessRegistry>> = OnceLock::new();
1730 REGISTRY.get_or_init(|| Mutex::new(ProcessRegistry::with_capacity(MAX_PROCESS_HANDLES)))
1731}
1732
1733const MAX_PROCESS_HANDLES: usize = 256;
1734
1735type SharedProcessState = Arc<Mutex<ProcessState>>;
1736
1737pub(crate) struct ProcessRegistry {
1738 entries: indexmap::IndexMap<u64, SharedProcessState>,
1739 cap: usize,
1740}
1741
1742impl ProcessRegistry {
1743 pub(crate) fn with_capacity(cap: usize) -> Self {
1744 Self { entries: indexmap::IndexMap::new(), cap }
1745 }
1746
1747 pub(crate) fn insert(&mut self, handle: u64, state: ProcessState) {
1751 if self.entries.len() >= self.cap {
1752 self.entries.shift_remove_index(0);
1753 }
1754 self.entries.insert(handle, Arc::new(Mutex::new(state)));
1755 }
1756
1757 pub(crate) fn touch_get(&mut self, handle: u64) -> Option<SharedProcessState> {
1761 let idx = self.entries.get_index_of(&handle)?;
1762 self.entries.move_index(idx, self.entries.len() - 1);
1763 self.entries.get(&handle).cloned()
1764 }
1765
1766 pub(crate) fn remove(&mut self, handle: u64) {
1771 self.entries.shift_remove(&handle);
1772 }
1773
1774 #[cfg(test)]
1775 pub(crate) fn len(&self) -> usize { self.entries.len() }
1776}
1777
/// Hands out process handles from a process-wide counter that starts at 1
/// and increases by one per call.
fn next_process_handle() -> u64 {
    static NEXT_HANDLE: AtomicU64 = AtomicU64::new(1);
    let issued = NEXT_HANDLE.fetch_add(1, Ordering::SeqCst);
    issued
}
1782
/// Unit tests for `ProcessRegistry`; Unix-only because they spawn `true`.
#[cfg(all(test, unix))]
mod process_registry_tests {
    use super::{ProcessRegistry, ProcessState};
    use std::process::{Command, Stdio};

    /// Spawns `true` with stdout and stderr discarded and wraps the child
    /// in a `ProcessState` that holds no readers.
    fn fresh_state() -> ProcessState {
        let child = Command::new("true")
            .stdout(Stdio::null())
            .stderr(Stdio::null())
            .spawn()
            .expect("spawn `true`");
        ProcessState { child, stdout: None, stderr: None }
    }

    /// A stored handle is found; an unknown one is not.
    #[test]
    fn insert_and_get_round_trip() {
        let mut registry = ProcessRegistry::with_capacity(4);
        registry.insert(1, fresh_state());
        assert!(registry.touch_get(1).is_some());
        assert!(registry.touch_get(2).is_none());
    }

    /// Different handles resolve to different shared states.
    #[test]
    fn touch_get_returns_distinct_arcs_for_distinct_handles() {
        let mut registry = ProcessRegistry::with_capacity(4);
        registry.insert(1, fresh_state());
        registry.insert(2, fresh_state());
        let first = registry.touch_get(1).unwrap();
        let second = registry.touch_get(2).unwrap();
        assert!(!std::sync::Arc::ptr_eq(&first, &second));
    }

    /// Touching an entry protects it from the next eviction.
    #[test]
    fn cap_evicts_lru_on_overflow() {
        let mut registry = ProcessRegistry::with_capacity(2);
        registry.insert(1, fresh_state());
        registry.insert(2, fresh_state());
        let _ = registry.touch_get(1);
        registry.insert(3, fresh_state());
        assert!(registry.touch_get(1).is_some(), "1 was MRU, should survive");
        assert!(registry.touch_get(2).is_none(), "2 was LRU, should be evicted");
        assert!(registry.touch_get(3).is_some(), "3 just inserted, should survive");
        assert_eq!(registry.len(), 2);
    }

    /// Without touches, the oldest insertion is evicted first.
    #[test]
    fn cap_with_no_touches_evicts_in_insertion_order() {
        let mut registry = ProcessRegistry::with_capacity(2);
        for handle in [10, 20, 30] {
            registry.insert(handle, fresh_state());
        }
        assert!(registry.touch_get(10).is_none());
        assert!(registry.touch_get(20).is_some());
        assert!(registry.touch_get(30).is_some());
    }

    /// `remove` deletes the entry and shrinks the registry.
    #[test]
    fn remove_drops_entry() {
        let mut registry = ProcessRegistry::with_capacity(4);
        registry.insert(1, fresh_state());
        registry.remove(1);
        assert!(registry.touch_get(1).is_none());
        assert_eq!(registry.len(), 0);
    }

    /// The registry never grows past its capacity.
    #[test]
    fn many_inserts_stay_bounded_at_cap() {
        let cap = 8;
        let mut registry = ProcessRegistry::with_capacity(cap);
        for handle in 0..(cap as u64 * 3) {
            registry.insert(handle, fresh_state());
            assert!(registry.len() <= cap);
        }
        assert_eq!(registry.len(), cap);
    }

    /// A shared state obtained before `remove` can still be locked after.
    #[test]
    fn outstanding_arc_outlives_remove() {
        let mut registry = ProcessRegistry::with_capacity(4);
        registry.insert(1, fresh_state());
        let shared = registry.touch_get(1).expect("entry exists");
        registry.remove(1);
        assert!(registry.touch_get(1).is_none());
        let _state = shared.lock().unwrap();
    }
}
1876
1877fn expect_process_handle(v: Option<&Value>) -> Result<u64, String> {
1878 match v {
1879 Some(Value::Int(n)) if *n >= 0 => Ok(*n as u64),
1880 Some(other) => Err(format!("expected ProcessHandle (Int), got {other:?}")),
1881 None => Err("missing ProcessHandle argument".into()),
1882 }
1883}
1884
1885fn kv_registry() -> &'static Mutex<KvRegistry> {
1897 static REGISTRY: OnceLock<Mutex<KvRegistry>> = OnceLock::new();
1898 REGISTRY.get_or_init(|| Mutex::new(KvRegistry::with_capacity(MAX_KV_HANDLES)))
1899}
1900
1901const MAX_KV_HANDLES: usize = 256;
1907
1908pub(crate) struct KvRegistry {
1913 entries: indexmap::IndexMap<u64, sled::Db>,
1914 cap: usize,
1915}
1916
1917impl KvRegistry {
1918 pub(crate) fn with_capacity(cap: usize) -> Self {
1919 Self { entries: indexmap::IndexMap::new(), cap }
1920 }
1921
1922 pub(crate) fn insert(&mut self, handle: u64, db: sled::Db) {
1925 if self.entries.len() >= self.cap {
1926 self.entries.shift_remove_index(0);
1927 }
1928 self.entries.insert(handle, db);
1929 }
1930
1931 pub(crate) fn touch_get(&mut self, handle: u64) -> Option<&sled::Db> {
1933 let idx = self.entries.get_index_of(&handle)?;
1934 self.entries.move_index(idx, self.entries.len() - 1);
1935 self.entries.get(&handle)
1936 }
1937
1938 pub(crate) fn remove(&mut self, handle: u64) {
1940 self.entries.shift_remove(&handle);
1941 }
1942
1943 #[cfg(test)]
1944 pub(crate) fn len(&self) -> usize { self.entries.len() }
1945}
1946
/// Hands out Kv handles from a process-wide counter that starts at 1 and
/// increases by one per call.
fn next_kv_handle() -> u64 {
    static NEXT_HANDLE: AtomicU64 = AtomicU64::new(1);
    let issued = NEXT_HANDLE.fetch_add(1, Ordering::SeqCst);
    issued
}
1951
1952fn sql_registry() -> &'static Mutex<SqlRegistry> {
1959 static REGISTRY: OnceLock<Mutex<SqlRegistry>> = OnceLock::new();
1960 REGISTRY.get_or_init(|| Mutex::new(SqlRegistry::with_capacity(MAX_SQL_HANDLES)))
1961}
1962
1963const MAX_SQL_HANDLES: usize = 256;
1964
1965type SharedConn = Arc<Mutex<rusqlite::Connection>>;
1966
1967pub(crate) struct SqlRegistry {
1968 entries: indexmap::IndexMap<u64, SharedConn>,
1969 cap: usize,
1970}
1971
1972impl SqlRegistry {
1973 pub(crate) fn with_capacity(cap: usize) -> Self {
1974 Self { entries: indexmap::IndexMap::new(), cap }
1975 }
1976
1977 pub(crate) fn insert(&mut self, handle: u64, conn: rusqlite::Connection) {
1978 if self.entries.len() >= self.cap {
1979 self.entries.shift_remove_index(0);
1980 }
1981 self.entries.insert(handle, Arc::new(Mutex::new(conn)));
1982 }
1983
1984 pub(crate) fn touch_get(&mut self, handle: u64) -> Option<SharedConn> {
1988 let idx = self.entries.get_index_of(&handle)?;
1989 self.entries.move_index(idx, self.entries.len() - 1);
1990 self.entries.get(&handle).cloned()
1991 }
1992
1993 pub(crate) fn remove(&mut self, handle: u64) {
1994 self.entries.shift_remove(&handle);
1995 }
1996
1997 #[cfg(test)]
1998 pub(crate) fn len(&self) -> usize { self.entries.len() }
1999}
2000
/// Hands out Db handles from a process-wide counter that starts at 1 and
/// increases by one per call.
fn next_sql_handle() -> u64 {
    static NEXT_HANDLE: AtomicU64 = AtomicU64::new(1);
    let issued = NEXT_HANDLE.fetch_add(1, Ordering::SeqCst);
    issued
}
2005
/// Unit tests for `SqlRegistry`, backed by in-memory SQLite connections.
#[cfg(test)]
mod sql_registry_tests {
    use super::SqlRegistry;
    use rusqlite::Connection;

    /// Opens a new in-memory SQLite connection.
    fn fresh() -> Connection {
        Connection::open_in_memory().expect("open in-memory sqlite")
    }

    /// A stored handle is found; an unknown one is not.
    #[test]
    fn insert_and_get_round_trip() {
        let mut registry = SqlRegistry::with_capacity(4);
        registry.insert(1, fresh());
        assert!(registry.touch_get(1).is_some());
        assert!(registry.touch_get(2).is_none());
    }

    /// Touching an entry protects it from the next eviction.
    #[test]
    fn cap_evicts_lru_on_overflow() {
        let mut registry = SqlRegistry::with_capacity(2);
        registry.insert(1, fresh());
        registry.insert(2, fresh());
        let _ = registry.touch_get(1);
        registry.insert(3, fresh());
        assert!(registry.touch_get(1).is_some(), "1 was MRU, should survive");
        assert!(registry.touch_get(2).is_none(), "2 was LRU, should be evicted");
        assert!(registry.touch_get(3).is_some(), "3 just inserted");
        assert_eq!(registry.len(), 2);
    }

    /// `remove` deletes the entry and shrinks the registry.
    #[test]
    fn remove_drops_entry() {
        let mut registry = SqlRegistry::with_capacity(4);
        registry.insert(1, fresh());
        registry.remove(1);
        assert!(registry.touch_get(1).is_none());
        assert_eq!(registry.len(), 0);
    }

    /// The registry never grows past its capacity.
    #[test]
    fn many_inserts_stay_bounded_at_cap() {
        let cap = 8;
        let mut registry = SqlRegistry::with_capacity(cap);
        for handle in 0..(cap as u64 * 3) {
            registry.insert(handle, fresh());
            assert!(registry.len() <= cap);
        }
        assert_eq!(registry.len(), cap);
    }
}
2055
/// Unit tests for `KvRegistry`, backed by throwaway `sled` databases in the
/// system temp directory.
#[cfg(test)]
mod kv_registry_tests {
    use super::KvRegistry;

    /// Opens a `sled` database in a temp directory whose name combines the
    /// process id, `tag` and the current time in nanoseconds.
    fn fresh_db(tag: &str) -> sled::Db {
        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        let dir = std::env::temp_dir().join(format!(
            "lex-kv-reg-{}-{}-{}",
            std::process::id(),
            tag,
            nanos
        ));
        sled::open(&dir).expect("sled open")
    }

    /// A stored handle is found; an unknown one is not.
    #[test]
    fn insert_and_get_round_trip() {
        let mut registry = KvRegistry::with_capacity(4);
        registry.insert(1, fresh_db("a"));
        assert!(registry.touch_get(1).is_some());
        assert!(registry.touch_get(2).is_none());
    }

    /// Touching an entry protects it from the next eviction.
    #[test]
    fn cap_evicts_lru_on_overflow() {
        let mut registry = KvRegistry::with_capacity(2);
        registry.insert(1, fresh_db("c1"));
        registry.insert(2, fresh_db("c2"));
        let _ = registry.touch_get(1);
        registry.insert(3, fresh_db("c3"));
        assert!(registry.touch_get(1).is_some(), "1 was MRU, should survive");
        assert!(registry.touch_get(2).is_none(), "2 was LRU, should be evicted");
        assert!(registry.touch_get(3).is_some(), "3 just inserted, should survive");
        assert_eq!(registry.len(), 2);
    }

    /// Without touches, the oldest insertion is evicted first.
    #[test]
    fn cap_with_no_touches_evicts_in_insertion_order() {
        let mut registry = KvRegistry::with_capacity(2);
        registry.insert(10, fresh_db("f1"));
        registry.insert(20, fresh_db("f2"));
        registry.insert(30, fresh_db("f3"));
        assert!(registry.touch_get(10).is_none());
        assert!(registry.touch_get(20).is_some());
        assert!(registry.touch_get(30).is_some());
    }

    /// `remove` deletes the entry and shrinks the registry.
    #[test]
    fn remove_drops_entry() {
        let mut registry = KvRegistry::with_capacity(4);
        registry.insert(1, fresh_db("r1"));
        registry.remove(1);
        assert!(registry.touch_get(1).is_none());
        assert_eq!(registry.len(), 0);
    }

    /// Removing a handle that was never stored leaves other entries alone.
    #[test]
    fn remove_unknown_handle_is_noop() {
        let mut registry = KvRegistry::with_capacity(4);
        registry.insert(1, fresh_db("u1"));
        registry.remove(999);
        assert!(registry.touch_get(1).is_some());
    }

    /// The registry never grows past its capacity.
    #[test]
    fn many_inserts_stay_bounded_at_cap() {
        let cap = 8;
        let mut registry = KvRegistry::with_capacity(cap);
        for handle in 0..(cap as u64 * 3) {
            registry.insert(handle, fresh_db(&format!("b{handle}")));
            assert!(registry.len() <= cap);
        }
        assert_eq!(registry.len(), cap);
    }
}