Skip to main content

lex_runtime/
handler.rs

1//! Native effect handlers, dispatched at runtime through the VM's
2//! `EffectHandler` trait. The handler also re-checks the runtime policy
3//! per spec §7.4 (the static check is necessary but not sufficient: a fn
4//! declared `[fs_read("/data")]` that's allowed at startup still has to
5//! pass the path check at the point of dispatch).
6
7use lex_bytecode::vm::{EffectHandler, Vm};
8use lex_bytecode::{Program, Value};
9use std::cell::RefCell;
10use std::path::PathBuf;
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::sync::{Mutex, OnceLock};
13use std::sync::Arc;
14use std::time::{SystemTime, UNIX_EPOCH};
15
16use crate::builtins::try_pure_builtin;
17use crate::policy::Policy;
18
/// Destination for the text emitted by `io.print`. Production code
/// writes to stdout; tests inject an in-memory buffer instead.
pub trait IoSink: Send {
    /// Emit `s` as a single line of output.
    fn print_line(&mut self, s: &str);
}

/// Sink that forwards every line to the process's standard output.
pub struct StdoutSink;

impl IoSink for StdoutSink {
    fn print_line(&mut self, s: &str) {
        println!("{s}");
    }
}

/// Sink that records every printed line in memory, in call order.
#[derive(Default)]
pub struct CapturedSink {
    /// Lines received so far, oldest first.
    pub lines: Vec<String>,
}

