Skip to main content

lex_runtime/
handler.rs

1//! Native effect handlers, dispatched at runtime through the VM's
2//! `EffectHandler` trait. The handler also re-checks the runtime policy
3//! per spec §7.4 (the static check is necessary but not sufficient: a fn
4//! declared `[fs_read("/data")]` that's allowed at startup still has to
5//! pass the path check at the point of dispatch).
6
7use lex_bytecode::vm::{EffectHandler, Vm};
8use lex_bytecode::{Program, Value};
9use std::cell::RefCell;
10use std::path::PathBuf;
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::sync::{Mutex, OnceLock};
13use std::sync::Arc;
14use std::time::{SystemTime, UNIX_EPOCH};
15
16use crate::builtins::try_pure_builtin;
17use crate::policy::Policy;
18
/// Destination for the text produced by `io.print`.
///
/// Production wiring uses [`StdoutSink`]; tests substitute a
/// [`CapturedSink`] so they can inspect what was printed. The `Send` bound
/// allows a boxed sink to be moved to another thread.
pub trait IoSink: Send {
    /// Emit one line of output. Any line terminator is the sink's job
    /// (`StdoutSink` appends one through `println!`).
    fn print_line(&mut self, line: &str);
}

/// Sink that forwards each line to the process's standard output.
pub struct StdoutSink;

impl IoSink for StdoutSink {
    fn print_line(&mut self, line: &str) {
        println!("{}", line);
    }
}

/// Sink that keeps every printed line in memory, in print order.
#[derive(Default)]
pub struct CapturedSink {
    /// Lines received via `print_line`, oldest first.
    pub lines: Vec<String>,
}