impl IoSink for CapturedSink {
    fn print_line(&mut self, s: &str) {
        let owned = String::from(s);
        self.lines.push(owned);
    }
}
37
38pub struct DefaultHandler {
39    policy: Policy,
40    pub sink: Box<dyn IoSink>,
41    /// Optional read root for `io.read` — when set, `io.read("p")` resolves
42    /// to `read_root.join(p)`. Lets tests run without touching the real fs.
43    pub read_root: Option<PathBuf>,
44    /// Captured budget consumption (post-static-check, observability only).
45    pub budget_used: RefCell<u64>,
46    /// Shared reference to the program, needed by `net.serve` so the
47    /// handler can spin up fresh VMs to dispatch incoming requests.
48    /// `None` if the handler was constructed without a program.
49    pub program: Option<Arc<Program>>,
50    /// Chat registry; populated by `net.serve_ws`'s per-message
51    /// dispatch so `chat.broadcast` / `chat.send` work from inside
52    /// a handler invocation.
53    pub chat_registry: Option<Arc<crate::ws::ChatRegistry>>,
54    /// LRU cache of `agent.call_mcp` clients keyed by the
55    /// command-line string (#197). Avoids spawn-per-call cost
56    /// when an agent invokes the same MCP server in tight loops.
57    /// Capped — when the cache is full, the least-recently-used
58    /// entry is dropped (its subprocess is reaped on Drop).
59    pub mcp_clients: crate::mcp_client::McpClientCache,
60}
61
62impl DefaultHandler {
63    pub fn new(policy: Policy) -> Self {
64        Self {
65            policy,
66            sink: Box::new(StdoutSink),
67            read_root: None,
68            budget_used: RefCell::new(0),
69            program: None,
70            chat_registry: None,
71            mcp_clients: crate::mcp_client::McpClientCache::with_capacity(16),
72        }
73    }
74
75    pub fn with_program(mut self, program: Arc<Program>) -> Self {
76        self.program = Some(program); self
77    }
78
79    pub fn with_chat_registry(mut self, registry: Arc<crate::ws::ChatRegistry>) -> Self {
80        self.chat_registry = Some(registry); self
81    }
82
83    pub fn with_sink(mut self, sink: Box<dyn IoSink>) -> Self {
84        self.sink = sink; self
85    }
86
87    pub fn with_read_root(mut self, root: PathBuf) -> Self {
88        self.read_root = Some(root); self
89    }
90
91    fn ensure_kind_allowed(&self, kind: &str) -> Result<(), String> {
92        if self.policy.allow_effects.contains(kind) {
93            Ok(())
94        } else {
95            Err(format!("effect `{kind}` not in --allow-effects"))
96        }
97    }
98
99    fn resolve_read_path(&self, p: &str) -> PathBuf {
100        match &self.read_root {
101            Some(root) => root.join(p.trim_start_matches('/')),
102            None => PathBuf::from(p),
103        }
104    }
105
106    fn dispatch_log(&mut self, op: &str, args: Vec<Value>) -> Result<Value, String> {
107        match op {
108            "debug" | "info" | "warn" | "error" => {
109                let msg = expect_str(args.first())?;
110                let level = match op {
111                    "debug" => LogLevel::Debug,
112                    "info" => LogLevel::Info,
113                    "warn" => LogLevel::Warn,
114                    _ => LogLevel::Error,
115                };
116                emit_log(level, msg);
117                Ok(Value::Unit)
118            }
119            "set_level" => {
120                let s = expect_str(args.first())?;
121                match parse_log_level(s) {
122                    Some(l) => {
123                        log_state().lock().unwrap().level = l;
124                        Ok(ok(Value::Unit))
125                    }
126                    None => Ok(err(Value::Str(format!(
127                        "log.set_level: unknown level `{s}`; expected debug|info|warn|error")))),
128                }
129            }
130            "set_format" => {
131                let s = expect_str(args.first())?;
132                let fmt = match s {
133                    "text" => LogFormat::Text,
134                    "json" => LogFormat::Json,
135                    other => return Ok(err(Value::Str(format!(
136                        "log.set_format: unknown format `{other}`; expected text|json")))),
137                };
138                log_state().lock().unwrap().format = fmt;
139                Ok(ok(Value::Unit))
140            }
141            "set_sink" => {
142                let path = expect_str(args.first())?;
143                if path == "-" {
144                    log_state().lock().unwrap().sink = LogSink::Stderr;
145                    return Ok(ok(Value::Unit));
146                }
147                if let Err(e) = self.ensure_fs_write_path(path) {
148                    return Ok(err(Value::Str(e)));
149                }
150                match std::fs::OpenOptions::new()
151                    .create(true).append(true).open(path)
152                {
153                    Ok(f) => {
154                        log_state().lock().unwrap().sink = LogSink::File(std::sync::Arc::new(Mutex::new(f)));
155                        Ok(ok(Value::Unit))
156                    }
157                    Err(e) => Ok(err(Value::Str(format!("log.set_sink `{path}`: {e}")))),
158                }
159            }
160            other => Err(format!("unsupported log.{other}")),
161        }
162    }
163
164    fn dispatch_process(&mut self, op: &str, args: Vec<Value>) -> Result<Value, String> {
165        match op {
166            "spawn" => {
167                let cmd = expect_str(args.first())?.to_string();
168                let raw_args = match args.get(1) {
169                    Some(Value::List(items)) => items.clone(),
170                    _ => return Err("process.spawn: args must be List[Str]".into()),
171                };
172                let str_args: Result<Vec<String>, String> = raw_args.iter().map(|v| match v {
173                    Value::Str(s) => Ok(s.clone()),
174                    other => Err(format!("process.spawn: arg must be Str, got {other:?}")),
175                }).collect();
176                let str_args = str_args?;
177                let opts = match args.get(2) {
178                    Some(Value::Record(r)) => r.clone(),
179                    _ => return Err("process.spawn: missing or invalid opts record".into()),
180                };
181
182                // Allow-list check, mirroring the existing proc.spawn.
183                if !self.policy.allow_proc.is_empty() {
184                    let basename = std::path::Path::new(&cmd)
185                        .file_name()
186                        .and_then(|s| s.to_str())
187                        .unwrap_or(&cmd);
188                    if !self.policy.allow_proc.iter().any(|a| a == basename) {
189                        return Ok(err(Value::Str(format!(
190                            "process.spawn: `{cmd}` not in --allow-proc {:?}",
191                            self.policy.allow_proc
192                        ))));
193                    }
194                }
195
196                let mut command = std::process::Command::new(&cmd);
197                command.args(&str_args);
198                command.stdin(std::process::Stdio::piped());
199                command.stdout(std::process::Stdio::piped());
200                command.stderr(std::process::Stdio::piped());
201
202                if let Some(Value::Variant { name, args: vargs }) = opts.get("cwd") {
203                    if name == "Some" {
204                        if let Some(Value::Str(s)) = vargs.first() {
205                            command.current_dir(s);
206                        }
207                    }
208                }
209                if let Some(Value::Map(env)) = opts.get("env") {
210                    for (k, v) in env {
211                        if let (lex_bytecode::MapKey::Str(ks), Value::Str(vs)) = (k, v) {
212                            command.env(ks, vs);
213                        }
214                    }
215                }
216
217                let stdin_payload: Option<Vec<u8>> = match opts.get("stdin") {
218                    Some(Value::Variant { name, args: vargs }) if name == "Some" => {
219                        match vargs.first() {
220                            Some(Value::Bytes(b)) => Some(b.clone()),
221                            _ => None,
222                        }
223                    }
224                    _ => None,
225                };
226
227                let mut child = match command.spawn() {
228                    Ok(c) => c,
229                    Err(e) => return Ok(err(Value::Str(format!("process.spawn `{cmd}`: {e}")))),
230                };
231
232                if let Some(payload) = stdin_payload {
233                    if let Some(mut stdin) = child.stdin.take() {
234                        use std::io::Write;
235                        let _ = stdin.write_all(&payload);
236                        // Drop closes stdin; the child sees EOF.
237                    }
238                }
239
240                let stdout = child.stdout.take().map(std::io::BufReader::new);
241                let stderr = child.stderr.take().map(std::io::BufReader::new);
242                let handle = next_process_handle();
243                process_registry().lock().unwrap().insert(handle, ProcessState {
244                    child,
245                    stdout,
246                    stderr,
247                });
248                Ok(ok(Value::Int(handle as i64)))
249            }
250            "read_stdout_line" => Self::read_line_op(args, true),
251            "read_stderr_line" => Self::read_line_op(args, false),
252            "wait" => {
253                let h = expect_process_handle(args.first())?;
254                // Look up the per-handle Arc, then release the global
255                // lock before the (slow) wait so unrelated handles
256                // can dispatch concurrently.
257                let arc = process_registry().lock().unwrap()
258                    .touch_get(h)
259                    .ok_or_else(|| "process.wait: closed or unknown ProcessHandle".to_string())?;
260                let status = {
261                    let mut state = arc.lock().unwrap();
262                    state.child.wait().map_err(|e| format!("process.wait: {e}"))?
263                };
264                // Wait completion makes the handle terminal; drop it
265                // from the registry so the cap doesn't fill up with
266                // exited children.
267                process_registry().lock().unwrap().remove(h);
268                let mut rec = indexmap::IndexMap::new();
269                rec.insert("code".into(), Value::Int(status.code().unwrap_or(-1) as i64));
270                #[cfg(unix)]
271                {
272                    use std::os::unix::process::ExitStatusExt;
273                    rec.insert("signaled".into(), Value::Bool(status.signal().is_some()));
274                }
275                #[cfg(not(unix))]
276                {
277                    rec.insert("signaled".into(), Value::Bool(false));
278                }
279                Ok(Value::Record(rec))
280            }
281            "kill" => {
282                let h = expect_process_handle(args.first())?;
283                let _signal = expect_str(args.get(1))?;
284                let arc = process_registry().lock().unwrap()
285                    .touch_get(h)
286                    .ok_or_else(|| "process.kill: closed or unknown ProcessHandle".to_string())?;
287                let mut state = arc.lock().unwrap();
288                // Cross-platform: only `kill` (SIGKILL-equivalent on
289                // Windows). Signal-name dispatch is a v1.5 follow-up.
290                match state.child.kill() {
291                    Ok(_) => Ok(ok(Value::Unit)),
292                    Err(e) => Ok(err(Value::Str(format!("process.kill: {e}")))),
293                }
294            }
295            "run" => {
296                let cmd = expect_str(args.first())?.to_string();
297                let raw_args = match args.get(1) {
298                    Some(Value::List(items)) => items.clone(),
299                    _ => return Err("process.run: args must be List[Str]".into()),
300                };
301                let str_args: Result<Vec<String>, String> = raw_args.iter().map(|v| match v {
302                    Value::Str(s) => Ok(s.clone()),
303                    other => Err(format!("process.run: arg must be Str, got {other:?}")),
304                }).collect();
305                let str_args = str_args?;
306                if !self.policy.allow_proc.is_empty() {
307                    let basename = std::path::Path::new(&cmd)
308                        .file_name()
309                        .and_then(|s| s.to_str())
310                        .unwrap_or(&cmd);
311                    if !self.policy.allow_proc.iter().any(|a| a == basename) {
312                        return Ok(err(Value::Str(format!(
313                            "process.run: `{cmd}` not in --allow-proc {:?}",
314                            self.policy.allow_proc
315                        ))));
316                    }
317                }
318                match std::process::Command::new(&cmd).args(&str_args).output() {
319                    Ok(o) => {
320                        let mut rec = indexmap::IndexMap::new();
321                        rec.insert("stdout".into(), Value::Str(
322                            String::from_utf8_lossy(&o.stdout).to_string()));
323                        rec.insert("stderr".into(), Value::Str(
324                            String::from_utf8_lossy(&o.stderr).to_string()));
325                        rec.insert("exit_code".into(), Value::Int(
326                            o.status.code().unwrap_or(-1) as i64));
327                        Ok(ok(Value::Record(rec)))
328                    }
329                    Err(e) => Ok(err(Value::Str(format!("process.run `{cmd}`: {e}")))),
330                }
331            }
332            other => Err(format!("unsupported process.{other}")),
333        }
334    }
335
336    /// Read one line from the child's stdout (`is_stdout = true`) or
337    /// stderr. Returns `None` (Lex `Option`) at EOF; subsequent calls
338    /// keep returning `None`. Holds only the per-handle mutex during
339    /// the (potentially blocking) read, so reads on one handle don't
340    /// block reads/waits on a different handle.
341    fn read_line_op(args: Vec<Value>, is_stdout: bool) -> Result<Value, String> {
342        let h = expect_process_handle(args.first())?;
343        let arc = process_registry().lock().unwrap()
344            .touch_get(h)
345            .ok_or_else(|| format!(
346                "process.read_{}_line: closed or unknown ProcessHandle",
347                if is_stdout { "stdout" } else { "stderr" }))?;
348        let mut state = arc.lock().unwrap();
349        let reader_opt = if is_stdout {
350            state.stdout.as_mut().map(|r| -> &mut dyn std::io::BufRead { r })
351        } else {
352            state.stderr.as_mut().map(|r| -> &mut dyn std::io::BufRead { r })
353        };
354        let reader = match reader_opt {
355            Some(r) => r,
356            None => return Ok(none()),
357        };
358        let mut line = String::new();
359        match reader.read_line(&mut line) {
360            Ok(0) => Ok(none()),
361            Ok(_) => {
362                if line.ends_with('\n') { line.pop(); }
363                if line.ends_with('\r') { line.pop(); }
364                Ok(some(Value::Str(line)))
365            }
366            Err(e) => Err(format!("process.read_*_line: {e}")),
367        }
368    }
369
370    fn dispatch_fs(&mut self, op: &str, args: Vec<Value>) -> Result<Value, String> {
371        match op {
372            "exists" => {
373                let path = expect_str(args.first())?.to_string();
374                if let Err(e) = self.ensure_fs_walk_path(&path) {
375                    return Ok(err(Value::Str(e)));
376                }
377                Ok(Value::Bool(std::path::Path::new(&path).exists()))
378            }
379            "is_file" => {
380                let path = expect_str(args.first())?.to_string();
381                if let Err(e) = self.ensure_fs_walk_path(&path) {
382                    return Ok(err(Value::Str(e)));
383                }
384                Ok(Value::Bool(std::path::Path::new(&path).is_file()))
385            }
386            "is_dir" => {
387                let path = expect_str(args.first())?.to_string();
388                if let Err(e) = self.ensure_fs_walk_path(&path) {
389                    return Ok(err(Value::Str(e)));
390                }
391                Ok(Value::Bool(std::path::Path::new(&path).is_dir()))
392            }
393            "stat" => {
394                let path = expect_str(args.first())?.to_string();
395                if let Err(e) = self.ensure_fs_walk_path(&path) {
396                    return Ok(err(Value::Str(e)));
397                }
398                match std::fs::metadata(&path) {
399                    Ok(md) => {
400                        let mtime = md.modified()
401                            .ok()
402                            .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
403                            .map(|d| d.as_secs() as i64)
404                            .unwrap_or(0);
405                        let mut rec = indexmap::IndexMap::new();
406                        rec.insert("size".into(), Value::Int(md.len() as i64));
407                        rec.insert("mtime".into(), Value::Int(mtime));
408                        rec.insert("is_dir".into(), Value::Bool(md.is_dir()));
409                        rec.insert("is_file".into(), Value::Bool(md.is_file()));
410                        Ok(ok(Value::Record(rec)))
411                    }
412                    Err(e) => Ok(err(Value::Str(format!("fs.stat `{path}`: {e}")))),
413                }
414            }
415            "list_dir" => {
416                let path = expect_str(args.first())?.to_string();
417                if let Err(e) = self.ensure_fs_walk_path(&path) {
418                    return Ok(err(Value::Str(e)));
419                }
420                match std::fs::read_dir(&path) {
421                    Ok(rd) => {
422                        let mut entries: Vec<Value> = Vec::new();
423                        for ent in rd {
424                            match ent {
425                                Ok(e) => {
426                                    let p = e.path();
427                                    entries.push(Value::Str(p.to_string_lossy().into_owned()));
428                                }
429                                Err(e) => return Ok(err(Value::Str(format!("fs.list_dir: {e}")))),
430                            }
431                        }
432                        Ok(ok(Value::List(entries)))
433                    }
434                    Err(e) => Ok(err(Value::Str(format!("fs.list_dir `{path}`: {e}")))),
435                }
436            }
437            "walk" => {
438                let path = expect_str(args.first())?.to_string();
439                if let Err(e) = self.ensure_fs_walk_path(&path) {
440                    return Ok(err(Value::Str(e)));
441                }
442                let mut paths: Vec<Value> = Vec::new();
443                for ent in walkdir::WalkDir::new(&path) {
444                    match ent {
445                        Ok(e) => paths.push(Value::Str(
446                            e.path().to_string_lossy().into_owned())),
447                        Err(e) => return Ok(err(Value::Str(format!("fs.walk: {e}")))),
448                    }
449                }
450                Ok(ok(Value::List(paths)))
451            }
452            "glob" => {
453                let pattern = expect_str(args.first())?.to_string();
454                // Glob patterns can't be path-scoped at parse time
455                // (`**/*.rs` doesn't pin a directory); we filter the
456                // per-result paths after expansion against
457                // `--allow-fs-read`.
458                let entries = match glob::glob(&pattern) {
459                    Ok(e) => e,
460                    Err(e) => return Ok(err(Value::Str(format!("fs.glob: {e}")))),
461                };
462                let mut paths: Vec<Value> = Vec::new();
463                for ent in entries {
464                    match ent {
465                        Ok(p) => {
466                            let s = p.to_string_lossy().into_owned();
467                            if self.policy.allow_fs_read.is_empty()
468                                || self.policy.allow_fs_read.iter().any(|root| p.starts_with(root))
469                            {
470                                paths.push(Value::Str(s));
471                            }
472                        }
473                        Err(e) => return Ok(err(Value::Str(format!("fs.glob: {e}")))),
474                    }
475                }
476                Ok(ok(Value::List(paths)))
477            }
478            "mkdir_p" => {
479                let path = expect_str(args.first())?.to_string();
480                if let Err(e) = self.ensure_fs_write_path(&path) {
481                    return Ok(err(Value::Str(e)));
482                }
483                match std::fs::create_dir_all(&path) {
484                    Ok(_) => Ok(ok(Value::Unit)),
485                    Err(e) => Ok(err(Value::Str(format!("fs.mkdir_p `{path}`: {e}")))),
486                }
487            }
488            "remove" => {
489                let path = expect_str(args.first())?.to_string();
490                if let Err(e) = self.ensure_fs_write_path(&path) {
491                    return Ok(err(Value::Str(e)));
492                }
493                let p = std::path::Path::new(&path);
494                let result = if p.is_dir() {
495                    std::fs::remove_dir_all(p)
496                } else {
497                    std::fs::remove_file(p)
498                };
499                match result {
500                    Ok(_) => Ok(ok(Value::Unit)),
501                    Err(e) => Ok(err(Value::Str(format!("fs.remove `{path}`: {e}")))),
502                }
503            }
504            "copy" => {
505                let src = expect_str(args.first())?.to_string();
506                let dst = expect_str(args.get(1))?.to_string();
507                if let Err(e) = self.ensure_fs_walk_path(&src) {
508                    return Ok(err(Value::Str(e)));
509                }
510                if let Err(e) = self.ensure_fs_write_path(&dst) {
511                    return Ok(err(Value::Str(e)));
512                }
513                match std::fs::copy(&src, &dst) {
514                    Ok(_) => Ok(ok(Value::Unit)),
515                    Err(e) => Ok(err(Value::Str(format!("fs.copy {src} -> {dst}: {e}")))),
516                }
517            }
518            other => Err(format!("unsupported fs.{other}")),
519        }
520    }
521
522    /// Path scope for walk-style operations. `[fs_walk]` reuses the
523    /// `--allow-fs-read` allowlist — listing a directory is an
524    /// information disclosure on the same path tree as reading file
525    /// content, so the same scope applies. Empty allowlist = any path.
526    fn ensure_fs_walk_path(&self, path: &str) -> Result<(), String> {
527        if self.policy.allow_fs_read.is_empty() {
528            return Ok(());
529        }
530        let p = std::path::Path::new(path);
531        if self.policy.allow_fs_read.iter().any(|a| p.starts_with(a)) {
532            Ok(())
533        } else {
534            Err(format!("fs path `{path}` outside --allow-fs-read"))
535        }
536    }
537
538    /// Path scope for mutating operations. `[fs_write]` uses the
539    /// existing `--allow-fs-write` allowlist.
540    fn ensure_fs_write_path(&self, path: &str) -> Result<(), String> {
541        if self.policy.allow_fs_write.is_empty() {
542            return Ok(());
543        }
544        let p = std::path::Path::new(path);
545        if self.policy.allow_fs_write.iter().any(|a| p.starts_with(a)) {
546            Ok(())
547        } else {
548            Err(format!("fs path `{path}` outside --allow-fs-write"))
549        }
550    }
551
552    /// Enforce `--allow-net-host` against an outgoing URL. Empty
553    /// allowlist = any host. Non-empty = the URL's host must match
554    /// (substring; port-agnostic) at least one entry.
555    fn ensure_host_allowed(&self, url: &str) -> Result<(), String> {
556        if self.policy.allow_net_host.is_empty() { return Ok(()); }
557        let host = extract_host(url).unwrap_or("");
558        if self.policy.allow_net_host.iter().any(|h| host == h) {
559            Ok(())
560        } else {
561            Err(format!(
562                "net call to host `{host}` not in --allow-net-host {:?}",
563                self.policy.allow_net_host,
564            ))
565        }
566    }
567}
568
/// Extract the host from an `http://` or `https://` URL, without
/// userinfo or port. Returns `None` for any other scheme prefix.
///
/// * The authority ends at the first `/`, `?`, `#` or `\` (some URL
///   parsers treat `\` like `/` for http(s) URLs).
/// * Anything up to the last `@` in the authority is userinfo and is
///   dropped. Without this, `http://allowed.com:80@evil.com/` would be
///   reported as `allowed.com` although the request goes to `evil.com`.
/// * A bracketed IPv6 literal is returned with its brackets
///   (`[::1]`); its inner colons are not mistaken for a port separator.
fn extract_host(url: &str) -> Option<&str> {
    let rest = url
        .strip_prefix("http://")
        .or_else(|| url.strip_prefix("https://"))?;

    let end = rest
        .find(|c: char| matches!(c, '/' | '?' | '#' | '\\'))
        .unwrap_or(rest.len());
    let authority = &rest[..end];

    let host_port = authority
        .rsplit_once('@')
        .map_or(authority, |(_userinfo, hp)| hp);

    if host_port.starts_with('[') {
        if let Some(close) = host_port.find(']') {
            return Some(&host_port[..=close]);
        }
    }

    Some(host_port.rsplit_once(':').map_or(host_port, |(host, _port)| host))
}
580
581impl EffectHandler for DefaultHandler {
582    fn dispatch(&mut self, kind: &str, op: &str, args: Vec<Value>) -> Result<Value, String> {
583        // Pure stdlib builtins (str, list, json, ...) bypass the policy
584        // gate — they have no observable side effects and aren't tracked
585        // by the type system as effects.
586        if let Some(r) = try_pure_builtin(kind, op, &args) {
587            return r;
588        }
589        // `std.fs` ops use the fine-grained `[fs_walk]` and `[fs_write]`
590        // effect kinds (distinct from the module name `fs`); the
591        // policy check uses the per-op kind, not the module's.
592        if kind == "process" {
593            self.ensure_kind_allowed("proc")?;
594            return self.dispatch_process(op, args);
595        }
596        if kind == "log" {
597            // Emit ops are [log]; config ops are [io] (set_sink also
598            // [fs_write]). The dispatch picks the right kind per op.
599            let effect_kind = match op {
600                "debug" | "info" | "warn" | "error" => "log",
601                "set_level" | "set_format" => "io",
602                "set_sink" => {
603                    self.ensure_kind_allowed("io")?;
604                    self.ensure_kind_allowed("fs_write")?;
605                    return self.dispatch_log(op, args);
606                }
607                other => return Err(format!("unsupported log.{other}")),
608            };
609            self.ensure_kind_allowed(effect_kind)?;
610            return self.dispatch_log(op, args);
611        }
612        if kind == "fs" {
613            let effect_kind = match op {
614                "exists" | "is_file" | "is_dir" | "stat"
615                | "list_dir" | "walk" | "glob" => "fs_walk",
616                "mkdir_p" | "remove" => "fs_write",
617                "copy" => {
618                    self.ensure_kind_allowed("fs_walk")?;
619                    self.ensure_kind_allowed("fs_write")?;
620                    return self.dispatch_fs(op, args);
621                }
622                other => return Err(format!("unsupported fs.{other}")),
623            };
624            self.ensure_kind_allowed(effect_kind)?;
625            return self.dispatch_fs(op, args);
626        }
627        // `crypto.random` is the lone effectful op in `std.crypto`. Its
628        // declared effect kind is `random` (fine-grained on purpose so
629        // `lex audit --effect random` flags every token-generating
630        // call), distinct from the `crypto` module name.
631        // datetime.now is the only effectful op in std.datetime;
632        // declared kind is `time`, matching the existing `time.now`.
633        if kind == "datetime" && op == "now" {
634            self.ensure_kind_allowed("time")?;
635            let now = chrono::Utc::now();
636            let nanos = now.timestamp_nanos_opt().unwrap_or(i64::MAX);
637            return Ok(Value::Int(nanos));
638        }
639        if kind == "crypto" && op == "random" {
640            self.ensure_kind_allowed("random")?;
641            let n = expect_int(args.first())?;
642            if !(0..=1_048_576).contains(&n) {
643                return Err("crypto.random: n must be in 0..=1048576".into());
644            }
645            use rand::{rngs::OsRng, TryRngCore};
646            let mut buf = vec![0u8; n as usize];
647            OsRng.try_fill_bytes(&mut buf)
648                .map_err(|e| format!("crypto.random: OS RNG: {e}"))?;
649            return Ok(Value::Bytes(buf));
650        }
651        // `std.http` wire ops (send/get/post) gate on the `net`
652        // effect kind, not the module name. This matches the
653        // declared signature (`http.get :: Str -> [net] ...`) and
654        // keeps `--allow-effects net` doing the obvious thing for
655        // both `net.*` and `http.*` callers.
656        // `std.agent` (#184): the four runtime effects added for
657        // agent-style programs (`llm_local`, `llm_cloud`, `a2a`,
658        // `mcp`). The handlers are stubs — they enforce the
659        // declared-effect gate, return a sentinel `Ok` so traces
660        // record the call, and defer the real wire formats to
661        // downstream crates (`soft-agent` for `llm_*` and `a2a`)
662        // and #185 (MCP client wrapper).
663        if kind == "agent" {
664            let effect_kind = match op {
665                "local_complete" => "llm_local",
666                "cloud_complete" => "llm_cloud",
667                "send_a2a"       => "a2a",
668                "call_mcp"       => "mcp",
669                other => return Err(format!("unsupported agent.{other}")),
670            };
671            self.ensure_kind_allowed(effect_kind)?;
672            // `call_mcp` runs through the LRU client cache
673            // (#197). `local_complete` / `cloud_complete` hit
674            // Ollama / OpenAI via env-var-driven configuration
675            // (#196); custom backends override at the
676            // EffectHandler layer rather than via a config file.
677            // `send_a2a` keeps its stub — that wire format
678            // lives in downstream `soft-a2a`.
679            return match op {
680                "call_mcp"       => Ok(self.dispatch_call_mcp(args)),
681                "local_complete" => Ok(dispatch_llm_local(args)),
682                "cloud_complete" => Ok(dispatch_llm_cloud(args)),
683                _ => Ok(ok(Value::Str(format!("<{effect_kind} stub>")))),
684            };
685        }
686        if kind == "http" && matches!(op, "send" | "get" | "post") {
687            self.ensure_kind_allowed("net")?;
688            return match op {
689                "send" => {
690                    let req = expect_record(args.first())?;
691                    Ok(http_send_record(self, req))
692                }
693                "get" => {
694                    let url = expect_str(args.first())?.to_string();
695                    self.ensure_host_allowed(&url)?;
696                    Ok(http_send_simple("GET", &url, None, "", None))
697                }
698                "post" => {
699                    let url = expect_str(args.first())?.to_string();
700                    let body = expect_bytes(args.get(1))?.clone();
701                    let content_type = expect_str(args.get(2))?.to_string();
702                    self.ensure_host_allowed(&url)?;
703                    Ok(http_send_simple("POST", &url, Some(body), &content_type, None))
704                }
705                _ => unreachable!(),
706            };
707        }
708        self.ensure_kind_allowed(kind)?;
709        match (kind, op) {
710            ("io", "print") => {
711                let line = expect_str(args.first())?;
712                self.sink.print_line(line);
713                Ok(Value::Unit)
714            }
715            ("io", "read") => {
716                let path = expect_str(args.first())?.to_string();
717                let resolved = self.resolve_read_path(&path);
718                // Honor read-allowlist if any. Symmetric with io.write.
719                // The path argument is checked as-given (resolved-against-
720                // read_root for tests); a tool granted [io] cannot escape
721                // the configured prefix even though the effect itself is
722                // permitted. This is the per-path scope the bench's case
723                // #6 ("[io] granted, body reads /etc/passwd") needed.
724                if !self.policy.allow_fs_read.is_empty()
725                    && !self.policy.allow_fs_read.iter().any(|a| resolved.starts_with(a))
726                {
727                    return Err(format!("read of `{path}` outside --allow-fs-read"));
728                }
729                match std::fs::read_to_string(&resolved) {
730                    Ok(s) => Ok(ok(Value::Str(s))),
731                    Err(e) => Ok(err(Value::Str(format!("{e}")))),
732                }
733            }
734            ("io", "write") => {
735                let path = expect_str(args.first())?.to_string();
736                let contents = expect_str(args.get(1))?.to_string();
737                // Honor write-allowlist if any.
738                if !self.policy.allow_fs_write.is_empty() {
739                    let p = std::path::Path::new(&path);
740                    if !self.policy.allow_fs_write.iter().any(|a| p.starts_with(a)) {
741                        return Err(format!("write to `{path}` outside --allow-fs-write"));
742                    }
743                }
744                match std::fs::write(&path, contents) {
745                    Ok(_) => Ok(ok(Value::Unit)),
746                    Err(e) => Ok(err(Value::Str(format!("{e}")))),
747                }
748            }
749            ("time", "now") => {
750                let secs = SystemTime::now().duration_since(UNIX_EPOCH)
751                    .map_err(|e| format!("time: {e}"))?.as_secs();
752                Ok(Value::Int(secs as i64))
753            }
754            ("rand", "int_in") => {
755                // Deterministic stub: midpoint of [lo, hi].
756                let lo = expect_int(args.first())?;
757                let hi = expect_int(args.get(1))?;
758                Ok(Value::Int((lo + hi) / 2))
759            }
760            ("budget", _) => {
761                // Budget calls are nominally tracked here; budget itself is
762                // enforced statically in `policy::check_program`.
763                Ok(Value::Unit)
764            }
765            ("net", "get") => {
766                let url = expect_str(args.first())?.to_string();
767                self.ensure_host_allowed(&url)?;
768                Ok(http_request("GET", &url, None))
769            }
770            ("net", "post") => {
771                let url = expect_str(args.first())?.to_string();
772                let body = expect_str(args.get(1))?.to_string();
773                self.ensure_host_allowed(&url)?;
774                Ok(http_request("POST", &url, Some(&body)))
775            }
776            ("net", "serve") => {
777                let port = match args.first() {
778                    Some(Value::Int(n)) if (0..=65535).contains(n) => *n as u16,
779                    _ => return Err("net.serve(port, handler): port must be Int 0..=65535".into()),
780                };
781                let handler_name = expect_str(args.get(1))?.to_string();
782                let program = self.program.clone()
783                    .ok_or_else(|| "net.serve requires a Program reference; use DefaultHandler::with_program".to_string())?;
784                let policy = self.policy.clone();
785                serve_http(port, handler_name, program, policy, None)
786            }
787            ("net", "serve_tls") => {
788                let port = match args.first() {
789                    Some(Value::Int(n)) if (0..=65535).contains(n) => *n as u16,
790                    _ => return Err("net.serve_tls(port, cert, key, handler): port must be Int 0..=65535".into()),
791                };
792                let cert_path = expect_str(args.get(1))?.to_string();
793                let key_path = expect_str(args.get(2))?.to_string();
794                let handler_name = expect_str(args.get(3))?.to_string();
795                let program = self.program.clone()
796                    .ok_or_else(|| "net.serve_tls requires a Program reference".to_string())?;
797                let policy = self.policy.clone();
798                let cert = std::fs::read(&cert_path)
799                    .map_err(|e| format!("net.serve_tls: read cert {cert_path}: {e}"))?;
800                let key = std::fs::read(&key_path)
801                    .map_err(|e| format!("net.serve_tls: read key {key_path}: {e}"))?;
802                serve_http(port, handler_name, program, policy, Some(TlsConfig { cert, key }))
803            }
804            ("net", "serve_ws") => {
805                let port = match args.first() {
806                    Some(Value::Int(n)) if (0..=65535).contains(n) => *n as u16,
807                    _ => return Err("net.serve_ws(port, on_message): port must be Int 0..=65535".into()),
808                };
809                let handler_name = expect_str(args.get(1))?.to_string();
810                let program = self.program.clone()
811                    .ok_or_else(|| "net.serve_ws requires a Program reference".to_string())?;
812                let policy = self.policy.clone();
813                let registry = Arc::new(crate::ws::ChatRegistry::default());
814                crate::ws::serve_ws(port, handler_name, program, policy, registry)
815            }
816            ("chat", "broadcast") => {
817                let registry = self.chat_registry.as_ref()
818                    .ok_or_else(|| "chat.broadcast called outside a net.serve_ws handler".to_string())?;
819                let room = expect_str(args.first())?;
820                let body = expect_str(args.get(1))?;
821                crate::ws::chat_broadcast(registry, room, body);
822                Ok(Value::Unit)
823            }
824            ("chat", "send") => {
825                let registry = self.chat_registry.as_ref()
826                    .ok_or_else(|| "chat.send called outside a net.serve_ws handler".to_string())?;
827                let conn_id = match args.first() {
828                    Some(Value::Int(n)) if *n >= 0 => *n as u64,
829                    _ => return Err("chat.send: conn_id must be non-negative Int".into()),
830                };
831                let body = expect_str(args.get(1))?;
832                Ok(Value::Bool(crate::ws::chat_send(registry, conn_id, body)))
833            }
834            ("kv", "open") => {
835                let path = expect_str(args.first())?.to_string();
836                // Honor write-allowlist: opening a Kv writes its
837                // backing files at `path`, so the same scoping that
838                // applies to `io.write` applies here.
839                if !self.policy.allow_fs_write.is_empty() {
840                    let p = std::path::Path::new(&path);
841                    if !self.policy.allow_fs_write.iter().any(|a| p.starts_with(a)) {
842                        return Ok(err(Value::Str(format!(
843                            "kv.open: `{path}` outside --allow-fs-write"))));
844                    }
845                }
846                match sled::open(&path) {
847                    Ok(db) => {
848                        let handle = next_kv_handle();
849                        kv_registry().lock().unwrap().insert(handle, db);
850                        Ok(ok(Value::Int(handle as i64)))
851                    }
852                    Err(e) => Ok(err(Value::Str(format!("kv.open: {e}")))),
853                }
854            }
855            ("kv", "close") => {
856                let h = expect_kv_handle(args.first())?;
857                kv_registry().lock().unwrap().remove(h);
858                Ok(Value::Unit)
859            }
860            ("kv", "get") => {
861                let h = expect_kv_handle(args.first())?;
862                let key = expect_str(args.get(1))?;
863                let mut reg = kv_registry().lock().unwrap();
864                let db = reg.touch_get(h).ok_or_else(|| "kv.get: closed or unknown Kv handle".to_string())?;
865                match db.get(key.as_bytes()) {
866                    Ok(Some(ivec)) => Ok(some(Value::Bytes(ivec.to_vec()))),
867                    Ok(None) => Ok(none()),
868                    Err(e) => Err(format!("kv.get: {e}")),
869                }
870            }
871            ("kv", "put") => {
872                let h = expect_kv_handle(args.first())?;
873                let key = expect_str(args.get(1))?.to_string();
874                let val = expect_bytes(args.get(2))?.clone();
875                let mut reg = kv_registry().lock().unwrap();
876                let db = reg.touch_get(h).ok_or_else(|| "kv.put: closed or unknown Kv handle".to_string())?;
877                match db.insert(key.as_bytes(), val) {
878                    Ok(_) => Ok(ok(Value::Unit)),
879                    Err(e) => Ok(err(Value::Str(format!("kv.put: {e}")))),
880                }
881            }
882            ("kv", "delete") => {
883                let h = expect_kv_handle(args.first())?;
884                let key = expect_str(args.get(1))?;
885                let mut reg = kv_registry().lock().unwrap();
886                let db = reg.touch_get(h).ok_or_else(|| "kv.delete: closed or unknown Kv handle".to_string())?;
887                match db.remove(key.as_bytes()) {
888                    Ok(_) => Ok(ok(Value::Unit)),
889                    Err(e) => Ok(err(Value::Str(format!("kv.delete: {e}")))),
890                }
891            }
892            ("kv", "contains") => {
893                let h = expect_kv_handle(args.first())?;
894                let key = expect_str(args.get(1))?;
895                let mut reg = kv_registry().lock().unwrap();
896                let db = reg.touch_get(h).ok_or_else(|| "kv.contains: closed or unknown Kv handle".to_string())?;
897                match db.contains_key(key.as_bytes()) {
898                    Ok(present) => Ok(Value::Bool(present)),
899                    Err(e) => Err(format!("kv.contains: {e}")),
900                }
901            }
902            ("kv", "list_prefix") => {
903                let h = expect_kv_handle(args.first())?;
904                let prefix = expect_str(args.get(1))?;
905                let mut reg = kv_registry().lock().unwrap();
906                let db = reg.touch_get(h).ok_or_else(|| "kv.list_prefix: closed or unknown Kv handle".to_string())?;
907                let mut keys: Vec<Value> = Vec::new();
908                for kv in db.scan_prefix(prefix.as_bytes()) {
909                    let (k, _) = kv.map_err(|e| format!("kv.list_prefix: {e}"))?;
910                    let s = String::from_utf8_lossy(&k).to_string();
911                    keys.push(Value::Str(s));
912                }
913                Ok(Value::List(keys))
914            }
915            ("sql", "open") => {
916                let path = expect_str(args.first())?.to_string();
917                // Same shape as `kv.open`: opening creates the
918                // SQLite file, so the fs-write allowlist applies
919                // (in-memory paths are exempt).
920                if path != ":memory:" && !self.policy.allow_fs_write.is_empty() {
921                    let p = std::path::Path::new(&path);
922                    if !self.policy.allow_fs_write.iter().any(|a| p.starts_with(a)) {
923                        return Ok(err(Value::Str(format!(
924                            "sql.open: `{path}` outside --allow-fs-write"))));
925                    }
926                }
927                match rusqlite::Connection::open(&path) {
928                    Ok(conn) => {
929                        let handle = next_sql_handle();
930                        sql_registry().lock().unwrap().insert(handle, conn);
931                        Ok(ok(Value::Int(handle as i64)))
932                    }
933                    Err(e) => Ok(err(Value::Str(format!("sql.open: {e}")))),
934                }
935            }
936            ("sql", "close") => {
937                let h = expect_sql_handle(args.first())?;
938                sql_registry().lock().unwrap().remove(h);
939                Ok(Value::Unit)
940            }
941            ("sql", "exec") => {
942                let h = expect_sql_handle(args.first())?;
943                let stmt = expect_str(args.get(1))?.to_string();
944                let params = expect_str_list(args.get(2))?;
945                let arc = sql_registry().lock().unwrap()
946                    .touch_get(h)
947                    .ok_or_else(|| "sql.exec: closed or unknown Db handle".to_string())?;
948                let conn = arc.lock().unwrap();
949                let bind: Vec<&dyn rusqlite::ToSql> = params.iter()
950                    .map(|s| s as &dyn rusqlite::ToSql)
951                    .collect();
952                match conn.execute(&stmt, rusqlite::params_from_iter(bind.iter())) {
953                    Ok(n)  => Ok(ok(Value::Int(n as i64))),
954                    Err(e) => Ok(err(Value::Str(format!("sql.exec: {e}")))),
955                }
956            }
957            ("sql", "query") => {
958                let h = expect_sql_handle(args.first())?;
959                let stmt_str = expect_str(args.get(1))?.to_string();
960                let params = expect_str_list(args.get(2))?;
961                let arc = sql_registry().lock().unwrap()
962                    .touch_get(h)
963                    .ok_or_else(|| "sql.query: closed or unknown Db handle".to_string())?;
964                let conn = arc.lock().unwrap();
965                Ok(sql_run_query(&conn, &stmt_str, &params))
966            }
967            ("proc", "spawn") => {
968                // The escape hatch effect. Spawns a child process,
969                // collects its stdout/stderr, returns a structured
970                // record. Allow-list is the binary basename: anything
971                // outside `--allow-proc` is rejected pre-spawn.
972                //
973                // What this does NOT validate (per SECURITY.md):
974                // - per-arg content (a script-like CLI invoked via
975                //   --eval=... can run anything)
976                // - environment variables (inherited from the parent)
977                // - working directory (the parent's)
978                //
979                // For untrusted input, layer with OS-level
980                // sandboxing — gVisor / nsjail / a container.
981                let cmd = expect_str(args.first())?.to_string();
982                let raw_args = match args.get(1) {
983                    Some(Value::List(items)) => items,
984                    Some(other) => return Err(format!(
985                        "proc.spawn: args must be List[Str], got {other:?}")),
986                    None => return Err("proc.spawn: missing args list".into()),
987                };
988                let str_args: Vec<String> = raw_args.iter().map(|v| match v {
989                    Value::Str(s) => Ok(s.clone()),
990                    other => Err(format!("proc.spawn: arg must be Str, got {other:?}")),
991                }).collect::<Result<Vec<_>, _>>()?;
992
993                // Allow-list check: empty list = any binary (escape
994                // hatch); non-empty = basename of cmd must match an
995                // entry exactly.
996                if !self.policy.allow_proc.is_empty() {
997                    let basename = std::path::Path::new(&cmd)
998                        .file_name()
999                        .and_then(|s| s.to_str())
1000                        .unwrap_or(&cmd);
1001                    if !self.policy.allow_proc.iter().any(|a| a == basename) {
1002                        return Ok(err(Value::Str(format!(
1003                            "proc.spawn: `{cmd}` not in --allow-proc {:?}",
1004                            self.policy.allow_proc
1005                        ))));
1006                    }
1007                }
1008
1009                // Hard caps: the spec doesn't pin numbers, but
1010                // unbounded argv is a DoS vector.
1011                if str_args.len() > 1024 {
1012                    return Ok(err(Value::Str(
1013                        "proc.spawn: arg-count exceeds 1024".into())));
1014                }
1015                if str_args.iter().any(|a| a.len() > 65_536) {
1016                    return Ok(err(Value::Str(
1017                        "proc.spawn: per-arg length exceeds 64 KiB".into())));
1018                }
1019
1020                let output = std::process::Command::new(&cmd)
1021                    .args(&str_args)
1022                    .output();
1023                match output {
1024                    Ok(o) => {
1025                        let mut rec = indexmap::IndexMap::new();
1026                        rec.insert("stdout".into(), Value::Str(
1027                            String::from_utf8_lossy(&o.stdout).to_string()));
1028                        rec.insert("stderr".into(), Value::Str(
1029                            String::from_utf8_lossy(&o.stderr).to_string()));
1030                        rec.insert("exit_code".into(), Value::Int(
1031                            o.status.code().unwrap_or(-1) as i64));
1032                        Ok(ok(Value::Record(rec)))
1033                    }
1034                    Err(e) => Ok(err(Value::Str(format!("spawn `{cmd}`: {e}")))),
1035                }
1036            }
1037            other => Err(format!("unsupported effect {}.{}", other.0, other.1)),
1038        }
1039    }
1040}
1041
1042/// Blocks the calling thread, accepts incoming HTTP requests on
1043/// `127.0.0.1:port`, and dispatches each through the named Lex
1044/// stage. Each request gets a fresh `Vm`; the program and policy
1045/// are shared.
1046///
1047/// Handler signature in Lex (by convention):
1048///   fn <name>(req :: Record { method :: Str, path :: Str, body :: Str })
1049///        -> Record { status :: Int, body :: Str }
1050/// PEM-encoded certificate + private key, both as raw bytes.
1051pub struct TlsConfig {
1052    pub cert: Vec<u8>,
1053    pub key: Vec<u8>,
1054}
1055
1056fn serve_http(
1057    port: u16,
1058    handler_name: String,
1059    program: Arc<Program>,
1060    policy: Policy,
1061    tls: Option<TlsConfig>,
1062) -> Result<Value, String> {
1063    let (server, scheme) = match tls {
1064        None => (
1065            tiny_http::Server::http(("127.0.0.1", port))
1066                .map_err(|e| format!("net.serve bind {port}: {e}"))?,
1067            "http",
1068        ),
1069        Some(cfg) => {
1070            let ssl = tiny_http::SslConfig {
1071                certificate: cfg.cert,
1072                private_key: cfg.key,
1073            };
1074            (
1075                tiny_http::Server::https(("127.0.0.1", port), ssl)
1076                    .map_err(|e| format!("net.serve_tls bind {port}: {e}"))?,
1077                "https",
1078            )
1079        }
1080    };
1081    eprintln!("net.serve: listening on {scheme}://127.0.0.1:{port}");
1082    // Thread-per-request: the main loop accepts; each request runs on
1083    // its own worker thread with its own fresh Vm. The Program is
1084    // shared via Arc; Policy and handler_name are cloned per request.
1085    // Lex's immutability means there's no shared mutable state at the
1086    // language level — workers don't race.
1087    for req in server.incoming_requests() {
1088        let program = Arc::clone(&program);
1089        let policy = policy.clone();
1090        let handler_name = handler_name.clone();
1091        std::thread::spawn(move || handle_request(req, program, policy, handler_name));
1092    }
1093    Ok(Value::Unit)
1094}
1095
1096fn handle_request(
1097    mut req: tiny_http::Request,
1098    program: Arc<Program>,
1099    policy: Policy,
1100    handler_name: String,
1101) {
1102    let lex_req = build_request_value(&mut req);
1103    let handler = DefaultHandler::new(policy).with_program(Arc::clone(&program));
1104    let mut vm = Vm::with_handler(&program, Box::new(handler));
1105    match vm.call(&handler_name, vec![lex_req]) {
1106        Ok(resp) => {
1107            let (status, body) = unpack_response(&resp);
1108            let response = tiny_http::Response::from_string(body).with_status_code(status);
1109            let _ = req.respond(response);
1110        }
1111        Err(e) => {
1112            let response = tiny_http::Response::from_string(format!("internal error: {e}"))
1113                .with_status_code(500);
1114            let _ = req.respond(response);
1115        }
1116    }
1117}
1118
1119fn build_request_value(req: &mut tiny_http::Request) -> Value {
1120    let method = format!("{:?}", req.method()).to_uppercase();
1121    let url = req.url().to_string();
1122    let (path, query) = match url.split_once('?') {
1123        Some((p, q)) => (p.to_string(), q.to_string()),
1124        None => (url, String::new()),
1125    };
1126    let mut body = String::new();
1127    let _ = req.as_reader().read_to_string(&mut body);
1128    let mut rec = indexmap::IndexMap::new();
1129    rec.insert("method".into(), Value::Str(method));
1130    rec.insert("path".into(), Value::Str(path));
1131    rec.insert("query".into(), Value::Str(query));
1132    rec.insert("body".into(), Value::Str(body));
1133    Value::Record(rec)
1134}
1135
1136fn unpack_response(v: &Value) -> (u16, String) {
1137    if let Value::Record(rec) = v {
1138        let status = rec.get("status").and_then(|s| match s {
1139            Value::Int(n) => Some(*n as u16),
1140            _ => None,
1141        }).unwrap_or(200);
1142        let body = rec.get("body").and_then(|b| match b {
1143            Value::Str(s) => Some(s.clone()),
1144            _ => None,
1145        }).unwrap_or_default();
1146        return (status, body);
1147    }
1148    (500, format!("handler returned non-record: {v:?}"))
1149}
1150
1151/// HTTP/1.1 client backed by `ureq` + `rustls`. Accepts both
1152/// `http://` and `https://` URLs. Returns `Result[Str, Str]` as a
1153/// Lex `Value::Variant`. The earlier hand-rolled HTTP/1.0 client
1154/// was plain-TCP only — most public APIs are HTTPS, so the demo
1155/// could fetch `example.com` but not `wttr.in` or `api.github.com`.
1156fn http_request(method: &str, url: &str, body: Option<&str>) -> Value {
1157    use std::time::Duration;
1158    // ureq 3 puts 4xx/5xx behind `Error::StatusCode(code)` and consumes
1159    // the response, so the body would be lost. Disabling
1160    // `http_status_as_error` lets us check the status manually and
1161    // surface `Err("status 404: <body>")` like the old code did.
1162    let agent: ureq::Agent = ureq::Agent::config_builder()
1163        .timeout_connect(Some(Duration::from_secs(10)))
1164        .timeout_recv_body(Some(Duration::from_secs(30)))
1165        .timeout_send_body(Some(Duration::from_secs(10)))
1166        .http_status_as_error(false)
1167        .build()
1168        .into();
1169    let resp = match (method, body) {
1170        ("GET", _) => agent.get(url).call(),
1171        ("POST", Some(b)) => agent.post(url).send(b),
1172        ("POST", None) => agent.post(url).send(""),
1173        (m, _) => return err_value(format!("unsupported method: {m}")),
1174    };
1175    match resp {
1176        Ok(mut r) => {
1177            let status = r.status().as_u16();
1178            let body = r.body_mut().read_to_string().unwrap_or_default();
1179            if (200..300).contains(&status) {
1180                Value::Variant { name: "Ok".into(), args: vec![Value::Str(body)] }
1181            } else {
1182                err_value(format!("status {status}: {body}"))
1183            }
1184        }
1185        Err(e) => err_value(format!("transport: {e}")),
1186    }
1187}
1188
1189/// Build a ureq agent for `std.http.{send,get,post}` with the given
1190/// timeout (None → use the same defaults as the legacy `net.{get,post}`
1191/// path). Separate from `http_request` so the rich `http.send` flow
1192/// can supply per-request overrides.
1193fn http_agent(timeout_ms: Option<u64>) -> ureq::Agent {
1194    use std::time::Duration;
1195    let mut b = ureq::Agent::config_builder()
1196        .timeout_connect(Some(Duration::from_secs(10)))
1197        .timeout_recv_body(Some(Duration::from_secs(30)))
1198        .timeout_send_body(Some(Duration::from_secs(10)))
1199        .http_status_as_error(false);
1200    if let Some(ms) = timeout_ms {
1201        let d = Duration::from_millis(ms);
1202        b = b.timeout_global(Some(d));
1203    }
1204    b.build().into()
1205}
1206
1207/// Map ureq's transport error to the structured `HttpError` variant
1208/// std.http exposes to user code. Anything not specifically a
1209/// timeout / TLS error funnels into `NetworkError`.
1210fn http_error_value(e: ureq::Error) -> Value {
1211    let (ctor, payload): (&str, Option<String>) = match &e {
1212        ureq::Error::Timeout(_) => ("TimeoutError", None),
1213        ureq::Error::Tls(s) => ("TlsError", Some((*s).into())),
1214        ureq::Error::Pem(p) => ("TlsError", Some(format!("{p}"))),
1215        ureq::Error::Rustls(r) => ("TlsError", Some(format!("{r}"))),
1216        _ => ("NetworkError", Some(format!("{e}"))),
1217    };
1218    let args = match payload { Some(s) => vec![Value::Str(s)], None => vec![] };
1219    let inner = Value::Variant { name: ctor.into(), args };
1220    Value::Variant { name: "Err".into(), args: vec![inner] }
1221}
1222
1223fn http_decode_err(msg: String) -> Value {
1224    let inner = Value::Variant {
1225        name: "DecodeError".into(),
1226        args: vec![Value::Str(msg)],
1227    };
1228    Value::Variant { name: "Err".into(), args: vec![inner] }
1229}
1230
1231/// Run a request and pack the ureq response into the
1232/// `{ status, headers, body }` Lex record (or the structured
1233/// `HttpError` on failure). `headers_extra` pairs are appended to the
1234/// outgoing request after `content_type` is applied.
1235fn http_send_simple(
1236    method: &str,
1237    url: &str,
1238    body: Option<Vec<u8>>,
1239    content_type: &str,
1240    timeout_ms: Option<u64>,
1241) -> Value {
1242    http_send_full(method, url, body, content_type, &[], timeout_ms)
1243}
1244
1245fn http_send_full(
1246    method: &str,
1247    url: &str,
1248    body: Option<Vec<u8>>,
1249    content_type: &str,
1250    headers: &[(String, String)],
1251    timeout_ms: Option<u64>,
1252) -> Value {
1253    let agent = http_agent(timeout_ms);
1254    let resp = match method {
1255        "GET" => {
1256            let mut req = agent.get(url);
1257            if !content_type.is_empty() { req = req.header("content-type", content_type); }
1258            for (k, v) in headers { req = req.header(k.as_str(), v.as_str()); }
1259            req.call()
1260        }
1261        "POST" => {
1262            let body = body.unwrap_or_default();
1263            let mut req = agent.post(url);
1264            if !content_type.is_empty() { req = req.header("content-type", content_type); }
1265            for (k, v) in headers { req = req.header(k.as_str(), v.as_str()); }
1266            req.send(&body[..])
1267        }
1268        m => {
1269            // Other methods (PUT, DELETE, PATCH, ...) fall through
1270            // here in v1.5; for now surface a structured DecodeError
1271            // so the caller can match it.
1272            return http_decode_err(format!("unsupported method: {m}"));
1273        }
1274    };
1275    match resp {
1276        Ok(mut r) => {
1277            let status = r.status().as_u16() as i64;
1278            let headers_map = collect_response_headers(r.headers());
1279            let body_bytes = match r.body_mut().with_config().limit(10 * 1024 * 1024).read_to_vec() {
1280                Ok(b) => b,
1281                Err(e) => return http_decode_err(format!("body read: {e}")),
1282            };
1283            let mut rec = indexmap::IndexMap::new();
1284            rec.insert("status".into(), Value::Int(status));
1285            rec.insert("headers".into(), Value::Map(headers_map));
1286            rec.insert("body".into(), Value::Bytes(body_bytes));
1287            Value::Variant { name: "Ok".into(), args: vec![Value::Record(rec)] }
1288        }
1289        Err(e) => http_error_value(e),
1290    }
1291}
1292
1293fn collect_response_headers(
1294    headers: &ureq::http::HeaderMap,
1295) -> std::collections::BTreeMap<lex_bytecode::MapKey, Value> {
1296    let mut out = std::collections::BTreeMap::new();
1297    for (name, value) in headers.iter() {
1298        let v = value.to_str().unwrap_or("").to_string();
1299        out.insert(lex_bytecode::MapKey::Str(name.as_str().to_string()), Value::Str(v));
1300    }
1301    out
1302}
1303
1304/// Pull the standard `HttpRequest` shape out of a `Value::Record`
1305/// and dispatch through `http_send_full`. The handler verifies
1306/// `--allow-net-host` for the URL before sending.
1307fn http_send_record(handler: &DefaultHandler, req: &indexmap::IndexMap<String, Value>) -> Value {
1308    let method = match req.get("method") {
1309        Some(Value::Str(s)) => s.clone(),
1310        _ => return http_decode_err("HttpRequest.method must be Str".into()),
1311    };
1312    let url = match req.get("url") {
1313        Some(Value::Str(s)) => s.clone(),
1314        _ => return http_decode_err("HttpRequest.url must be Str".into()),
1315    };
1316    if let Err(e) = handler.ensure_host_allowed(&url) {
1317        return http_decode_err(e);
1318    }
1319    let body = match req.get("body") {
1320        Some(Value::Variant { name, args }) if name == "None" => None,
1321        Some(Value::Variant { name, args }) if name == "Some" => match args.as_slice() {
1322            [Value::Bytes(b)] => Some(b.clone()),
1323            _ => return http_decode_err("HttpRequest.body Some payload must be Bytes".into()),
1324        },
1325        _ => return http_decode_err("HttpRequest.body must be Option[Bytes]".into()),
1326    };
1327    let timeout_ms = match req.get("timeout_ms") {
1328        Some(Value::Variant { name, .. }) if name == "None" => None,
1329        Some(Value::Variant { name, args }) if name == "Some" => match args.as_slice() {
1330            [Value::Int(n)] if *n >= 0 => Some(*n as u64),
1331            _ => return http_decode_err(
1332                "HttpRequest.timeout_ms Some payload must be a non-negative Int".into()),
1333        },
1334        _ => return http_decode_err("HttpRequest.timeout_ms must be Option[Int]".into()),
1335    };
1336    let headers: Vec<(String, String)> = match req.get("headers") {
1337        Some(Value::Map(m)) => m.iter().filter_map(|(k, v)| {
1338            let kk = match k { lex_bytecode::MapKey::Str(s) => s.clone(), _ => return None };
1339            let vv = match v { Value::Str(s) => s.clone(), _ => return None };
1340            Some((kk, vv))
1341        }).collect(),
1342        _ => return http_decode_err("HttpRequest.headers must be Map[Str, Str]".into()),
1343    };
1344    http_send_full(&method, &url, body, "", &headers, timeout_ms)
1345}
1346
1347fn expect_record(v: Option<&Value>) -> Result<&indexmap::IndexMap<String, Value>, String> {
1348    match v {
1349        Some(Value::Record(r)) => Ok(r),
1350        Some(other) => Err(format!("expected Record, got {other:?}")),
1351        None => Err("missing Record argument".into()),
1352    }
1353}
1354
1355fn err_value(msg: String) -> Value {
1356    Value::Variant { name: "Err".into(), args: vec![Value::Str(msg)] }
1357}
1358
1359fn expect_str(v: Option<&Value>) -> Result<&str, String> {
1360    match v {
1361        Some(Value::Str(s)) => Ok(s),
1362        Some(other) => Err(format!("expected Str arg, got {other:?}")),
1363        None => Err("missing argument".into()),
1364    }
1365}
1366
1367fn expect_int(v: Option<&Value>) -> Result<i64, String> {
1368    match v {
1369        Some(Value::Int(n)) => Ok(*n),
1370        Some(other) => Err(format!("expected Int arg, got {other:?}")),
1371        None => Err("missing argument".into()),
1372    }
1373}
1374
1375fn ok(v: Value) -> Value {
1376    Value::Variant { name: "Ok".into(), args: vec![v] }
1377}
1378fn err(v: Value) -> Value {
1379    Value::Variant { name: "Err".into(), args: vec![v] }
1380}
1381
1382impl DefaultHandler {
1383    /// Implementation of `agent.call_mcp(server, tool, args_json)`.
1384    /// Goes through the LRU client cache (#197): the named server
1385    /// is spawned on first use and reused on subsequent calls.
1386    /// On failure the offending client is dropped so the next
1387    /// call respawns rather than silently failing forever.
1388    fn dispatch_call_mcp(&mut self, args: Vec<Value>) -> Value {
1389        let server = match args.first() {
1390            Some(Value::Str(s)) => s.clone(),
1391            _ => return err(Value::Str(
1392                "agent.call_mcp(server, tool, args_json): server must be Str".into())),
1393        };
1394        let tool = match args.get(1) {
1395            Some(Value::Str(s)) => s.clone(),
1396            _ => return err(Value::Str(
1397                "agent.call_mcp(server, tool, args_json): tool must be Str".into())),
1398        };
1399        let args_json = match args.get(2) {
1400            Some(Value::Str(s)) => s.clone(),
1401            _ => return err(Value::Str(
1402                "agent.call_mcp(server, tool, args_json): args_json must be Str".into())),
1403        };
1404        let parsed: serde_json::Value = match serde_json::from_str(&args_json) {
1405            Ok(v) => v,
1406            Err(e) => return err(Value::Str(format!(
1407                "agent.call_mcp: args_json is not valid JSON: {e}"))),
1408        };
1409        match self.mcp_clients.call(&server, &tool, parsed) {
1410            Ok(result) => ok(Value::Str(
1411                serde_json::to_string(&result).unwrap_or_else(|_| "null".into()))),
1412            Err(e) => err(Value::Str(e)),
1413        }
1414    }
1415}
1416
1417/// Implementation of `agent.local_complete(prompt)` (#196).
1418/// Hits Ollama (or any compatible HTTP service via `OLLAMA_HOST`)
1419/// and returns the completion text. Override at the
1420/// `EffectHandler` layer if you need a different transport.
1421fn dispatch_llm_local(args: Vec<Value>) -> Value {
1422    let prompt = match args.first() {
1423        Some(Value::Str(s)) => s.clone(),
1424        _ => return err(Value::Str(
1425            "agent.local_complete(prompt): prompt must be Str".into())),
1426    };
1427    match crate::llm::local_complete(&prompt) {
1428        Ok(text) => ok(Value::Str(text)),
1429        Err(e) => err(Value::Str(e)),
1430    }
1431}
1432
1433/// Implementation of `agent.cloud_complete(prompt)` (#196).
1434/// Hits OpenAI's chat-completions API (or any compatible
1435/// service via `OPENAI_BASE_URL`) and returns the assistant
1436/// message. Requires `OPENAI_API_KEY`. Override at the
1437/// `EffectHandler` layer for custom auth, batching, or other
1438/// providers.
1439fn dispatch_llm_cloud(args: Vec<Value>) -> Value {
1440    let prompt = match args.first() {
1441        Some(Value::Str(s)) => s.clone(),
1442        _ => return err(Value::Str(
1443            "agent.cloud_complete(prompt): prompt must be Str".into())),
1444    };
1445    match crate::llm::cloud_complete(&prompt) {
1446        Ok(text) => ok(Value::Str(text)),
1447        Err(e) => err(Value::Str(e)),
1448    }
1449}
1450
1451fn some(v: Value) -> Value {
1452    Value::Variant { name: "Some".into(), args: vec![v] }
1453}
1454fn none() -> Value {
1455    Value::Variant { name: "None".into(), args: vec![] }
1456}
1457
1458fn expect_bytes(v: Option<&Value>) -> Result<&Vec<u8>, String> {
1459    match v {
1460        Some(Value::Bytes(b)) => Ok(b),
1461        Some(other) => Err(format!("expected Bytes arg, got {other:?}")),
1462        None => Err("missing argument".into()),
1463    }
1464}
1465
1466fn expect_kv_handle(v: Option<&Value>) -> Result<u64, String> {
1467    match v {
1468        Some(Value::Int(n)) if *n >= 0 => Ok(*n as u64),
1469        Some(other) => Err(format!("expected Kv handle (Int), got {other:?}")),
1470        None => Err("missing Kv argument".into()),
1471    }
1472}
1473
1474fn expect_sql_handle(v: Option<&Value>) -> Result<u64, String> {
1475    match v {
1476        Some(Value::Int(n)) if *n >= 0 => Ok(*n as u64),
1477        Some(other) => Err(format!("expected Db handle (Int), got {other:?}")),
1478        None => Err("missing Db argument".into()),
1479    }
1480}
1481
1482fn expect_str_list(v: Option<&Value>) -> Result<Vec<String>, String> {
1483    match v {
1484        Some(Value::List(items)) => items.iter().map(|x| match x {
1485            Value::Str(s) => Ok(s.clone()),
1486            other => Err(format!("expected List[Str] element, got {other:?}")),
1487        }).collect(),
1488        Some(other) => Err(format!("expected List[Str], got {other:?}")),
1489        None => Err("missing List[Str] argument".into()),
1490    }
1491}
1492
1493/// Run a `SELECT` (or any returning statement) and pack the rows
1494/// into `Value::List(Value::Record(...))` shape — column-name keys,
1495/// SQLite-typed values mapped one-for-one to Lex value variants
1496/// (Null → Unit, Integer → Int, Real → Float, Text → Str, Blob →
1497/// Bytes). Returns the standard `Result[List[T], Str]` Lex shape.
1498fn sql_run_query(
1499    conn: &rusqlite::Connection,
1500    stmt_str: &str,
1501    params: &[String],
1502) -> Value {
1503    let mut stmt = match conn.prepare(stmt_str) {
1504        Ok(s)  => s,
1505        Err(e) => return err(Value::Str(format!("sql.query: {e}"))),
1506    };
1507    let column_count = stmt.column_count();
1508    let column_names: Vec<String> = (0..column_count)
1509        .map(|i| stmt.column_name(i).unwrap_or("").to_string())
1510        .collect();
1511    let bind: Vec<&dyn rusqlite::ToSql> = params.iter()
1512        .map(|s| s as &dyn rusqlite::ToSql)
1513        .collect();
1514    let mut rows = match stmt.query(rusqlite::params_from_iter(bind.iter())) {
1515        Ok(r)  => r,
1516        Err(e) => return err(Value::Str(format!("sql.query: {e}"))),
1517    };
1518    let mut out: Vec<Value> = Vec::new();
1519    loop {
1520        let row = match rows.next() {
1521            Ok(Some(r)) => r,
1522            Ok(None)    => break,
1523            Err(e)      => return err(Value::Str(format!("sql.query: {e}"))),
1524        };
1525        let mut rec = indexmap::IndexMap::new();
1526        for (i, name) in column_names.iter().enumerate() {
1527            let cell = match row.get_ref(i) {
1528                Ok(c)  => sql_value_ref_to_lex(c),
1529                Err(e) => return err(Value::Str(format!("sql.query: column {i}: {e}"))),
1530            };
1531            rec.insert(name.clone(), cell);
1532        }
1533        out.push(Value::Record(rec));
1534    }
1535    ok(Value::List(out))
1536}
1537
1538fn sql_value_ref_to_lex(v: rusqlite::types::ValueRef<'_>) -> Value {
1539    use rusqlite::types::ValueRef;
1540    match v {
1541        ValueRef::Null       => Value::Unit,
1542        ValueRef::Integer(n) => Value::Int(n),
1543        ValueRef::Real(f)    => Value::Float(f),
1544        ValueRef::Text(s)    => Value::Str(String::from_utf8_lossy(s).into_owned()),
1545        ValueRef::Blob(b)    => Value::Bytes(b.to_vec()),
1546    }
1547}
1548
1549// -- log state (process-wide; configurable via log.set_*) --
1550
/// Severity of a log record, declared from least to most severe.
///
/// The derived `PartialOrd` follows declaration order, and `emit_log`
/// compares a record's level against the configured threshold with `<`,
/// so reordering the variants would change which records are dropped.
#[derive(Clone, Copy, PartialEq, PartialOrd)]
enum LogLevel { Debug, Info, Warn, Error }
1553
/// Line format produced by `emit_log`: `Text` renders
/// `[ts] level: msg`, `Json` renders a one-line object with `ts`,
/// `level` and `msg` keys.
#[derive(Clone, Copy, PartialEq)]
enum LogFormat { Text, Json }
1556
/// Destination that `emit_log` writes formatted lines to.
#[derive(Clone)]
enum LogSink {
    /// The process's standard error stream.
    Stderr,
    /// A shared file handle; `emit_log` locks the mutex around each write.
    File(std::sync::Arc<Mutex<std::fs::File>>),
}
1562
/// Logging configuration held behind the mutex returned by `log_state`.
struct LogState {
    /// Threshold: `emit_log` drops records whose level is below this.
    level: LogLevel,
    /// How each emitted line is rendered.
    format: LogFormat,
    /// Where emitted lines are written.
    sink: LogSink,
}
1568
1569fn log_state() -> &'static Mutex<LogState> {
1570    static STATE: OnceLock<Mutex<LogState>> = OnceLock::new();
1571    STATE.get_or_init(|| Mutex::new(LogState {
1572        level: LogLevel::Info,
1573        format: LogFormat::Text,
1574        sink: LogSink::Stderr,
1575    }))
1576}
1577
1578fn parse_log_level(s: &str) -> Option<LogLevel> {
1579    match s {
1580        "debug" => Some(LogLevel::Debug),
1581        "info" => Some(LogLevel::Info),
1582        "warn" => Some(LogLevel::Warn),
1583        "error" => Some(LogLevel::Error),
1584        _ => None,
1585    }
1586}
1587
1588fn level_label(l: LogLevel) -> &'static str {
1589    match l {
1590        LogLevel::Debug => "debug",
1591        LogLevel::Info => "info",
1592        LogLevel::Warn => "warn",
1593        LogLevel::Error => "error",
1594    }
1595}
1596
1597fn emit_log(level: LogLevel, msg: &str) {
1598    let state = log_state().lock().unwrap();
1599    if level < state.level {
1600        return;
1601    }
1602    let ts = chrono::Utc::now().to_rfc3339();
1603    let line = match state.format {
1604        LogFormat::Text => format!("[{}] {}: {}\n", ts, level_label(level), msg),
1605        LogFormat::Json => {
1606            // Hand-rolled JSON to avoid pulling serde_json into the
1607            // hot path; msg gets minimal escaping (the four common
1608            // cases that break a JSON line).
1609            let escaped = msg
1610                .replace('\\', "\\\\")
1611                .replace('"',  "\\\"")
1612                .replace('\n', "\\n")
1613                .replace('\r', "\\r");
1614            format!(
1615                "{{\"ts\":\"{ts}\",\"level\":\"{}\",\"msg\":\"{escaped}\"}}\n",
1616                level_label(level),
1617            )
1618        }
1619    };
1620    let sink = state.sink.clone();
1621    drop(state);
1622    match sink {
1623        LogSink::Stderr => {
1624            use std::io::Write;
1625            let _ = std::io::stderr().write_all(line.as_bytes());
1626        }
1627        LogSink::File(f) => {
1628            use std::io::Write;
1629            if let Ok(mut g) = f.lock() {
1630                let _ = g.write_all(line.as_bytes());
1631            }
1632        }
1633    }
1634}
1635
/// A spawned child process together with optional buffered readers over
/// its stdout and stderr pipes.
pub(crate) struct ProcessState {
    /// Handle to the child process itself.
    child: std::process::Child,
    /// Buffered reader over the child's stdout; `None` when no reader is attached.
    stdout: Option<std::io::BufReader<std::process::ChildStdout>>,
    /// Buffered reader over the child's stderr; `None` when no reader is attached.
    stderr: Option<std::io::BufReader<std::process::ChildStderr>>,
}
1641
1642/// Process-wide registry of live `process.spawn` handles. Capped at
1643/// [`MAX_PROCESS_HANDLES`] to bound long-running programs that spawn
1644/// many short-lived children: on each `spawn` past the cap, the
1645/// least-recently-used entry is dropped (which `Drop`s its
1646/// `ProcessState`, leaving the child orphaned but the registry
1647/// bounded). `process.wait` also drops the entry on completion since
1648/// the handle becomes terminal once the child exits.
1649///
1650/// Each entry is wrapped in `Arc<Mutex<ProcessState>>` so the global
1651/// lookup mutex is held only briefly during dispatch — once we have
1652/// the per-handle `Arc`, the global lock is released and the slow
1653/// op (`wait`, `read_*_line`) only contends on its own handle's
1654/// mutex. Reads on different handles no longer block each other.
1655fn process_registry() -> &'static Mutex<ProcessRegistry> {
1656    static REGISTRY: OnceLock<Mutex<ProcessRegistry>> = OnceLock::new();
1657    REGISTRY.get_or_init(|| Mutex::new(ProcessRegistry::with_capacity(MAX_PROCESS_HANDLES)))
1658}
1659
/// Capacity handed to `ProcessRegistry::with_capacity` by
/// `process_registry`: the most process handles kept at once.
const MAX_PROCESS_HANDLES: usize = 256;