impl IoSink for CapturedSink {
    fn print_line(&mut self, line: &str) {
        self.lines.push(String::from(line));
    }
}
37
38pub struct DefaultHandler {
39    policy: Policy,
40    pub sink: Box<dyn IoSink>,
41    /// Optional read root for `io.read` — when set, `io.read("p")` resolves
42    /// to `read_root.join(p)`. Lets tests run without touching the real fs.
43    pub read_root: Option<PathBuf>,
44    /// Captured budget consumption (post-static-check, observability only).
45    pub budget_used: RefCell<u64>,
46    /// Shared reference to the program, needed by `net.serve` so the
47    /// handler can spin up fresh VMs to dispatch incoming requests.
48    /// `None` if the handler was constructed without a program.
49    pub program: Option<Arc<Program>>,
50    /// Chat registry; populated by `net.serve_ws`'s per-message
51    /// dispatch so `chat.broadcast` / `chat.send` work from inside
52    /// a handler invocation.
53    pub chat_registry: Option<Arc<crate::ws::ChatRegistry>>,
54}
55
56impl DefaultHandler {
57    pub fn new(policy: Policy) -> Self {
58        Self {
59            policy,
60            sink: Box::new(StdoutSink),
61            read_root: None,
62            budget_used: RefCell::new(0),
63            program: None,
64            chat_registry: None,
65        }
66    }
67
68    pub fn with_program(mut self, program: Arc<Program>) -> Self {
69        self.program = Some(program); self
70    }
71
72    pub fn with_chat_registry(mut self, registry: Arc<crate::ws::ChatRegistry>) -> Self {
73        self.chat_registry = Some(registry); self
74    }
75
76    pub fn with_sink(mut self, sink: Box<dyn IoSink>) -> Self {
77        self.sink = sink; self
78    }
79
80    pub fn with_read_root(mut self, root: PathBuf) -> Self {
81        self.read_root = Some(root); self
82    }
83
84    fn ensure_kind_allowed(&self, kind: &str) -> Result<(), String> {
85        if self.policy.allow_effects.contains(kind) {
86            Ok(())
87        } else {
88            Err(format!("effect `{kind}` not in --allow-effects"))
89        }
90    }
91
92    fn resolve_read_path(&self, p: &str) -> PathBuf {
93        match &self.read_root {
94            Some(root) => root.join(p.trim_start_matches('/')),
95            None => PathBuf::from(p),
96        }
97    }
98
99    fn dispatch_log(&mut self, op: &str, args: Vec<Value>) -> Result<Value, String> {
100        match op {
101            "debug" | "info" | "warn" | "error" => {
102                let msg = expect_str(args.first())?;
103                let level = match op {
104                    "debug" => LogLevel::Debug,
105                    "info" => LogLevel::Info,
106                    "warn" => LogLevel::Warn,
107                    _ => LogLevel::Error,
108                };
109                emit_log(level, msg);
110                Ok(Value::Unit)
111            }
112            "set_level" => {
113                let s = expect_str(args.first())?;
114                match parse_log_level(s) {
115                    Some(l) => {
116                        log_state().lock().unwrap().level = l;
117                        Ok(ok(Value::Unit))
118                    }
119                    None => Ok(err(Value::Str(format!(
120                        "log.set_level: unknown level `{s}`; expected debug|info|warn|error")))),
121                }
122            }
123            "set_format" => {
124                let s = expect_str(args.first())?;
125                let fmt = match s {
126                    "text" => LogFormat::Text,
127                    "json" => LogFormat::Json,
128                    other => return Ok(err(Value::Str(format!(
129                        "log.set_format: unknown format `{other}`; expected text|json")))),
130                };
131                log_state().lock().unwrap().format = fmt;
132                Ok(ok(Value::Unit))
133            }
134            "set_sink" => {
135                let path = expect_str(args.first())?;
136                if path == "-" {
137                    log_state().lock().unwrap().sink = LogSink::Stderr;
138                    return Ok(ok(Value::Unit));
139                }
140                if let Err(e) = self.ensure_fs_write_path(path) {
141                    return Ok(err(Value::Str(e)));
142                }
143                match std::fs::OpenOptions::new()
144                    .create(true).append(true).open(path)
145                {
146                    Ok(f) => {
147                        log_state().lock().unwrap().sink = LogSink::File(std::sync::Arc::new(Mutex::new(f)));
148                        Ok(ok(Value::Unit))
149                    }
150                    Err(e) => Ok(err(Value::Str(format!("log.set_sink `{path}`: {e}")))),
151                }
152            }
153            other => Err(format!("unsupported log.{other}")),
154        }
155    }
156
157    fn dispatch_process(&mut self, op: &str, args: Vec<Value>) -> Result<Value, String> {
158        match op {
159            "spawn" => {
160                let cmd = expect_str(args.first())?.to_string();
161                let raw_args = match args.get(1) {
162                    Some(Value::List(items)) => items.clone(),
163                    _ => return Err("process.spawn: args must be List[Str]".into()),
164                };
165                let str_args: Result<Vec<String>, String> = raw_args.iter().map(|v| match v {
166                    Value::Str(s) => Ok(s.clone()),
167                    other => Err(format!("process.spawn: arg must be Str, got {other:?}")),
168                }).collect();
169                let str_args = str_args?;
170                let opts = match args.get(2) {
171                    Some(Value::Record(r)) => r.clone(),
172                    _ => return Err("process.spawn: missing or invalid opts record".into()),
173                };
174
175                // Allow-list check, mirroring the existing proc.spawn.
176                if !self.policy.allow_proc.is_empty() {
177                    let basename = std::path::Path::new(&cmd)
178                        .file_name()
179                        .and_then(|s| s.to_str())
180                        .unwrap_or(&cmd);
181                    if !self.policy.allow_proc.iter().any(|a| a == basename) {
182                        return Ok(err(Value::Str(format!(
183                            "process.spawn: `{cmd}` not in --allow-proc {:?}",
184                            self.policy.allow_proc
185                        ))));
186                    }
187                }
188
189                let mut command = std::process::Command::new(&cmd);
190                command.args(&str_args);
191                command.stdin(std::process::Stdio::piped());
192                command.stdout(std::process::Stdio::piped());
193                command.stderr(std::process::Stdio::piped());
194
195                if let Some(Value::Variant { name, args: vargs }) = opts.get("cwd") {
196                    if name == "Some" {
197                        if let Some(Value::Str(s)) = vargs.first() {
198                            command.current_dir(s);
199                        }
200                    }
201                }
202                if let Some(Value::Map(env)) = opts.get("env") {
203                    for (k, v) in env {
204                        if let (lex_bytecode::MapKey::Str(ks), Value::Str(vs)) = (k, v) {
205                            command.env(ks, vs);
206                        }
207                    }
208                }
209
210                let stdin_payload: Option<Vec<u8>> = match opts.get("stdin") {
211                    Some(Value::Variant { name, args: vargs }) if name == "Some" => {
212                        match vargs.first() {
213                            Some(Value::Bytes(b)) => Some(b.clone()),
214                            _ => None,
215                        }
216                    }
217                    _ => None,
218                };
219
220                let mut child = match command.spawn() {
221                    Ok(c) => c,
222                    Err(e) => return Ok(err(Value::Str(format!("process.spawn `{cmd}`: {e}")))),
223                };
224
225                if let Some(payload) = stdin_payload {
226                    if let Some(mut stdin) = child.stdin.take() {
227                        use std::io::Write;
228                        let _ = stdin.write_all(&payload);
229                        // Drop closes stdin; the child sees EOF.
230                    }
231                }
232
233                let stdout = child.stdout.take().map(std::io::BufReader::new);
234                let stderr = child.stderr.take().map(std::io::BufReader::new);
235                let handle = next_process_handle();
236                process_registry().lock().unwrap().insert(handle, ProcessState {
237                    child,
238                    stdout,
239                    stderr,
240                });
241                Ok(ok(Value::Int(handle as i64)))
242            }
243            "read_stdout_line" => Self::read_line_op(args, true),
244            "read_stderr_line" => Self::read_line_op(args, false),
245            "wait" => {
246                let h = expect_process_handle(args.first())?;
247                // Look up the per-handle Arc, then release the global
248                // lock before the (slow) wait so unrelated handles
249                // can dispatch concurrently.
250                let arc = process_registry().lock().unwrap()
251                    .touch_get(h)
252                    .ok_or_else(|| "process.wait: closed or unknown ProcessHandle".to_string())?;
253                let status = {
254                    let mut state = arc.lock().unwrap();
255                    state.child.wait().map_err(|e| format!("process.wait: {e}"))?
256                };
257                // Wait completion makes the handle terminal; drop it
258                // from the registry so the cap doesn't fill up with
259                // exited children.
260                process_registry().lock().unwrap().remove(h);
261                let mut rec = indexmap::IndexMap::new();
262                rec.insert("code".into(), Value::Int(status.code().unwrap_or(-1) as i64));
263                #[cfg(unix)]
264                {
265                    use std::os::unix::process::ExitStatusExt;
266                    rec.insert("signaled".into(), Value::Bool(status.signal().is_some()));
267                }
268                #[cfg(not(unix))]
269                {
270                    rec.insert("signaled".into(), Value::Bool(false));
271                }
272                Ok(Value::Record(rec))
273            }
274            "kill" => {
275                let h = expect_process_handle(args.first())?;
276                let _signal = expect_str(args.get(1))?;
277                let arc = process_registry().lock().unwrap()
278                    .touch_get(h)
279                    .ok_or_else(|| "process.kill: closed or unknown ProcessHandle".to_string())?;
280                let mut state = arc.lock().unwrap();
281                // Cross-platform: only `kill` (SIGKILL-equivalent on
282                // Windows). Signal-name dispatch is a v1.5 follow-up.
283                match state.child.kill() {
284                    Ok(_) => Ok(ok(Value::Unit)),
285                    Err(e) => Ok(err(Value::Str(format!("process.kill: {e}")))),
286                }
287            }
288            "run" => {
289                let cmd = expect_str(args.first())?.to_string();
290                let raw_args = match args.get(1) {
291                    Some(Value::List(items)) => items.clone(),
292                    _ => return Err("process.run: args must be List[Str]".into()),
293                };
294                let str_args: Result<Vec<String>, String> = raw_args.iter().map(|v| match v {
295                    Value::Str(s) => Ok(s.clone()),
296                    other => Err(format!("process.run: arg must be Str, got {other:?}")),
297                }).collect();
298                let str_args = str_args?;
299                if !self.policy.allow_proc.is_empty() {
300                    let basename = std::path::Path::new(&cmd)
301                        .file_name()
302                        .and_then(|s| s.to_str())
303                        .unwrap_or(&cmd);
304                    if !self.policy.allow_proc.iter().any(|a| a == basename) {
305                        return Ok(err(Value::Str(format!(
306                            "process.run: `{cmd}` not in --allow-proc {:?}",
307                            self.policy.allow_proc
308                        ))));
309                    }
310                }
311                match std::process::Command::new(&cmd).args(&str_args).output() {
312                    Ok(o) => {
313                        let mut rec = indexmap::IndexMap::new();
314                        rec.insert("stdout".into(), Value::Str(
315                            String::from_utf8_lossy(&o.stdout).to_string()));
316                        rec.insert("stderr".into(), Value::Str(
317                            String::from_utf8_lossy(&o.stderr).to_string()));
318                        rec.insert("exit_code".into(), Value::Int(
319                            o.status.code().unwrap_or(-1) as i64));
320                        Ok(ok(Value::Record(rec)))
321                    }
322                    Err(e) => Ok(err(Value::Str(format!("process.run `{cmd}`: {e}")))),
323                }
324            }
325            other => Err(format!("unsupported process.{other}")),
326        }
327    }
328
329    /// Read one line from the child's stdout (`is_stdout = true`) or
330    /// stderr. Returns `None` (Lex `Option`) at EOF; subsequent calls
331    /// keep returning `None`. Holds only the per-handle mutex during
332    /// the (potentially blocking) read, so reads on one handle don't
333    /// block reads/waits on a different handle.
334    fn read_line_op(args: Vec<Value>, is_stdout: bool) -> Result<Value, String> {
335        let h = expect_process_handle(args.first())?;
336        let arc = process_registry().lock().unwrap()
337            .touch_get(h)
338            .ok_or_else(|| format!(
339                "process.read_{}_line: closed or unknown ProcessHandle",
340                if is_stdout { "stdout" } else { "stderr" }))?;
341        let mut state = arc.lock().unwrap();
342        let reader_opt = if is_stdout {
343            state.stdout.as_mut().map(|r| -> &mut dyn std::io::BufRead { r })
344        } else {
345            state.stderr.as_mut().map(|r| -> &mut dyn std::io::BufRead { r })
346        };
347        let reader = match reader_opt {
348            Some(r) => r,
349            None => return Ok(none()),
350        };
351        let mut line = String::new();
352        match reader.read_line(&mut line) {
353            Ok(0) => Ok(none()),
354            Ok(_) => {
355                if line.ends_with('\n') { line.pop(); }
356                if line.ends_with('\r') { line.pop(); }
357                Ok(some(Value::Str(line)))
358            }
359            Err(e) => Err(format!("process.read_*_line: {e}")),
360        }
361    }
362
363    fn dispatch_fs(&mut self, op: &str, args: Vec<Value>) -> Result<Value, String> {
364        match op {
365            "exists" => {
366                let path = expect_str(args.first())?.to_string();
367                if let Err(e) = self.ensure_fs_walk_path(&path) {
368                    return Ok(err(Value::Str(e)));
369                }
370                Ok(Value::Bool(std::path::Path::new(&path).exists()))
371            }
372            "is_file" => {
373                let path = expect_str(args.first())?.to_string();
374                if let Err(e) = self.ensure_fs_walk_path(&path) {
375                    return Ok(err(Value::Str(e)));
376                }
377                Ok(Value::Bool(std::path::Path::new(&path).is_file()))
378            }
379            "is_dir" => {
380                let path = expect_str(args.first())?.to_string();
381                if let Err(e) = self.ensure_fs_walk_path(&path) {
382                    return Ok(err(Value::Str(e)));
383                }
384                Ok(Value::Bool(std::path::Path::new(&path).is_dir()))
385            }
386            "stat" => {
387                let path = expect_str(args.first())?.to_string();
388                if let Err(e) = self.ensure_fs_walk_path(&path) {
389                    return Ok(err(Value::Str(e)));
390                }
391                match std::fs::metadata(&path) {
392                    Ok(md) => {
393                        let mtime = md.modified()
394                            .ok()
395                            .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
396                            .map(|d| d.as_secs() as i64)
397                            .unwrap_or(0);
398                        let mut rec = indexmap::IndexMap::new();
399                        rec.insert("size".into(), Value::Int(md.len() as i64));
400                        rec.insert("mtime".into(), Value::Int(mtime));
401                        rec.insert("is_dir".into(), Value::Bool(md.is_dir()));
402                        rec.insert("is_file".into(), Value::Bool(md.is_file()));
403                        Ok(ok(Value::Record(rec)))
404                    }
405                    Err(e) => Ok(err(Value::Str(format!("fs.stat `{path}`: {e}")))),
406                }
407            }
408            "list_dir" => {
409                let path = expect_str(args.first())?.to_string();
410                if let Err(e) = self.ensure_fs_walk_path(&path) {
411                    return Ok(err(Value::Str(e)));
412                }
413                match std::fs::read_dir(&path) {
414                    Ok(rd) => {
415                        let mut entries: Vec<Value> = Vec::new();
416                        for ent in rd {
417                            match ent {
418                                Ok(e) => {
419                                    let p = e.path();
420                                    entries.push(Value::Str(p.to_string_lossy().into_owned()));
421                                }
422                                Err(e) => return Ok(err(Value::Str(format!("fs.list_dir: {e}")))),
423                            }
424                        }
425                        Ok(ok(Value::List(entries)))
426                    }
427                    Err(e) => Ok(err(Value::Str(format!("fs.list_dir `{path}`: {e}")))),
428                }
429            }
430            "walk" => {
431                let path = expect_str(args.first())?.to_string();
432                if let Err(e) = self.ensure_fs_walk_path(&path) {
433                    return Ok(err(Value::Str(e)));
434                }
435                let mut paths: Vec<Value> = Vec::new();
436                for ent in walkdir::WalkDir::new(&path) {
437                    match ent {
438                        Ok(e) => paths.push(Value::Str(
439                            e.path().to_string_lossy().into_owned())),
440                        Err(e) => return Ok(err(Value::Str(format!("fs.walk: {e}")))),
441                    }
442                }
443                Ok(ok(Value::List(paths)))
444            }
445            "glob" => {
446                let pattern = expect_str(args.first())?.to_string();
447                // Glob patterns can't be path-scoped at parse time
448                // (`**/*.rs` doesn't pin a directory); we filter the
449                // per-result paths after expansion against
450                // `--allow-fs-read`.
451                let entries = match glob::glob(&pattern) {
452                    Ok(e) => e,
453                    Err(e) => return Ok(err(Value::Str(format!("fs.glob: {e}")))),
454                };
455                let mut paths: Vec<Value> = Vec::new();
456                for ent in entries {
457                    match ent {
458                        Ok(p) => {
459                            let s = p.to_string_lossy().into_owned();
460                            if self.policy.allow_fs_read.is_empty()
461                                || self.policy.allow_fs_read.iter().any(|root| p.starts_with(root))
462                            {
463                                paths.push(Value::Str(s));
464                            }
465                        }
466                        Err(e) => return Ok(err(Value::Str(format!("fs.glob: {e}")))),
467                    }
468                }
469                Ok(ok(Value::List(paths)))
470            }
471            "mkdir_p" => {
472                let path = expect_str(args.first())?.to_string();
473                if let Err(e) = self.ensure_fs_write_path(&path) {
474                    return Ok(err(Value::Str(e)));
475                }
476                match std::fs::create_dir_all(&path) {
477                    Ok(_) => Ok(ok(Value::Unit)),
478                    Err(e) => Ok(err(Value::Str(format!("fs.mkdir_p `{path}`: {e}")))),
479                }
480            }
481            "remove" => {
482                let path = expect_str(args.first())?.to_string();
483                if let Err(e) = self.ensure_fs_write_path(&path) {
484                    return Ok(err(Value::Str(e)));
485                }
486                let p = std::path::Path::new(&path);
487                let result = if p.is_dir() {
488                    std::fs::remove_dir_all(p)
489                } else {
490                    std::fs::remove_file(p)
491                };
492                match result {
493                    Ok(_) => Ok(ok(Value::Unit)),
494                    Err(e) => Ok(err(Value::Str(format!("fs.remove `{path}`: {e}")))),
495                }
496            }
497            "copy" => {
498                let src = expect_str(args.first())?.to_string();
499                let dst = expect_str(args.get(1))?.to_string();
500                if let Err(e) = self.ensure_fs_walk_path(&src) {
501                    return Ok(err(Value::Str(e)));
502                }
503                if let Err(e) = self.ensure_fs_write_path(&dst) {
504                    return Ok(err(Value::Str(e)));
505                }
506                match std::fs::copy(&src, &dst) {
507                    Ok(_) => Ok(ok(Value::Unit)),
508                    Err(e) => Ok(err(Value::Str(format!("fs.copy {src} -> {dst}: {e}")))),
509                }
510            }
511            other => Err(format!("unsupported fs.{other}")),
512        }
513    }
514
515    /// Path scope for walk-style operations. `[fs_walk]` reuses the
516    /// `--allow-fs-read` allowlist — listing a directory is an
517    /// information disclosure on the same path tree as reading file
518    /// content, so the same scope applies. Empty allowlist = any path.
519    fn ensure_fs_walk_path(&self, path: &str) -> Result<(), String> {
520        if self.policy.allow_fs_read.is_empty() {
521            return Ok(());
522        }
523        let p = std::path::Path::new(path);
524        if self.policy.allow_fs_read.iter().any(|a| p.starts_with(a)) {
525            Ok(())
526        } else {
527            Err(format!("fs path `{path}` outside --allow-fs-read"))
528        }
529    }
530
531    /// Path scope for mutating operations. `[fs_write]` uses the
532    /// existing `--allow-fs-write` allowlist.
533    fn ensure_fs_write_path(&self, path: &str) -> Result<(), String> {
534        if self.policy.allow_fs_write.is_empty() {
535            return Ok(());
536        }
537        let p = std::path::Path::new(path);
538        if self.policy.allow_fs_write.iter().any(|a| p.starts_with(a)) {
539            Ok(())
540        } else {
541            Err(format!("fs path `{path}` outside --allow-fs-write"))
542        }
543    }
544
545    /// Enforce `--allow-net-host` against an outgoing URL. Empty
546    /// allowlist = any host. Non-empty = the URL's host must match
547    /// (substring; port-agnostic) at least one entry.
548    fn ensure_host_allowed(&self, url: &str) -> Result<(), String> {
549        if self.policy.allow_net_host.is_empty() { return Ok(()); }
550        let host = extract_host(url).unwrap_or("");
551        if self.policy.allow_net_host.iter().any(|h| host == h) {
552            Ok(())
553        } else {
554            Err(format!(
555                "net call to host `{host}` not in --allow-net-host {:?}",
556                self.policy.allow_net_host,
557            ))
558        }
559    }
560}
561
/// Extract the host of an `http://` / `https://` URL for allowlist checks.
///
/// The authority ends at the first `/`, `\`, `?` or `#`. Backslash is
/// included because WHATWG-style URL parsers treat it like `/` for http(s).
/// Any `userinfo@` prefix is dropped (the real host follows the last `@`),
/// and a trailing `:port` is removed. A bracketed IPv6 literal is returned
/// with its brackets (`[::1]`).
///
/// Returns `None` for other schemes, or for an unterminated IPv6 literal.
/// The check is purely lexical; no normalization (case, trailing dot,
/// percent-decoding) is applied.
fn extract_host(url: &str) -> Option<&str> {
    let rest = url
        .strip_prefix("http://")
        .or_else(|| url.strip_prefix("https://"))?;
    let authority_end = rest
        .find(|c: char| matches!(c, '/' | '\\' | '?' | '#'))
        .unwrap_or(rest.len());
    let authority = &rest[..authority_end];
    // `user:pass@host` — without this, `http://allowed:x@evil/` would be
    // reported as `allowed` while the request actually goes to `evil`.
    let host_port = match authority.rfind('@') {
        Some(at) => &authority[at + 1..],
        None => authority,
    };
    // IPv6 literal: keep everything up to and including the closing bracket,
    // so `[::1]` is not split on its inner colons.
    if host_port.starts_with('[') {
        return host_port.find(']').map(|close| &host_port[..=close]);
    }
    Some(match host_port.rsplit_once(':') {
        Some((host, _port)) => host,
        None => host_port,
    })
}
573
574impl EffectHandler for DefaultHandler {
575    fn dispatch(&mut self, kind: &str, op: &str, args: Vec<Value>) -> Result<Value, String> {
576        // Pure stdlib builtins (str, list, json, ...) bypass the policy
577        // gate — they have no observable side effects and aren't tracked
578        // by the type system as effects.
579        if let Some(r) = try_pure_builtin(kind, op, &args) {
580            return r;
581        }
582        // `std.fs` ops use the fine-grained `[fs_walk]` and `[fs_write]`
583        // effect kinds (distinct from the module name `fs`); the
584        // policy check uses the per-op kind, not the module's.
585        if kind == "process" {
586            self.ensure_kind_allowed("proc")?;
587            return self.dispatch_process(op, args);
588        }
589        if kind == "log" {
590            // Emit ops are [log]; config ops are [io] (set_sink also
591            // [fs_write]). The dispatch picks the right kind per op.
592            let effect_kind = match op {
593                "debug" | "info" | "warn" | "error" => "log",
594                "set_level" | "set_format" => "io",
595                "set_sink" => {
596                    self.ensure_kind_allowed("io")?;
597                    self.ensure_kind_allowed("fs_write")?;
598                    return self.dispatch_log(op, args);
599                }
600                other => return Err(format!("unsupported log.{other}")),
601            };
602            self.ensure_kind_allowed(effect_kind)?;
603            return self.dispatch_log(op, args);
604        }
605        if kind == "fs" {
606            let effect_kind = match op {
607                "exists" | "is_file" | "is_dir" | "stat"
608                | "list_dir" | "walk" | "glob" => "fs_walk",
609                "mkdir_p" | "remove" => "fs_write",
610                "copy" => {
611                    self.ensure_kind_allowed("fs_walk")?;
612                    self.ensure_kind_allowed("fs_write")?;
613                    return self.dispatch_fs(op, args);
614                }
615                other => return Err(format!("unsupported fs.{other}")),
616            };
617            self.ensure_kind_allowed(effect_kind)?;
618            return self.dispatch_fs(op, args);
619        }
620        // `crypto.random` is the lone effectful op in `std.crypto`. Its
621        // declared effect kind is `random` (fine-grained on purpose so
622        // `lex audit --effect random` flags every token-generating
623        // call), distinct from the `crypto` module name.
624        // datetime.now is the only effectful op in std.datetime;
625        // declared kind is `time`, matching the existing `time.now`.
626        if kind == "datetime" && op == "now" {
627            self.ensure_kind_allowed("time")?;
628            let now = chrono::Utc::now();
629            let nanos = now.timestamp_nanos_opt().unwrap_or(i64::MAX);
630            return Ok(Value::Int(nanos));
631        }
632        if kind == "crypto" && op == "random" {
633            self.ensure_kind_allowed("random")?;
634            let n = expect_int(args.first())?;
635            if !(0..=1_048_576).contains(&n) {
636                return Err("crypto.random: n must be in 0..=1048576".into());
637            }
638            use rand::{rngs::OsRng, TryRngCore};
639            let mut buf = vec![0u8; n as usize];
640            OsRng.try_fill_bytes(&mut buf)
641                .map_err(|e| format!("crypto.random: OS RNG: {e}"))?;
642            return Ok(Value::Bytes(buf));
643        }
644        // `std.http` wire ops (send/get/post) gate on the `net`
645        // effect kind, not the module name. This matches the
646        // declared signature (`http.get :: Str -> [net] ...`) and
647        // keeps `--allow-effects net` doing the obvious thing for
648        // both `net.*` and `http.*` callers.
649        // `std.agent` (#184): the four runtime effects added for
650        // agent-style programs (`llm_local`, `llm_cloud`, `a2a`,
651        // `mcp`). The handlers are stubs — they enforce the
652        // declared-effect gate, return a sentinel `Ok` so traces
653        // record the call, and defer the real wire formats to
654        // downstream crates (`soft-agent` for `llm_*` and `a2a`)
655        // and #185 (MCP client wrapper).
656        if kind == "agent" {
657            let effect_kind = match op {
658                "local_complete" => "llm_local",
659                "cloud_complete" => "llm_cloud",
660                "send_a2a"       => "a2a",
661                "call_mcp"       => "mcp",
662                other => return Err(format!("unsupported agent.{other}")),
663            };
664            self.ensure_kind_allowed(effect_kind)?;
665            // `call_mcp` is wired to a real stdio MCP client
666            // (#185). The other three effects keep their stub
667            // pending the downstream crates that own their wire
668            // format (`soft-agent` for `llm_*` and `a2a`).
669            if op == "call_mcp" {
670                return Ok(dispatch_call_mcp(args));
671            }
672            return Ok(ok(Value::Str(format!("<{effect_kind} stub>"))));
673        }
674        if kind == "http" && matches!(op, "send" | "get" | "post") {
675            self.ensure_kind_allowed("net")?;
676            return match op {
677                "send" => {
678                    let req = expect_record(args.first())?;
679                    Ok(http_send_record(self, req))
680                }
681                "get" => {
682                    let url = expect_str(args.first())?.to_string();
683                    self.ensure_host_allowed(&url)?;
684                    Ok(http_send_simple("GET", &url, None, "", None))
685                }
686                "post" => {
687                    let url = expect_str(args.first())?.to_string();
688                    let body = expect_bytes(args.get(1))?.clone();
689                    let content_type = expect_str(args.get(2))?.to_string();
690                    self.ensure_host_allowed(&url)?;
691                    Ok(http_send_simple("POST", &url, Some(body), &content_type, None))
692                }
693                _ => unreachable!(),
694            };
695        }
696        self.ensure_kind_allowed(kind)?;
697        match (kind, op) {
698            ("io", "print") => {
699                let line = expect_str(args.first())?;
700                self.sink.print_line(line);
701                Ok(Value::Unit)
702            }
703            ("io", "read") => {
704                let path = expect_str(args.first())?.to_string();
705                let resolved = self.resolve_read_path(&path);
706                // Honor read-allowlist if any. Symmetric with io.write.
707                // The path argument is checked as-given (resolved-against-
708                // read_root for tests); a tool granted [io] cannot escape
709                // the configured prefix even though the effect itself is
710                // permitted. This is the per-path scope the bench's case
711                // #6 ("[io] granted, body reads /etc/passwd") needed.
712                if !self.policy.allow_fs_read.is_empty()
713                    && !self.policy.allow_fs_read.iter().any(|a| resolved.starts_with(a))
714                {
715                    return Err(format!("read of `{path}` outside --allow-fs-read"));
716                }
717                match std::fs::read_to_string(&resolved) {
718                    Ok(s) => Ok(ok(Value::Str(s))),
719                    Err(e) => Ok(err(Value::Str(format!("{e}")))),
720                }
721            }
722            ("io", "write") => {
723                let path = expect_str(args.first())?.to_string();
724                let contents = expect_str(args.get(1))?.to_string();
725                // Honor write-allowlist if any.
726                if !self.policy.allow_fs_write.is_empty() {
727                    let p = std::path::Path::new(&path);
728                    if !self.policy.allow_fs_write.iter().any(|a| p.starts_with(a)) {
729                        return Err(format!("write to `{path}` outside --allow-fs-write"));
730                    }
731                }
732                match std::fs::write(&path, contents) {
733                    Ok(_) => Ok(ok(Value::Unit)),
734                    Err(e) => Ok(err(Value::Str(format!("{e}")))),
735                }
736            }
737            ("time", "now") => {
738                let secs = SystemTime::now().duration_since(UNIX_EPOCH)
739                    .map_err(|e| format!("time: {e}"))?.as_secs();
740                Ok(Value::Int(secs as i64))
741            }
742            ("rand", "int_in") => {
743                // Deterministic stub: midpoint of [lo, hi].
744                let lo = expect_int(args.first())?;
745                let hi = expect_int(args.get(1))?;
746                Ok(Value::Int((lo + hi) / 2))
747            }
748            ("budget", _) => {
749                // Budget calls are nominally tracked here; budget itself is
750                // enforced statically in `policy::check_program`.
751                Ok(Value::Unit)
752            }
753            ("net", "get") => {
754                let url = expect_str(args.first())?.to_string();
755                self.ensure_host_allowed(&url)?;
756                Ok(http_request("GET", &url, None))
757            }
758            ("net", "post") => {
759                let url = expect_str(args.first())?.to_string();
760                let body = expect_str(args.get(1))?.to_string();
761                self.ensure_host_allowed(&url)?;
762                Ok(http_request("POST", &url, Some(&body)))
763            }
764            ("net", "serve") => {
765                let port = match args.first() {
766                    Some(Value::Int(n)) if (0..=65535).contains(n) => *n as u16,
767                    _ => return Err("net.serve(port, handler): port must be Int 0..=65535".into()),
768                };
769                let handler_name = expect_str(args.get(1))?.to_string();
770                let program = self.program.clone()
771                    .ok_or_else(|| "net.serve requires a Program reference; use DefaultHandler::with_program".to_string())?;
772                let policy = self.policy.clone();
773                serve_http(port, handler_name, program, policy, None)
774            }
775            ("net", "serve_tls") => {
776                let port = match args.first() {
777                    Some(Value::Int(n)) if (0..=65535).contains(n) => *n as u16,
778                    _ => return Err("net.serve_tls(port, cert, key, handler): port must be Int 0..=65535".into()),
779                };
780                let cert_path = expect_str(args.get(1))?.to_string();
781                let key_path = expect_str(args.get(2))?.to_string();
782                let handler_name = expect_str(args.get(3))?.to_string();
783                let program = self.program.clone()
784                    .ok_or_else(|| "net.serve_tls requires a Program reference".to_string())?;
785                let policy = self.policy.clone();
786                let cert = std::fs::read(&cert_path)
787                    .map_err(|e| format!("net.serve_tls: read cert {cert_path}: {e}"))?;
788                let key = std::fs::read(&key_path)
789                    .map_err(|e| format!("net.serve_tls: read key {key_path}: {e}"))?;
790                serve_http(port, handler_name, program, policy, Some(TlsConfig { cert, key }))
791            }
792            ("net", "serve_ws") => {
793                let port = match args.first() {
794                    Some(Value::Int(n)) if (0..=65535).contains(n) => *n as u16,
795                    _ => return Err("net.serve_ws(port, on_message): port must be Int 0..=65535".into()),
796                };
797                let handler_name = expect_str(args.get(1))?.to_string();
798                let program = self.program.clone()
799                    .ok_or_else(|| "net.serve_ws requires a Program reference".to_string())?;
800                let policy = self.policy.clone();
801                let registry = Arc::new(crate::ws::ChatRegistry::default());
802                crate::ws::serve_ws(port, handler_name, program, policy, registry)
803            }
804            ("chat", "broadcast") => {
805                let registry = self.chat_registry.as_ref()
806                    .ok_or_else(|| "chat.broadcast called outside a net.serve_ws handler".to_string())?;
807                let room = expect_str(args.first())?;
808                let body = expect_str(args.get(1))?;
809                crate::ws::chat_broadcast(registry, room, body);
810                Ok(Value::Unit)
811            }
812            ("chat", "send") => {
813                let registry = self.chat_registry.as_ref()
814                    .ok_or_else(|| "chat.send called outside a net.serve_ws handler".to_string())?;
815                let conn_id = match args.first() {
816                    Some(Value::Int(n)) if *n >= 0 => *n as u64,
817                    _ => return Err("chat.send: conn_id must be non-negative Int".into()),
818                };
819                let body = expect_str(args.get(1))?;
820                Ok(Value::Bool(crate::ws::chat_send(registry, conn_id, body)))
821            }
822            ("kv", "open") => {
823                let path = expect_str(args.first())?.to_string();
824                // Honor write-allowlist: opening a Kv writes its
825                // backing files at `path`, so the same scoping that
826                // applies to `io.write` applies here.
827                if !self.policy.allow_fs_write.is_empty() {
828                    let p = std::path::Path::new(&path);
829                    if !self.policy.allow_fs_write.iter().any(|a| p.starts_with(a)) {
830                        return Ok(err(Value::Str(format!(
831                            "kv.open: `{path}` outside --allow-fs-write"))));
832                    }
833                }
834                match sled::open(&path) {
835                    Ok(db) => {
836                        let handle = next_kv_handle();
837                        kv_registry().lock().unwrap().insert(handle, db);
838                        Ok(ok(Value::Int(handle as i64)))
839                    }
840                    Err(e) => Ok(err(Value::Str(format!("kv.open: {e}")))),
841                }
842            }
843            ("kv", "close") => {
844                let h = expect_kv_handle(args.first())?;
845                kv_registry().lock().unwrap().remove(h);
846                Ok(Value::Unit)
847            }
848            ("kv", "get") => {
849                let h = expect_kv_handle(args.first())?;
850                let key = expect_str(args.get(1))?;
851                let mut reg = kv_registry().lock().unwrap();
852                let db = reg.touch_get(h).ok_or_else(|| "kv.get: closed or unknown Kv handle".to_string())?;
853                match db.get(key.as_bytes()) {
854                    Ok(Some(ivec)) => Ok(some(Value::Bytes(ivec.to_vec()))),
855                    Ok(None) => Ok(none()),
856                    Err(e) => Err(format!("kv.get: {e}")),
857                }
858            }
859            ("kv", "put") => {
860                let h = expect_kv_handle(args.first())?;
861                let key = expect_str(args.get(1))?.to_string();
862                let val = expect_bytes(args.get(2))?.clone();
863                let mut reg = kv_registry().lock().unwrap();
864                let db = reg.touch_get(h).ok_or_else(|| "kv.put: closed or unknown Kv handle".to_string())?;
865                match db.insert(key.as_bytes(), val) {
866                    Ok(_) => Ok(ok(Value::Unit)),
867                    Err(e) => Ok(err(Value::Str(format!("kv.put: {e}")))),
868                }
869            }
870            ("kv", "delete") => {
871                let h = expect_kv_handle(args.first())?;
872                let key = expect_str(args.get(1))?;
873                let mut reg = kv_registry().lock().unwrap();
874                let db = reg.touch_get(h).ok_or_else(|| "kv.delete: closed or unknown Kv handle".to_string())?;
875                match db.remove(key.as_bytes()) {
876                    Ok(_) => Ok(ok(Value::Unit)),
877                    Err(e) => Ok(err(Value::Str(format!("kv.delete: {e}")))),
878                }
879            }
880            ("kv", "contains") => {
881                let h = expect_kv_handle(args.first())?;
882                let key = expect_str(args.get(1))?;
883                let mut reg = kv_registry().lock().unwrap();
884                let db = reg.touch_get(h).ok_or_else(|| "kv.contains: closed or unknown Kv handle".to_string())?;
885                match db.contains_key(key.as_bytes()) {
886                    Ok(present) => Ok(Value::Bool(present)),
887                    Err(e) => Err(format!("kv.contains: {e}")),
888                }
889            }
890            ("kv", "list_prefix") => {
891                let h = expect_kv_handle(args.first())?;
892                let prefix = expect_str(args.get(1))?;
893                let mut reg = kv_registry().lock().unwrap();
894                let db = reg.touch_get(h).ok_or_else(|| "kv.list_prefix: closed or unknown Kv handle".to_string())?;
895                let mut keys: Vec<Value> = Vec::new();
896                for kv in db.scan_prefix(prefix.as_bytes()) {
897                    let (k, _) = kv.map_err(|e| format!("kv.list_prefix: {e}"))?;
898                    let s = String::from_utf8_lossy(&k).to_string();
899                    keys.push(Value::Str(s));
900                }
901                Ok(Value::List(keys))
902            }
903            ("sql", "open") => {
904                let path = expect_str(args.first())?.to_string();
905                // Same shape as `kv.open`: opening creates the
906                // SQLite file, so the fs-write allowlist applies
907                // (in-memory paths are exempt).
908                if path != ":memory:" && !self.policy.allow_fs_write.is_empty() {
909                    let p = std::path::Path::new(&path);
910                    if !self.policy.allow_fs_write.iter().any(|a| p.starts_with(a)) {
911                        return Ok(err(Value::Str(format!(
912                            "sql.open: `{path}` outside --allow-fs-write"))));
913                    }
914                }
915                match rusqlite::Connection::open(&path) {
916                    Ok(conn) => {
917                        let handle = next_sql_handle();
918                        sql_registry().lock().unwrap().insert(handle, conn);
919                        Ok(ok(Value::Int(handle as i64)))
920                    }
921                    Err(e) => Ok(err(Value::Str(format!("sql.open: {e}")))),
922                }
923            }
924            ("sql", "close") => {
925                let h = expect_sql_handle(args.first())?;
926                sql_registry().lock().unwrap().remove(h);
927                Ok(Value::Unit)
928            }
929            ("sql", "exec") => {
930                let h = expect_sql_handle(args.first())?;
931                let stmt = expect_str(args.get(1))?.to_string();
932                let params = expect_str_list(args.get(2))?;
933                let arc = sql_registry().lock().unwrap()
934                    .touch_get(h)
935                    .ok_or_else(|| "sql.exec: closed or unknown Db handle".to_string())?;
936                let conn = arc.lock().unwrap();
937                let bind: Vec<&dyn rusqlite::ToSql> = params.iter()
938                    .map(|s| s as &dyn rusqlite::ToSql)
939                    .collect();
940                match conn.execute(&stmt, rusqlite::params_from_iter(bind.iter())) {
941                    Ok(n)  => Ok(ok(Value::Int(n as i64))),
942                    Err(e) => Ok(err(Value::Str(format!("sql.exec: {e}")))),
943                }
944            }
945            ("sql", "query") => {
946                let h = expect_sql_handle(args.first())?;
947                let stmt_str = expect_str(args.get(1))?.to_string();
948                let params = expect_str_list(args.get(2))?;
949                let arc = sql_registry().lock().unwrap()
950                    .touch_get(h)
951                    .ok_or_else(|| "sql.query: closed or unknown Db handle".to_string())?;
952                let conn = arc.lock().unwrap();
953                Ok(sql_run_query(&conn, &stmt_str, &params))
954            }
955            ("proc", "spawn") => {
956                // The escape hatch effect. Spawns a child process,
957                // collects its stdout/stderr, returns a structured
958                // record. Allow-list is the binary basename: anything
959                // outside `--allow-proc` is rejected pre-spawn.
960                //
961                // What this does NOT validate (per SECURITY.md):
962                // - per-arg content (a script-like CLI invoked via
963                //   --eval=... can run anything)
964                // - environment variables (inherited from the parent)
965                // - working directory (the parent's)
966                //
967                // For untrusted input, layer with OS-level
968                // sandboxing — gVisor / nsjail / a container.
969                let cmd = expect_str(args.first())?.to_string();
970                let raw_args = match args.get(1) {
971                    Some(Value::List(items)) => items,
972                    Some(other) => return Err(format!(
973                        "proc.spawn: args must be List[Str], got {other:?}")),
974                    None => return Err("proc.spawn: missing args list".into()),
975                };
976                let str_args: Vec<String> = raw_args.iter().map(|v| match v {
977                    Value::Str(s) => Ok(s.clone()),
978                    other => Err(format!("proc.spawn: arg must be Str, got {other:?}")),
979                }).collect::<Result<Vec<_>, _>>()?;
980
981                // Allow-list check: empty list = any binary (escape
982                // hatch); non-empty = basename of cmd must match an
983                // entry exactly.
984                if !self.policy.allow_proc.is_empty() {
985                    let basename = std::path::Path::new(&cmd)
986                        .file_name()
987                        .and_then(|s| s.to_str())
988                        .unwrap_or(&cmd);
989                    if !self.policy.allow_proc.iter().any(|a| a == basename) {
990                        return Ok(err(Value::Str(format!(
991                            "proc.spawn: `{cmd}` not in --allow-proc {:?}",
992                            self.policy.allow_proc
993                        ))));
994                    }
995                }
996
997                // Hard caps: the spec doesn't pin numbers, but
998                // unbounded argv is a DoS vector.
999                if str_args.len() > 1024 {
1000                    return Ok(err(Value::Str(
1001                        "proc.spawn: arg-count exceeds 1024".into())));
1002                }
1003                if str_args.iter().any(|a| a.len() > 65_536) {
1004                    return Ok(err(Value::Str(
1005                        "proc.spawn: per-arg length exceeds 64 KiB".into())));
1006                }
1007
1008                let output = std::process::Command::new(&cmd)
1009                    .args(&str_args)
1010                    .output();
1011                match output {
1012                    Ok(o) => {
1013                        let mut rec = indexmap::IndexMap::new();
1014                        rec.insert("stdout".into(), Value::Str(
1015                            String::from_utf8_lossy(&o.stdout).to_string()));
1016                        rec.insert("stderr".into(), Value::Str(
1017                            String::from_utf8_lossy(&o.stderr).to_string()));
1018                        rec.insert("exit_code".into(), Value::Int(
1019                            o.status.code().unwrap_or(-1) as i64));
1020                        Ok(ok(Value::Record(rec)))
1021                    }
1022                    Err(e) => Ok(err(Value::Str(format!("spawn `{cmd}`: {e}")))),
1023                }
1024            }
1025            other => Err(format!("unsupported effect {}.{}", other.0, other.1)),
1026        }
1027    }
1028}
1029
1030/// Blocks the calling thread, accepts incoming HTTP requests on
1031/// `127.0.0.1:port`, and dispatches each through the named Lex
1032/// stage. Each request gets a fresh `Vm`; the program and policy
1033/// are shared.
1034///
1035/// Handler signature in Lex (by convention):
1036///   fn <name>(req :: Record { method :: Str, path :: Str, body :: Str })
1037///        -> Record { status :: Int, body :: Str }
1038/// PEM-encoded certificate + private key, both as raw bytes.
1039pub struct TlsConfig {
1040    pub cert: Vec<u8>,
1041    pub key: Vec<u8>,
1042}
1043
1044fn serve_http(
1045    port: u16,
1046    handler_name: String,
1047    program: Arc<Program>,
1048    policy: Policy,
1049    tls: Option<TlsConfig>,
1050) -> Result<Value, String> {
1051    let (server, scheme) = match tls {
1052        None => (
1053            tiny_http::Server::http(("127.0.0.1", port))
1054                .map_err(|e| format!("net.serve bind {port}: {e}"))?,
1055            "http",
1056        ),
1057        Some(cfg) => {
1058            let ssl = tiny_http::SslConfig {
1059                certificate: cfg.cert,
1060                private_key: cfg.key,
1061            };
1062            (
1063                tiny_http::Server::https(("127.0.0.1", port), ssl)
1064                    .map_err(|e| format!("net.serve_tls bind {port}: {e}"))?,
1065                "https",
1066            )
1067        }
1068    };
1069    eprintln!("net.serve: listening on {scheme}://127.0.0.1:{port}");
1070    // Thread-per-request: the main loop accepts; each request runs on
1071    // its own worker thread with its own fresh Vm. The Program is
1072    // shared via Arc; Policy and handler_name are cloned per request.
1073    // Lex's immutability means there's no shared mutable state at the
1074    // language level — workers don't race.
1075    for req in server.incoming_requests() {
1076        let program = Arc::clone(&program);
1077        let policy = policy.clone();
1078        let handler_name = handler_name.clone();
1079        std::thread::spawn(move || handle_request(req, program, policy, handler_name));
1080    }
1081    Ok(Value::Unit)
1082}
1083
1084fn handle_request(
1085    mut req: tiny_http::Request,
1086    program: Arc<Program>,
1087    policy: Policy,
1088    handler_name: String,
1089) {
1090    let lex_req = build_request_value(&mut req);
1091    let handler = DefaultHandler::new(policy).with_program(Arc::clone(&program));
1092    let mut vm = Vm::with_handler(&program, Box::new(handler));
1093    match vm.call(&handler_name, vec![lex_req]) {
1094        Ok(resp) => {
1095            let (status, body) = unpack_response(&resp);
1096            let response = tiny_http::Response::from_string(body).with_status_code(status);
1097            let _ = req.respond(response);
1098        }
1099        Err(e) => {
1100            let response = tiny_http::Response::from_string(format!("internal error: {e}"))
1101                .with_status_code(500);
1102            let _ = req.respond(response);
1103        }
1104    }
1105}
1106
1107fn build_request_value(req: &mut tiny_http::Request) -> Value {
1108    let method = format!("{:?}", req.method()).to_uppercase();
1109    let url = req.url().to_string();
1110    let (path, query) = match url.split_once('?') {
1111        Some((p, q)) => (p.to_string(), q.to_string()),
1112        None => (url, String::new()),
1113    };
1114    let mut body = String::new();
1115    let _ = req.as_reader().read_to_string(&mut body);
1116    let mut rec = indexmap::IndexMap::new();
1117    rec.insert("method".into(), Value::Str(method));
1118    rec.insert("path".into(), Value::Str(path));
1119    rec.insert("query".into(), Value::Str(query));
1120    rec.insert("body".into(), Value::Str(body));
1121    Value::Record(rec)
1122}
1123
1124fn unpack_response(v: &Value) -> (u16, String) {
1125    if let Value::Record(rec) = v {
1126        let status = rec.get("status").and_then(|s| match s {
1127            Value::Int(n) => Some(*n as u16),
1128            _ => None,
1129        }).unwrap_or(200);
1130        let body = rec.get("body").and_then(|b| match b {
1131            Value::Str(s) => Some(s.clone()),
1132            _ => None,
1133        }).unwrap_or_default();
1134        return (status, body);
1135    }
1136    (500, format!("handler returned non-record: {v:?}"))
1137}
1138
1139/// HTTP/1.1 client backed by `ureq` + `rustls`. Accepts both
1140/// `http://` and `https://` URLs. Returns `Result[Str, Str]` as a
1141/// Lex `Value::Variant`. The earlier hand-rolled HTTP/1.0 client
1142/// was plain-TCP only — most public APIs are HTTPS, so the demo
1143/// could fetch `example.com` but not `wttr.in` or `api.github.com`.
1144fn http_request(method: &str, url: &str, body: Option<&str>) -> Value {
1145    use std::time::Duration;
1146    // ureq 3 puts 4xx/5xx behind `Error::StatusCode(code)` and consumes
1147    // the response, so the body would be lost. Disabling
1148    // `http_status_as_error` lets us check the status manually and
1149    // surface `Err("status 404: <body>")` like the old code did.
1150    let agent: ureq::Agent = ureq::Agent::config_builder()
1151        .timeout_connect(Some(Duration::from_secs(10)))
1152        .timeout_recv_body(Some(Duration::from_secs(30)))
1153        .timeout_send_body(Some(Duration::from_secs(10)))
1154        .http_status_as_error(false)
1155        .build()
1156        .into();
1157    let resp = match (method, body) {
1158        ("GET", _) => agent.get(url).call(),
1159        ("POST", Some(b)) => agent.post(url).send(b),
1160        ("POST", None) => agent.post(url).send(""),
1161        (m, _) => return err_value(format!("unsupported method: {m}")),
1162    };
1163    match resp {
1164        Ok(mut r) => {
1165            let status = r.status().as_u16();
1166            let body = r.body_mut().read_to_string().unwrap_or_default();
1167            if (200..300).contains(&status) {
1168                Value::Variant { name: "Ok".into(), args: vec![Value::Str(body)] }
1169            } else {
1170                err_value(format!("status {status}: {body}"))
1171            }
1172        }
1173        Err(e) => err_value(format!("transport: {e}")),
1174    }
1175}
1176
1177/// Build a ureq agent for `std.http.{send,get,post}` with the given
1178/// timeout (None → use the same defaults as the legacy `net.{get,post}`
1179/// path). Separate from `http_request` so the rich `http.send` flow
1180/// can supply per-request overrides.
1181fn http_agent(timeout_ms: Option<u64>) -> ureq::Agent {
1182    use std::time::Duration;
1183    let mut b = ureq::Agent::config_builder()
1184        .timeout_connect(Some(Duration::from_secs(10)))
1185        .timeout_recv_body(Some(Duration::from_secs(30)))
1186        .timeout_send_body(Some(Duration::from_secs(10)))
1187        .http_status_as_error(false);
1188    if let Some(ms) = timeout_ms {
1189        let d = Duration::from_millis(ms);
1190        b = b.timeout_global(Some(d));
1191    }
1192    b.build().into()
1193}
1194
1195/// Map ureq's transport error to the structured `HttpError` variant
1196/// std.http exposes to user code. Anything not specifically a
1197/// timeout / TLS error funnels into `NetworkError`.
1198fn http_error_value(e: ureq::Error) -> Value {
1199    let (ctor, payload): (&str, Option<String>) = match &e {
1200        ureq::Error::Timeout(_) => ("TimeoutError", None),
1201        ureq::Error::Tls(s) => ("TlsError", Some((*s).into())),
1202        ureq::Error::Pem(p) => ("TlsError", Some(format!("{p}"))),
1203        ureq::Error::Rustls(r) => ("TlsError", Some(format!("{r}"))),
1204        _ => ("NetworkError", Some(format!("{e}"))),
1205    };
1206    let args = match payload { Some(s) => vec![Value::Str(s)], None => vec![] };
1207    let inner = Value::Variant { name: ctor.into(), args };
1208    Value::Variant { name: "Err".into(), args: vec![inner] }
1209}
1210
1211fn http_decode_err(msg: String) -> Value {
1212    let inner = Value::Variant {
1213        name: "DecodeError".into(),
1214        args: vec![Value::Str(msg)],
1215    };
1216    Value::Variant { name: "Err".into(), args: vec![inner] }
1217}
1218
1219/// Run a request and pack the ureq response into the
1220/// `{ status, headers, body }` Lex record (or the structured
1221/// `HttpError` on failure). `headers_extra` pairs are appended to the
1222/// outgoing request after `content_type` is applied.
1223fn http_send_simple(
1224    method: &str,
1225    url: &str,
1226    body: Option<Vec<u8>>,
1227    content_type: &str,
1228    timeout_ms: Option<u64>,
1229) -> Value {
1230    http_send_full(method, url, body, content_type, &[], timeout_ms)
1231}
1232
1233fn http_send_full(
1234    method: &str,
1235    url: &str,
1236    body: Option<Vec<u8>>,
1237    content_type: &str,
1238    headers: &[(String, String)],
1239    timeout_ms: Option<u64>,
1240) -> Value {
1241    let agent = http_agent(timeout_ms);
1242    let resp = match method {
1243        "GET" => {
1244            let mut req = agent.get(url);
1245            if !content_type.is_empty() { req = req.header("content-type", content_type); }
1246            for (k, v) in headers { req = req.header(k.as_str(), v.as_str()); }
1247            req.call()
1248        }
1249        "POST" => {
1250            let body = body.unwrap_or_default();
1251            let mut req = agent.post(url);
1252            if !content_type.is_empty() { req = req.header("content-type", content_type); }
1253            for (k, v) in headers { req = req.header(k.as_str(), v.as_str()); }
1254            req.send(&body[..])
1255        }
1256        m => {
1257            // Other methods (PUT, DELETE, PATCH, ...) fall through
1258            // here in v1.5; for now surface a structured DecodeError
1259            // so the caller can match it.
1260            return http_decode_err(format!("unsupported method: {m}"));
1261        }
1262    };
1263    match resp {
1264        Ok(mut r) => {
1265            let status = r.status().as_u16() as i64;
1266            let headers_map = collect_response_headers(r.headers());
1267            let body_bytes = match r.body_mut().with_config().limit(10 * 1024 * 1024).read_to_vec() {
1268                Ok(b) => b,
1269                Err(e) => return http_decode_err(format!("body read: {e}")),
1270            };
1271            let mut rec = indexmap::IndexMap::new();
1272            rec.insert("status".into(), Value::Int(status));
1273            rec.insert("headers".into(), Value::Map(headers_map));
1274            rec.insert("body".into(), Value::Bytes(body_bytes));
1275            Value::Variant { name: "Ok".into(), args: vec![Value::Record(rec)] }
1276        }
1277        Err(e) => http_error_value(e),
1278    }
1279}
1280
1281fn collect_response_headers(
1282    headers: &ureq::http::HeaderMap,
1283) -> std::collections::BTreeMap<lex_bytecode::MapKey, Value> {
1284    let mut out = std::collections::BTreeMap::new();
1285    for (name, value) in headers.iter() {
1286        let v = value.to_str().unwrap_or("").to_string();
1287        out.insert(lex_bytecode::MapKey::Str(name.as_str().to_string()), Value::Str(v));
1288    }
1289    out
1290}
1291
1292/// Pull the standard `HttpRequest` shape out of a `Value::Record`
1293/// and dispatch through `http_send_full`. The handler verifies
1294/// `--allow-net-host` for the URL before sending.
1295fn http_send_record(handler: &DefaultHandler, req: &indexmap::IndexMap<String, Value>) -> Value {
1296    let method = match req.get("method") {
1297        Some(Value::Str(s)) => s.clone(),
1298        _ => return http_decode_err("HttpRequest.method must be Str".into()),
1299    };
1300    let url = match req.get("url") {
1301        Some(Value::Str(s)) => s.clone(),
1302        _ => return http_decode_err("HttpRequest.url must be Str".into()),
1303    };
1304    if let Err(e) = handler.ensure_host_allowed(&url) {
1305        return http_decode_err(e);
1306    }
1307    let body = match req.get("body") {
1308        Some(Value::Variant { name, args }) if name == "None" => None,
1309        Some(Value::Variant { name, args }) if name == "Some" => match args.as_slice() {
1310            [Value::Bytes(b)] => Some(b.clone()),
1311            _ => return http_decode_err("HttpRequest.body Some payload must be Bytes".into()),
1312        },
1313        _ => return http_decode_err("HttpRequest.body must be Option[Bytes]".into()),
1314    };
1315    let timeout_ms = match req.get("timeout_ms") {
1316        Some(Value::Variant { name, .. }) if name == "None" => None,
1317        Some(Value::Variant { name, args }) if name == "Some" => match args.as_slice() {
1318            [Value::Int(n)] if *n >= 0 => Some(*n as u64),
1319            _ => return http_decode_err(
1320                "HttpRequest.timeout_ms Some payload must be a non-negative Int".into()),
1321        },
1322        _ => return http_decode_err("HttpRequest.timeout_ms must be Option[Int]".into()),
1323    };
1324    let headers: Vec<(String, String)> = match req.get("headers") {
1325        Some(Value::Map(m)) => m.iter().filter_map(|(k, v)| {
1326            let kk = match k { lex_bytecode::MapKey::Str(s) => s.clone(), _ => return None };
1327            let vv = match v { Value::Str(s) => s.clone(), _ => return None };
1328            Some((kk, vv))
1329        }).collect(),
1330        _ => return http_decode_err("HttpRequest.headers must be Map[Str, Str]".into()),
1331    };
1332    http_send_full(&method, &url, body, "", &headers, timeout_ms)
1333}
1334
1335fn expect_record(v: Option<&Value>) -> Result<&indexmap::IndexMap<String, Value>, String> {
1336    match v {
1337        Some(Value::Record(r)) => Ok(r),
1338        Some(other) => Err(format!("expected Record, got {other:?}")),
1339        None => Err("missing Record argument".into()),
1340    }
1341}
1342
1343fn err_value(msg: String) -> Value {
1344    Value::Variant { name: "Err".into(), args: vec![Value::Str(msg)] }
1345}
1346
1347fn expect_str(v: Option<&Value>) -> Result<&str, String> {
1348    match v {
1349        Some(Value::Str(s)) => Ok(s),
1350        Some(other) => Err(format!("expected Str arg, got {other:?}")),
1351        None => Err("missing argument".into()),
1352    }
1353}
1354
1355fn expect_int(v: Option<&Value>) -> Result<i64, String> {
1356    match v {
1357        Some(Value::Int(n)) => Ok(*n),
1358        Some(other) => Err(format!("expected Int arg, got {other:?}")),
1359        None => Err("missing argument".into()),
1360    }
1361}
1362
1363fn ok(v: Value) -> Value {
1364    Value::Variant { name: "Ok".into(), args: vec![v] }
1365}
1366fn err(v: Value) -> Value {
1367    Value::Variant { name: "Err".into(), args: vec![v] }
1368}
1369
1370/// Implementation of `agent.call_mcp(server, tool, args_json)`.
1371/// Spawn the MCP server, complete `initialize`, send `tools/call`
1372/// with the supplied arguments, and return the server's `result`
1373/// JSON serialized back to a Lex `Str`. Errors at any stage come
1374/// back as `Err(reason)` so callers can `match` on them — no
1375/// `Result<_, String>` from the handler itself.
1376fn dispatch_call_mcp(args: Vec<Value>) -> Value {
1377    let server = match args.first() {
1378        Some(Value::Str(s)) => s.clone(),
1379        _ => return err(Value::Str(
1380            "agent.call_mcp(server, tool, args_json): server must be Str".into())),
1381    };
1382    let tool = match args.get(1) {
1383        Some(Value::Str(s)) => s.clone(),
1384        _ => return err(Value::Str(
1385            "agent.call_mcp(server, tool, args_json): tool must be Str".into())),
1386    };
1387    let args_json = match args.get(2) {
1388        Some(Value::Str(s)) => s.clone(),
1389        _ => return err(Value::Str(
1390            "agent.call_mcp(server, tool, args_json): args_json must be Str".into())),
1391    };
1392    let parsed: serde_json::Value = match serde_json::from_str(&args_json) {
1393        Ok(v) => v,
1394        Err(e) => return err(Value::Str(format!(
1395            "agent.call_mcp: args_json is not valid JSON: {e}"))),
1396    };
1397    let mut client = match crate::mcp_client::McpClient::spawn(&server) {
1398        Ok(c) => c,
1399        Err(e) => return err(Value::Str(e)),
1400    };
1401    match client.call_tool(&tool, parsed) {
1402        Ok(result) => ok(Value::Str(
1403            serde_json::to_string(&result).unwrap_or_else(|_| "null".into()))),
1404        Err(e) => err(Value::Str(e)),
1405    }
1406}
1407
1408fn some(v: Value) -> Value {
1409    Value::Variant { name: "Some".into(), args: vec![v] }
1410}
1411fn none() -> Value {
1412    Value::Variant { name: "None".into(), args: vec![] }
1413}
1414
1415fn expect_bytes(v: Option<&Value>) -> Result<&Vec<u8>, String> {
1416    match v {
1417        Some(Value::Bytes(b)) => Ok(b),
1418        Some(other) => Err(format!("expected Bytes arg, got {other:?}")),
1419        None => Err("missing argument".into()),
1420    }
1421}
1422
1423fn expect_kv_handle(v: Option<&Value>) -> Result<u64, String> {
1424    match v {
1425        Some(Value::Int(n)) if *n >= 0 => Ok(*n as u64),
1426        Some(other) => Err(format!("expected Kv handle (Int), got {other:?}")),
1427        None => Err("missing Kv argument".into()),
1428    }
1429}
1430
1431fn expect_sql_handle(v: Option<&Value>) -> Result<u64, String> {
1432    match v {
1433        Some(Value::Int(n)) if *n >= 0 => Ok(*n as u64),
1434        Some(other) => Err(format!("expected Db handle (Int), got {other:?}")),
1435        None => Err("missing Db argument".into()),
1436    }
1437}
1438
1439fn expect_str_list(v: Option<&Value>) -> Result<Vec<String>, String> {
1440    match v {
1441        Some(Value::List(items)) => items.iter().map(|x| match x {
1442            Value::Str(s) => Ok(s.clone()),
1443            other => Err(format!("expected List[Str] element, got {other:?}")),
1444        }).collect(),
1445        Some(other) => Err(format!("expected List[Str], got {other:?}")),
1446        None => Err("missing List[Str] argument".into()),
1447    }
1448}
1449
1450/// Run a `SELECT` (or any returning statement) and pack the rows
1451/// into `Value::List(Value::Record(...))` shape — column-name keys,
1452/// SQLite-typed values mapped one-for-one to Lex value variants
1453/// (Null → Unit, Integer → Int, Real → Float, Text → Str, Blob →
1454/// Bytes). Returns the standard `Result[List[T], Str]` Lex shape.
1455fn sql_run_query(
1456    conn: &rusqlite::Connection,
1457    stmt_str: &str,
1458    params: &[String],
1459) -> Value {
1460    let mut stmt = match conn.prepare(stmt_str) {
1461        Ok(s)  => s,
1462        Err(e) => return err(Value::Str(format!("sql.query: {e}"))),
1463    };
1464    let column_count = stmt.column_count();
1465    let column_names: Vec<String> = (0..column_count)
1466        .map(|i| stmt.column_name(i).unwrap_or("").to_string())
1467        .collect();
1468    let bind: Vec<&dyn rusqlite::ToSql> = params.iter()
1469        .map(|s| s as &dyn rusqlite::ToSql)
1470        .collect();
1471    let mut rows = match stmt.query(rusqlite::params_from_iter(bind.iter())) {
1472        Ok(r)  => r,
1473        Err(e) => return err(Value::Str(format!("sql.query: {e}"))),
1474    };
1475    let mut out: Vec<Value> = Vec::new();
1476    loop {
1477        let row = match rows.next() {
1478            Ok(Some(r)) => r,
1479            Ok(None)    => break,
1480            Err(e)      => return err(Value::Str(format!("sql.query: {e}"))),
1481        };
1482        let mut rec = indexmap::IndexMap::new();
1483        for (i, name) in column_names.iter().enumerate() {
1484            let cell = match row.get_ref(i) {
1485                Ok(c)  => sql_value_ref_to_lex(c),
1486                Err(e) => return err(Value::Str(format!("sql.query: column {i}: {e}"))),
1487            };
1488            rec.insert(name.clone(), cell);
1489        }
1490        out.push(Value::Record(rec));
1491    }
1492    ok(Value::List(out))
1493}
1494
1495fn sql_value_ref_to_lex(v: rusqlite::types::ValueRef<'_>) -> Value {
1496    use rusqlite::types::ValueRef;
1497    match v {
1498        ValueRef::Null       => Value::Unit,
1499        ValueRef::Integer(n) => Value::Int(n),
1500        ValueRef::Real(f)    => Value::Float(f),
1501        ValueRef::Text(s)    => Value::Str(String::from_utf8_lossy(s).into_owned()),
1502        ValueRef::Blob(b)    => Value::Bytes(b.to_vec()),
1503    }
1504}
1505
1506// -- log state (process-wide; configurable via log.set_*) --
1507
/// Severity of a log record.
///
/// Variant order matters: the derived `PartialOrd` ranks
/// `Debug < Info < Warn < Error`, and `emit_log` relies on that ordering
/// for its threshold test.
#[derive(PartialOrd, PartialEq, Copy, Clone)]
enum LogLevel {
    Debug,
    Info,
    Warn,
    Error,
}
1510
/// Rendering used for each log line.
#[derive(PartialEq, Copy, Clone)]
enum LogFormat {
    /// `[timestamp] level: message`
    Text,
    /// One JSON object per line with `ts`, `level` and `msg` keys.
    Json,
}
1513
/// Destination for rendered log lines.
#[derive(Clone)]
enum LogSink {
    /// The process's standard error stream.
    Stderr,
    /// A shared file handle. Held in an `Arc`, so `emit_log` can clone the
    /// sink out of the global state and write after releasing that lock.
    File(Arc<Mutex<std::fs::File>>),
}
1519
1520struct LogState {
1521    level: LogLevel,
1522    format: LogFormat,
1523    sink: LogSink,
1524}
1525
1526fn log_state() -> &'static Mutex<LogState> {
1527    static STATE: OnceLock<Mutex<LogState>> = OnceLock::new();
1528    STATE.get_or_init(|| Mutex::new(LogState {
1529        level: LogLevel::Info,
1530        format: LogFormat::Text,
1531        sink: LogSink::Stderr,
1532    }))
1533}
1534
1535fn parse_log_level(s: &str) -> Option<LogLevel> {
1536    match s {
1537        "debug" => Some(LogLevel::Debug),
1538        "info" => Some(LogLevel::Info),
1539        "warn" => Some(LogLevel::Warn),
1540        "error" => Some(LogLevel::Error),
1541        _ => None,
1542    }
1543}
1544
1545fn level_label(l: LogLevel) -> &'static str {
1546    match l {
1547        LogLevel::Debug => "debug",
1548        LogLevel::Info => "info",
1549        LogLevel::Warn => "warn",
1550        LogLevel::Error => "error",
1551    }
1552}
1553
1554fn emit_log(level: LogLevel, msg: &str) {
1555    let state = log_state().lock().unwrap();
1556    if level < state.level {
1557        return;
1558    }
1559    let ts = chrono::Utc::now().to_rfc3339();
1560    let line = match state.format {
1561        LogFormat::Text => format!("[{}] {}: {}\n", ts, level_label(level), msg),
1562        LogFormat::Json => {
1563            // Hand-rolled JSON to avoid pulling serde_json into the
1564            // hot path; msg gets minimal escaping (the four common
1565            // cases that break a JSON line).
1566            let escaped = msg
1567                .replace('\\', "\\\\")
1568                .replace('"',  "\\\"")
1569                .replace('\n', "\\n")
1570                .replace('\r', "\\r");
1571            format!(
1572                "{{\"ts\":\"{ts}\",\"level\":\"{}\",\"msg\":\"{escaped}\"}}\n",
1573                level_label(level),
1574            )
1575        }
1576    };
1577    let sink = state.sink.clone();
1578    drop(state);
1579    match sink {
1580        LogSink::Stderr => {
1581            use std::io::Write;
1582            let _ = std::io::stderr().write_all(line.as_bytes());
1583        }
1584        LogSink::File(f) => {
1585            use std::io::Write;
1586            if let Ok(mut g) = f.lock() {
1587                let _ = g.write_all(line.as_bytes());
1588            }
1589        }
1590    }
1591}
1592
/// Per-handle state kept in the process registry: the child itself plus
/// optional buffered readers over its stdout and stderr.
pub(crate) struct ProcessState {
    /// The spawned child.
    child: std::process::Child,
    /// Buffered reader over the child's stdout; `None` when none is attached.
    stdout: Option<std::io::BufReader<std::process::ChildStdout>>,
    /// Buffered reader over the child's stderr; `None` when none is attached.
    stderr: Option<std::io::BufReader<std::process::ChildStderr>>,
}
1598
1599/// Process-wide registry of live `process.spawn` handles. Capped at
1600/// [`MAX_PROCESS_HANDLES`] to bound long-running programs that spawn
1601/// many short-lived children: on each `spawn` past the cap, the
1602/// least-recently-used entry is dropped (which `Drop`s its
1603/// `ProcessState`, leaving the child orphaned but the registry
1604/// bounded). `process.wait` also drops the entry on completion since
1605/// the handle becomes terminal once the child exits.
1606///
1607/// Each entry is wrapped in `Arc<Mutex<ProcessState>>` so the global
1608/// lookup mutex is held only briefly during dispatch — once we have
1609/// the per-handle `Arc`, the global lock is released and the slow
1610/// op (`wait`, `read_*_line`) only contends on its own handle's
1611/// mutex. Reads on different handles no longer block each other.
1612fn process_registry() -> &'static Mutex<ProcessRegistry> {
1613    static REGISTRY: OnceLock<Mutex<ProcessRegistry>> = OnceLock::new();
1614    REGISTRY.get_or_init(|| Mutex::new(ProcessRegistry::with_capacity(MAX_PROCESS_HANDLES)))
1615}
1616
/// Upper bound on live `process.spawn` handles. Inserting past it evicts
/// the least-recently-used entry.
const MAX_PROCESS_HANDLES: usize = 256;
1618
1619type SharedProcessState = Arc<Mutex<ProcessState>>;
1620
1621pub(crate) struct ProcessRegistry {
1622    entries: indexmap::IndexMap<u64, SharedProcessState>,
1623    cap: usize,
1624}
1625
1626impl ProcessRegistry {
1627    pub(crate) fn with_capacity(cap: usize) -> Self {
1628        Self { entries: indexmap::IndexMap::new(), cap }
1629    }
1630
1631    /// Insert a freshly-spawned child. If at cap, evict the LRU entry
1632    /// first; the dropped `ProcessState`'s child stays alive (orphaned)
1633    /// but its file descriptors are released.
1634    pub(crate) fn insert(&mut self, handle: u64, state: ProcessState) {
1635        if self.entries.len() >= self.cap {
1636            self.entries.shift_remove_index(0);
1637        }
1638        self.entries.insert(handle, Arc::new(Mutex::new(state)));
1639    }
1640
1641    /// Look up a handle, marking it most-recently-used on hit. Returns
1642    /// a clone of the shared `Arc` — callers should release the global
1643    /// registry lock before locking the per-handle mutex.
1644    pub(crate) fn touch_get(&mut self, handle: u64) -> Option<SharedProcessState> {
1645        let idx = self.entries.get_index_of(&handle)?;
1646        self.entries.move_index(idx, self.entries.len() - 1);
1647        self.entries.get(&handle).cloned()
1648    }
1649
1650    /// Drop the registry entry. The underlying `Arc` may outlive the
1651    /// removal if another op still holds it; that's intentional — the
1652    /// in-flight op finishes against the existing `ProcessState`, and
1653    /// only fresh lookups start failing.
1654    pub(crate) fn remove(&mut self, handle: u64) {
1655        self.entries.shift_remove(&handle);
1656    }
1657
1658    #[cfg(test)]
1659    pub(crate) fn len(&self) -> usize { self.entries.len() }
1660}
1661
/// Allocate a new `process.spawn` handle. Handles start at 1 and increase
/// by one on every call across all threads.
fn next_process_handle() -> u64 {
    static NEXT_HANDLE: AtomicU64 = AtomicU64::new(1);
    NEXT_HANDLE.fetch_add(1, Ordering::SeqCst)
}
1666
// Unit tests for `ProcessRegistry`'s LRU bookkeeping. Gated to Unix
// because the payload helper spawns the `true` binary.
#[cfg(all(test, unix))]
mod process_registry_tests {
    use super::{ProcessRegistry, ProcessState};

    /// Build a registry payload around a trivial short-lived child.
    /// The child really is spawned: `true` runs and exits immediately.
    /// Only a valid `std::process::Child` is needed here, so the child is
    /// never waited on and stays unreaped until the test process exits.
    fn fresh_state() -> ProcessState {
        let child = std::process::Command::new("true")
            .stdout(std::process::Stdio::null())
            .stderr(std::process::Stdio::null())
            .spawn()
            .expect("spawn `true`");
        ProcessState { child, stdout: None, stderr: None }
    }

    // A stored handle is found; an unknown handle is not.
    #[test]
    fn insert_and_get_round_trip() {
        let mut r = ProcessRegistry::with_capacity(4);
        r.insert(1, fresh_state());
        assert!(r.touch_get(1).is_some());
        assert!(r.touch_get(2).is_none());
    }

    // Each handle has its own per-handle `Arc`, so ops on different
    // handles lock different mutexes.
    #[test]
    fn touch_get_returns_distinct_arcs_for_distinct_handles() {
        let mut r = ProcessRegistry::with_capacity(4);
        r.insert(1, fresh_state());
        r.insert(2, fresh_state());
        let a = r.touch_get(1).unwrap();
        let b = r.touch_get(2).unwrap();
        // Different Arcs — pointer-equality check.
        assert!(!std::sync::Arc::ptr_eq(&a, &b));
    }