/// One registry entry: a `ProcessState` behind its own mutex, shared via
/// `Arc` so a lookup can hand out a clone.
type SharedProcessState = Arc<Mutex<ProcessState>>;
1663
/// Bounded, recency-ordered set of process handles.
pub(crate) struct ProcessRegistry {
    /// Handle → shared state, ordered from least- to most-recently used:
    /// `touch_get` moves a hit to the back, `insert` evicts index 0.
    entries: indexmap::IndexMap<u64, SharedProcessState>,
    /// Maximum number of entries; checked by `insert` before adding.
    cap: usize,
}
1668
1669impl ProcessRegistry {
1670    pub(crate) fn with_capacity(cap: usize) -> Self {
1671        Self { entries: indexmap::IndexMap::new(), cap }
1672    }
1673
1674    /// Insert a freshly-spawned child. If at cap, evict the LRU entry
1675    /// first; the dropped `ProcessState`'s child stays alive (orphaned)
1676    /// but its file descriptors are released.
1677    pub(crate) fn insert(&mut self, handle: u64, state: ProcessState) {
1678        if self.entries.len() >= self.cap {
1679            self.entries.shift_remove_index(0);
1680        }
1681        self.entries.insert(handle, Arc::new(Mutex::new(state)));
1682    }
1683
1684    /// Look up a handle, marking it most-recently-used on hit. Returns
1685    /// a clone of the shared `Arc` — callers should release the global
1686    /// registry lock before locking the per-handle mutex.
1687    pub(crate) fn touch_get(&mut self, handle: u64) -> Option<SharedProcessState> {
1688        let idx = self.entries.get_index_of(&handle)?;
1689        self.entries.move_index(idx, self.entries.len() - 1);
1690        self.entries.get(&handle).cloned()
1691    }
1692
1693    /// Drop the registry entry. The underlying `Arc` may outlive the
1694    /// removal if another op still holds it; that's intentional — the
1695    /// in-flight op finishes against the existing `ProcessState`, and
1696    /// only fresh lookups start failing.
1697    pub(crate) fn remove(&mut self, handle: u64) {
1698        self.entries.shift_remove(&handle);
1699    }
1700
1701    #[cfg(test)]
1702    pub(crate) fn len(&self) -> usize { self.entries.len() }
1703}
1704
/// Allocate the next process handle. Handles start at 1 and increase by
/// one per call, process-wide.
fn next_process_handle() -> u64 {
    static NEXT_HANDLE: AtomicU64 = AtomicU64::new(1);
    NEXT_HANDLE.fetch_add(1, Ordering::SeqCst)
}
1709
#[cfg(all(test, unix))]
mod process_registry_tests {
    use super::{ProcessRegistry, ProcessState};
    use std::process::{Command, Stdio};
    use std::sync::Arc;