    // cap=2: insert 1, 2; touch 1 (now MRU); insert 3, which evicts 2.
    #[test]
    fn cap_evicts_lru_on_overflow() {
        let mut r = ProcessRegistry::with_capacity(2);
        r.insert(1, fresh_state());
        r.insert(2, fresh_state());
        let _ = r.touch_get(1);
        r.insert(3, fresh_state());
        assert!(r.touch_get(1).is_some(), "1 was MRU, should survive");
        assert!(r.touch_get(2).is_none(), "2 was LRU, should be evicted");
        assert!(r.touch_get(3).is_some(), "3 just inserted, should survive");
        assert_eq!(r.len(), 2);
    }

    // With no touches, eviction follows insertion order (FIFO).
    #[test]
    fn cap_with_no_touches_evicts_in_insertion_order() {
        let mut r = ProcessRegistry::with_capacity(2);
        r.insert(10, fresh_state());
        r.insert(20, fresh_state());
        r.insert(30, fresh_state());
        assert!(r.touch_get(10).is_none());
        assert!(r.touch_get(20).is_some());
        assert!(r.touch_get(30).is_some());
    }

    // Explicit removal forgets the handle and shrinks the registry.
    #[test]
    fn remove_drops_entry() {
        let mut r = ProcessRegistry::with_capacity(4);
        r.insert(1, fresh_state());
        r.remove(1);
        assert!(r.touch_get(1).is_none());
        assert_eq!(r.len(), 0);
    }