    /// Build a registry payload around a real child process. `true` is
    /// spawned with stdout/stderr sent to null; the tests never wait on
    /// it or read from it — they only need a valid `Child`.
    fn fresh_state() -> ProcessState {
        let mut cmd = Command::new("true");
        cmd.stdout(Stdio::null()).stderr(Stdio::null());
        let child = cmd.spawn().expect("spawn `true`");
        ProcessState { child, stdout: None, stderr: None }
    }

    /// Registry of capacity `cap`, pre-filled with `handles` in order.
    fn filled(cap: usize, handles: &[u64]) -> ProcessRegistry {
        let mut registry = ProcessRegistry::with_capacity(cap);
        for &handle in handles {
            registry.insert(handle, fresh_state());
        }
        registry
    }

    #[test]
    fn insert_and_get_round_trip() {
        let mut registry = filled(4, &[1]);
        assert!(registry.touch_get(1).is_some());
        assert!(registry.touch_get(2).is_none());
    }

    #[test]
    fn touch_get_returns_distinct_arcs_for_distinct_handles() {
        let mut registry = filled(4, &[1, 2]);
        let first = registry.touch_get(1).unwrap();
        let second = registry.touch_get(2).unwrap();
        // Each handle must own its own allocation.
        assert!(!Arc::ptr_eq(&first, &second));
    }