    // Sustained churn (3x cap inserts) never grows the registry past cap.
    #[test]
    fn many_inserts_stay_bounded_at_cap() {
        let cap = 8;
        let mut r = ProcessRegistry::with_capacity(cap);
        for i in 0..(cap as u64 * 3) {
            r.insert(i, fresh_state());
            assert!(r.len() <= cap);
        }
        assert_eq!(r.len(), cap);
    }

    #[test]
    fn outstanding_arc_outlives_remove() {
        // Holding the per-handle Arc while another op removes the
        // entry must not invalidate the in-flight op. Mirrors the
        // wait-completes-then-removes pattern.
        let mut r = ProcessRegistry::with_capacity(4);
        r.insert(1, fresh_state());
        let arc = r.touch_get(1).expect("entry exists");
        r.remove(1);
        // Registry forgot about it, but the Arc still works.
        assert!(r.touch_get(1).is_none());
        let _state = arc.lock().unwrap();
    }
}
1760
1761fn expect_process_handle(v: Option<&Value>) -> Result<u64, String> {
1762    match v {
1763        Some(Value::Int(n)) if *n >= 0 => Ok(*n as u64),
1764        Some(other) => Err(format!("expected ProcessHandle (Int), got {other:?}")),
1765        None => Err("missing ProcessHandle argument".into()),
1766    }
1767}
1768
1769/// Process-wide registry of open `Kv` handles. Each `kv.open` allocates
1770/// a new u64 handle via [`next_kv_handle`] and stores the `sled::Db`
1771/// here; subsequent ops fetch by handle. `kv.close` removes the entry.
1772///
1773/// Capped at [`MAX_KV_HANDLES`] to prevent leaks from long-running
1774/// programs that open many short-lived stores without calling
1775/// `kv.close`. On insert at cap, the least-recently-used entry is
1776/// dropped (closing its `sled::Db`); subsequent ops on the evicted
1777/// handle return the standard "closed or unknown Kv handle" error.
1778/// Any access (`get`, `put`, `delete`, `contains`, `list_prefix`)
1779/// touches the LRU order.
1780fn kv_registry() -> &'static Mutex<KvRegistry> {
1781    static REGISTRY: OnceLock<Mutex<KvRegistry>> = OnceLock::new();
1782    REGISTRY.get_or_init(|| Mutex::new(KvRegistry::with_capacity(MAX_KV_HANDLES)))
1783}
1784
/// Upper bound on simultaneously open `kv.open` handles. Opening one more
/// evicts the least-recently-used store.
///
/// The limit is large enough that programs keeping a few long-lived stores
/// open never hit it, and small enough to bound "open and forget" leaks.
const MAX_KV_HANDLES: usize = 256;
1791
1792/// LRU-bounded set of open `sled::Db` instances keyed by `u64` handle.
1793/// Built on `IndexMap` for O(1) insert / remove / lookup with
1794/// insertion-order traversal — touching an entry just shift-moves it
1795/// to the back, evictions pop from the front.
1796pub(crate) struct KvRegistry {
1797    entries: indexmap::IndexMap<u64, sled::Db>,
1798    cap: usize,
1799}
1800
1801impl KvRegistry {
1802    pub(crate) fn with_capacity(cap: usize) -> Self {
1803        Self { entries: indexmap::IndexMap::new(), cap }
1804    }
1805
1806    /// Insert a freshly-opened db. If we're already at cap, evict the
1807    /// LRU entry first; the dropped `sled::Db` closes its files.
1808    pub(crate) fn insert(&mut self, handle: u64, db: sled::Db) {
1809        if self.entries.len() >= self.cap {
1810            self.entries.shift_remove_index(0);
1811        }
1812        self.entries.insert(handle, db);
1813    }
1814
1815    /// Look up a handle, marking it most-recently-used on hit.
1816    pub(crate) fn touch_get(&mut self, handle: u64) -> Option<&sled::Db> {
1817        let idx = self.entries.get_index_of(&handle)?;
1818        self.entries.move_index(idx, self.entries.len() - 1);
1819        self.entries.get(&handle)
1820    }
1821
1822    /// Explicit `kv.close`: drop the handle if present.
1823    pub(crate) fn remove(&mut self, handle: u64) {
1824        self.entries.shift_remove(&handle);
1825    }
1826
1827    #[cfg(test)]
1828    pub(crate) fn len(&self) -> usize { self.entries.len() }
1829}
1830
/// Allocate a new `Kv` handle. Handles start at 1 and increase by one on
/// every call across all threads.
fn next_kv_handle() -> u64 {
    static NEXT_HANDLE: AtomicU64 = AtomicU64::new(1);
    NEXT_HANDLE.fetch_add(1, Ordering::SeqCst)
}
1835
1836/// Process-wide registry of open `Db` handles. Same shape as the kv
1837/// and process registries: per-handle `Arc<Mutex<…>>` so dispatch
1838/// only briefly holds the global lock and ops on different
1839/// connections don't serialize. LRU-bounded at
1840/// [`MAX_SQL_HANDLES`] to avoid leaks from long-running programs
1841/// that open many short-lived databases.
1842fn sql_registry() -> &'static Mutex<SqlRegistry> {
1843    static REGISTRY: OnceLock<Mutex<SqlRegistry>> = OnceLock::new();
1844    REGISTRY.get_or_init(|| Mutex::new(SqlRegistry::with_capacity(MAX_SQL_HANDLES)))
1845}
1846
/// Upper bound on simultaneously open `Db` handles. Opening one more evicts
/// the least-recently-used connection.
const MAX_SQL_HANDLES: usize = 256;
1848
1849type SharedConn = Arc<Mutex<rusqlite::Connection>>;
1850
1851pub(crate) struct SqlRegistry {
1852    entries: indexmap::IndexMap<u64, SharedConn>,
1853    cap: usize,
1854}
1855
1856impl SqlRegistry {
1857    pub(crate) fn with_capacity(cap: usize) -> Self {
1858        Self { entries: indexmap::IndexMap::new(), cap }
1859    }
1860
1861    pub(crate) fn insert(&mut self, handle: u64, conn: rusqlite::Connection) {
1862        if self.entries.len() >= self.cap {
1863            self.entries.shift_remove_index(0);
1864        }
1865        self.entries.insert(handle, Arc::new(Mutex::new(conn)));
1866    }
1867
1868    /// Look up a handle, marking it MRU on hit. Returns a clone of
1869    /// the shared `Arc` so callers release the global registry
1870    /// lock before locking the per-handle mutex.
1871    pub(crate) fn touch_get(&mut self, handle: u64) -> Option<SharedConn> {
1872        let idx = self.entries.get_index_of(&handle)?;
1873        self.entries.move_index(idx, self.entries.len() - 1);
1874        self.entries.get(&handle).cloned()
1875    }
1876
1877    pub(crate) fn remove(&mut self, handle: u64) {
1878        self.entries.shift_remove(&handle);
1879    }
1880
1881    #[cfg(test)]
1882    pub(crate) fn len(&self) -> usize { self.entries.len() }
1883}
1884
/// Allocate a new `Db` handle. Handles start at 1 and increase by one on
/// every call across all threads.
fn next_sql_handle() -> u64 {
    static NEXT_HANDLE: AtomicU64 = AtomicU64::new(1);
    NEXT_HANDLE.fetch_add(1, Ordering::SeqCst)
}
1889
// Unit tests for `SqlRegistry`'s LRU bookkeeping, using in-memory SQLite
// connections as payload.
#[cfg(test)]
mod sql_registry_tests {
    use super::SqlRegistry;