    #[test]
    fn cap_evicts_lru_on_overflow() {
        let mut registry = filled(2, &[1, 2]);
        // Promote 1 so that 2 becomes the eviction candidate.
        drop(registry.touch_get(1));
        registry.insert(3, fresh_state());
        assert!(registry.touch_get(1).is_some(), "1 was MRU, should survive");
        assert!(registry.touch_get(2).is_none(), "2 was LRU, should be evicted");
        assert!(registry.touch_get(3).is_some(), "3 just inserted, should survive");
        assert_eq!(registry.len(), 2);
    }

    #[test]
    fn cap_with_no_touches_evicts_in_insertion_order() {
        // Without touches the oldest insert is the one that goes.
        let mut registry = filled(2, &[10, 20, 30]);
        assert!(registry.touch_get(10).is_none());
        assert!(registry.touch_get(20).is_some());
        assert!(registry.touch_get(30).is_some());
    }

    #[test]
    fn remove_drops_entry() {
        let mut registry = filled(4, &[1]);
        registry.remove(1);
        assert!(registry.touch_get(1).is_none());
        assert_eq!(registry.len(), 0);
    }

    #[test]
    fn many_inserts_stay_bounded_at_cap() {
        let cap = 8;
        let mut registry = ProcessRegistry::with_capacity(cap);
        let total = cap as u64 * 3;
        for handle in 0..total {
            registry.insert(handle, fresh_state());
            assert!(registry.len() <= cap);
        }
        assert_eq!(registry.len(), cap);
    }

    #[test]
    fn outstanding_arc_outlives_remove() {
        // An operation that already holds the per-handle Arc must keep
        // working after another operation removes the entry — the
        // wait-completes-then-removes pattern.
        let mut registry = filled(4, &[1]);
        let held = registry.touch_get(1).expect("entry exists");
        registry.remove(1);
        // The registry no longer knows the handle, yet the Arc is usable.
        assert!(registry.touch_get(1).is_none());
        let _guard = held.lock().unwrap();
    }
}
1803
1804fn expect_process_handle(v: Option<&Value>) -> Result<u64, String> {
1805    match v {
1806        Some(Value::Int(n)) if *n >= 0 => Ok(*n as u64),
1807        Some(other) => Err(format!("expected ProcessHandle (Int), got {other:?}")),
1808        None => Err("missing ProcessHandle argument".into()),
1809    }
1810}
1811
1812/// Process-wide registry of open `Kv` handles. Each `kv.open` allocates
1813/// a new u64 handle via [`next_kv_handle`] and stores the `sled::Db`
1814/// here; subsequent ops fetch by handle. `kv.close` removes the entry.
1815///
1816/// Capped at [`MAX_KV_HANDLES`] to prevent leaks from long-running
1817/// programs that open many short-lived stores without calling
1818/// `kv.close`. On insert at cap, the least-recently-used entry is
1819/// dropped (closing its `sled::Db`); subsequent ops on the evicted
1820/// handle return the standard "closed or unknown Kv handle" error.
1821/// Any access (`get`, `put`, `delete`, `contains`, `list_prefix`)
1822/// touches the LRU order.
1823fn kv_registry() -> &'static Mutex<KvRegistry> {
1824    static REGISTRY: OnceLock<Mutex<KvRegistry>> = OnceLock::new();
1825    REGISTRY.get_or_init(|| Mutex::new(KvRegistry::with_capacity(MAX_KV_HANDLES)))
1826}
1827
/// Maximum number of `kv.open` handles kept alive at once; this is the
/// capacity `kv_registry` passes to `KvRegistry::with_capacity`. Past
/// this cap, the least-recently-used handle is evicted on each new open.
/// Sized so that pathological "open and forget" programs are bounded
/// without breaking real-world programs that intentionally keep one or
/// two long-lived stores open.
const MAX_KV_HANDLES: usize = 256;
1834
/// LRU-bounded set of open `sled::Db` instances keyed by `u64` handle.
/// Built on `IndexMap`, which gives hashed lookup by handle plus a
/// stable entry order: the front is the least-recently-used entry and
/// the back the most-recently-used. Touching an entry shift-moves it to
/// the back and evictions shift-remove from the front; both shift the
/// entries in between, so they are O(n) in the number of open handles
/// (bounded by `cap`), not O(1).
pub(crate) struct KvRegistry {
    /// Handle → open database, ordered from least- to most-recently used.
    entries: indexmap::IndexMap<u64, sled::Db>,
    /// Maximum number of entries; checked by `insert` before adding.
    cap: usize,
}
1843
1844impl KvRegistry {
1845    pub(crate) fn with_capacity(cap: usize) -> Self {
1846        Self { entries: indexmap::IndexMap::new(), cap }
1847    }
1848
1849    /// Insert a freshly-opened db. If we're already at cap, evict the
1850    /// LRU entry first; the dropped `sled::Db` closes its files.
1851    pub(crate) fn insert(&mut self, handle: u64, db: sled::Db) {
1852        if self.entries.len() >= self.cap {
1853            self.entries.shift_remove_index(0);
1854        }
1855        self.entries.insert(handle, db);
1856    }
1857
1858    /// Look up a handle, marking it most-recently-used on hit.
1859    pub(crate) fn touch_get(&mut self, handle: u64) -> Option<&sled::Db> {
1860        let idx = self.entries.get_index_of(&handle)?;
1861        self.entries.move_index(idx, self.entries.len() - 1);
1862        self.entries.get(&handle)
1863    }
1864
1865    /// Explicit `kv.close`: drop the handle if present.
1866    pub(crate) fn remove(&mut self, handle: u64) {
1867        self.entries.shift_remove(&handle);
1868    }
1869
1870    #[cfg(test)]
1871    pub(crate) fn len(&self) -> usize { self.entries.len() }
1872}
1873
/// Allocate the next `Kv` handle. Handles start at 1 and increase by one
/// per call, process-wide.
fn next_kv_handle() -> u64 {
    static NEXT_HANDLE: AtomicU64 = AtomicU64::new(1);
    NEXT_HANDLE.fetch_add(1, Ordering::SeqCst)
}
1878
1879/// Process-wide registry of open `Db` handles. Same shape as the kv
1880/// and process registries: per-handle `Arc<Mutex<…>>` so dispatch
1881/// only briefly holds the global lock and ops on different
1882/// connections don't serialize. LRU-bounded at
1883/// [`MAX_SQL_HANDLES`] to avoid leaks from long-running programs
1884/// that open many short-lived databases.
1885fn sql_registry() -> &'static Mutex<SqlRegistry> {
1886    static REGISTRY: OnceLock<Mutex<SqlRegistry>> = OnceLock::new();
1887    REGISTRY.get_or_init(|| Mutex::new(SqlRegistry::with_capacity(MAX_SQL_HANDLES)))
1888}
1889
/// Capacity handed to `SqlRegistry::with_capacity` by `sql_registry`:
/// the most `Db` handles kept at once.
const MAX_SQL_HANDLES: usize = 256;