    /// A fresh, isolated in-memory SQLite connection.
    fn fresh() -> rusqlite::Connection {
        rusqlite::Connection::open_in_memory().expect("open in-memory sqlite")
    }

    // A stored handle is found; an unknown handle is not.
    #[test]
    fn insert_and_get_round_trip() {
        let mut r = SqlRegistry::with_capacity(4);
        r.insert(1, fresh());
        assert!(r.touch_get(1).is_some());
        assert!(r.touch_get(2).is_none());
    }

    // cap=2: insert 1, 2; touch 1 (now MRU); insert 3, which evicts 2.
    #[test]
    fn cap_evicts_lru_on_overflow() {
        let mut r = SqlRegistry::with_capacity(2);
        r.insert(1, fresh());
        r.insert(2, fresh());
        let _ = r.touch_get(1);
        r.insert(3, fresh());
        assert!(r.touch_get(1).is_some(), "1 was MRU, should survive");
        assert!(r.touch_get(2).is_none(), "2 was LRU, should be evicted");
        assert!(r.touch_get(3).is_some(), "3 just inserted");
        assert_eq!(r.len(), 2);
    }

    // Explicit removal forgets the handle and shrinks the registry.
    #[test]
    fn remove_drops_entry() {
        let mut r = SqlRegistry::with_capacity(4);
        r.insert(1, fresh());
        r.remove(1);
        assert!(r.touch_get(1).is_none());
        assert_eq!(r.len(), 0);
    }