/// One registry entry: a SQLite connection behind its own mutex, shared
/// via `Arc` so a lookup can hand out a clone.
type SharedConn = Arc<Mutex<rusqlite::Connection>>;
1893
/// Bounded, recency-ordered set of open SQLite connections.
pub(crate) struct SqlRegistry {
    /// Handle → shared connection, ordered from least- to most-recently
    /// used: `touch_get` moves a hit to the back, `insert` evicts index 0.
    entries: indexmap::IndexMap<u64, SharedConn>,
    /// Maximum number of entries; checked by `insert` before adding.
    cap: usize,
}
1898
1899impl SqlRegistry {
1900    pub(crate) fn with_capacity(cap: usize) -> Self {
1901        Self { entries: indexmap::IndexMap::new(), cap }
1902    }
1903
1904    pub(crate) fn insert(&mut self, handle: u64, conn: rusqlite::Connection) {
1905        if self.entries.len() >= self.cap {
1906            self.entries.shift_remove_index(0);
1907        }
1908        self.entries.insert(handle, Arc::new(Mutex::new(conn)));
1909    }
1910
1911    /// Look up a handle, marking it MRU on hit. Returns a clone of
1912    /// the shared `Arc` so callers release the global registry
1913    /// lock before locking the per-handle mutex.
1914    pub(crate) fn touch_get(&mut self, handle: u64) -> Option<SharedConn> {
1915        let idx = self.entries.get_index_of(&handle)?;
1916        self.entries.move_index(idx, self.entries.len() - 1);
1917        self.entries.get(&handle).cloned()
1918    }
1919
1920    pub(crate) fn remove(&mut self, handle: u64) {
1921        self.entries.shift_remove(&handle);
1922    }
1923
1924    #[cfg(test)]
1925    pub(crate) fn len(&self) -> usize { self.entries.len() }
1926}
1927
/// Allocate the next `Db` handle. Handles start at 1 and increase by one
/// per call, process-wide.
fn next_sql_handle() -> u64 {
    static NEXT_HANDLE: AtomicU64 = AtomicU64::new(1);
    NEXT_HANDLE.fetch_add(1, Ordering::SeqCst)
}
1932
#[cfg(test)]
mod sql_registry_tests {
    use super::SqlRegistry;

    /// A private in-memory SQLite connection to use as registry payload.
    fn fresh() -> rusqlite::Connection {
        rusqlite::Connection::open_in_memory().expect("open in-memory sqlite")
    }

    /// Registry of capacity `cap`, pre-filled with `handles` in order.
    fn filled(cap: usize, handles: &[u64]) -> SqlRegistry {
        let mut registry = SqlRegistry::with_capacity(cap);
        for &handle in handles {
            registry.insert(handle, fresh());
        }
        registry
    }

    #[test]
    fn insert_and_get_round_trip() {
        let mut registry = filled(4, &[1]);
        assert!(registry.touch_get(1).is_some());
        assert!(registry.touch_get(2).is_none());
    }

    #[test]
    fn cap_evicts_lru_on_overflow() {
        let mut registry = filled(2, &[1, 2]);
        // Promote 1 so that 2 becomes the eviction candidate.
        drop(registry.touch_get(1));
        registry.insert(3, fresh());
        assert!(registry.touch_get(1).is_some(), "1 was MRU, should survive");
        assert!(registry.touch_get(2).is_none(), "2 was LRU, should be evicted");
        assert!(registry.touch_get(3).is_some(), "3 just inserted");
        assert_eq!(registry.len(), 2);
    }

    #[test]
    fn remove_drops_entry() {
        let mut registry = filled(4, &[1]);
        registry.remove(1);
        assert!(registry.touch_get(1).is_none());
        assert_eq!(registry.len(), 0);
    }

    #[test]
    fn many_inserts_stay_bounded_at_cap() {
        let cap = 8;
        let mut registry = SqlRegistry::with_capacity(cap);
        let total = cap as u64 * 3;
        for handle in 0..total {
            registry.insert(handle, fresh());
            assert!(registry.len() <= cap);
        }
        assert_eq!(registry.len(), cap);
    }
}
1982
#[cfg(test)]
mod kv_registry_tests {
    use super::KvRegistry;

    /// Spin up an isolated `sled::Db` in a temp dir. Each call gets a
    /// unique path so concurrent tests don't collide on the lockfile.
    ///
    /// The store is opened with `temporary(true)` so sled deletes the
    /// directory when the `Db` is dropped; otherwise every test run would
    /// leave one directory per opened store in the system temp dir.
    fn fresh_db(tag: &str) -> sled::Db {
        let dir = std::env::temp_dir().join(format!(
            "lex-kv-reg-{}-{}-{}",
            std::process::id(),
            tag,
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_nanos()
        ));
        sled::Config::new()
            .path(&dir)
            .temporary(true)
            .open()
            .expect("sled open")
    }

    #[test]
    fn insert_and_get_round_trip() {
        let mut r = KvRegistry::with_capacity(4);
        r.insert(1, fresh_db("a"));
        assert!(r.touch_get(1).is_some());
        assert!(r.touch_get(2).is_none());
    }

    #[test]
    fn cap_evicts_lru_on_overflow() {
        // cap=2: insert 1, 2; touch 1 (now MRU); insert 3 → 2 evicted.
        let mut r = KvRegistry::with_capacity(2);
        r.insert(1, fresh_db("c1"));
        r.insert(2, fresh_db("c2"));
        let _ = r.touch_get(1);
        r.insert(3, fresh_db("c3"));
        assert!(r.touch_get(1).is_some(), "1 was MRU, should survive");
        assert!(r.touch_get(2).is_none(), "2 was LRU, should be evicted");
        assert!(r.touch_get(3).is_some(), "3 just inserted, should survive");
        assert_eq!(r.len(), 2);
    }

    #[test]
    fn cap_with_no_touches_evicts_in_insertion_order() {
        // cap=2: insert 1, 2, 3 with no touches → 1 evicted (FIFO).
        let mut r = KvRegistry::with_capacity(2);
        r.insert(10, fresh_db("f1"));
        r.insert(20, fresh_db("f2"));
        r.insert(30, fresh_db("f3"));
        assert!(r.touch_get(10).is_none());
        assert!(r.touch_get(20).is_some());
        assert!(r.touch_get(30).is_some());
    }

    #[test]
    fn remove_drops_entry() {
        let mut r = KvRegistry::with_capacity(4);
        r.insert(1, fresh_db("r1"));
        r.remove(1);
        assert!(r.touch_get(1).is_none());
        assert_eq!(r.len(), 0);
    }

    #[test]
    fn remove_unknown_handle_is_noop() {
        let mut r = KvRegistry::with_capacity(4);
        r.insert(1, fresh_db("u1"));
        r.remove(999);
        assert!(r.touch_get(1).is_some());
    }

    #[test]
    fn many_inserts_stay_bounded_at_cap() {
        // Exhaust the cap to confirm the registry never grows past it,
        // even under sustained churn.
        let cap = 8;
        let mut r = KvRegistry::with_capacity(cap);
        for i in 0..(cap as u64 * 3) {
            r.insert(i, fresh_db(&format!("b{i}")));
            assert!(r.len() <= cap);
        }
        assert_eq!(r.len(), cap);
    }
}