    // Sustained churn (3x cap inserts) never grows the registry past cap.
    #[test]
    fn many_inserts_stay_bounded_at_cap() {
        let cap = 8;
        let mut r = SqlRegistry::with_capacity(cap);
        for i in 0..(cap as u64 * 3) {
            r.insert(i, fresh());
            assert!(r.len() <= cap);
        }
        assert_eq!(r.len(), cap);
    }
}
1939
// Unit tests for `KvRegistry`'s LRU bookkeeping, using real on-disk sled
// stores as payload.
#[cfg(test)]
mod kv_registry_tests {
    use super::KvRegistry;

    /// Spin up an isolated `sled::Db` in a temp dir.
    ///
    /// Each call gets a unique path so concurrent tests don't collide on
    /// the lockfile. The store is opened as `temporary`, so sled removes
    /// the directory when the last handle to it is dropped. Without that,
    /// every run would leave a new directory behind under the system temp
    /// dir.
    fn fresh_db(tag: &str) -> sled::Db {
        let dir = std::env::temp_dir().join(format!(
            "lex-kv-reg-{}-{}-{}",
            std::process::id(),
            tag,
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_nanos()
        ));
        sled::Config::new()
            .path(dir)
            .temporary(true)
            .open()
            .expect("sled open")
    }

    // A stored handle is found; an unknown handle is not.
    #[test]
    fn insert_and_get_round_trip() {
        let mut r = KvRegistry::with_capacity(4);
        r.insert(1, fresh_db("a"));
        assert!(r.touch_get(1).is_some());
        assert!(r.touch_get(2).is_none());
    }

    #[test]
    fn cap_evicts_lru_on_overflow() {
        // cap=2: insert 1, 2; touch 1 (now MRU); insert 3 → 2 evicted.
        let mut r = KvRegistry::with_capacity(2);
        r.insert(1, fresh_db("c1"));
        r.insert(2, fresh_db("c2"));
        let _ = r.touch_get(1);
        r.insert(3, fresh_db("c3"));
        assert!(r.touch_get(1).is_some(), "1 was MRU, should survive");
        assert!(r.touch_get(2).is_none(), "2 was LRU, should be evicted");
        assert!(r.touch_get(3).is_some(), "3 just inserted, should survive");
        assert_eq!(r.len(), 2);
    }

    #[test]
    fn cap_with_no_touches_evicts_in_insertion_order() {
        // cap=2: insert 1, 2, 3 with no touches → 1 evicted (FIFO).
        let mut r = KvRegistry::with_capacity(2);
        r.insert(10, fresh_db("f1"));
        r.insert(20, fresh_db("f2"));
        r.insert(30, fresh_db("f3"));
        assert!(r.touch_get(10).is_none());
        assert!(r.touch_get(20).is_some());
        assert!(r.touch_get(30).is_some());
    }

    // Explicit removal forgets the handle and shrinks the registry.
    #[test]
    fn remove_drops_entry() {
        let mut r = KvRegistry::with_capacity(4);
        r.insert(1, fresh_db("r1"));
        r.remove(1);
        assert!(r.touch_get(1).is_none());
        assert_eq!(r.len(), 0);
    }

    // Removing a handle that was never inserted leaves other entries alone.
    #[test]
    fn remove_unknown_handle_is_noop() {
        let mut r = KvRegistry::with_capacity(4);
        r.insert(1, fresh_db("u1"));
        r.remove(999);
        assert!(r.touch_get(1).is_some());
    }

    #[test]
    fn many_inserts_stay_bounded_at_cap() {
        // Exhaust the cap to confirm the registry never grows past it,
        // even under sustained churn.
        let cap = 8;
        let mut r = KvRegistry::with_capacity(cap);
        for i in 0..(cap as u64 * 3) {
            r.insert(i, fresh_db(&format!("b{i}")));
            assert!(r.len() <= cap);
        }
        assert_eq!(r.len(), cap);
    }
}