Skip to main content

lex_runtime/
handler.rs

//! Native effect handlers, dispatched at runtime through the VM's
//! `EffectHandler` trait. The handler also re-checks the runtime policy
//! per spec §7.4 (the static check is necessary but not sufficient: a fn
//! declared `[fs_read("/data")]` that's allowed at startup still has to
//! pass the path check at the point of dispatch).
6
7use lex_bytecode::vm::{EffectHandler, Vm};
8use lex_bytecode::{Program, Value};
9use std::path::PathBuf;
10use std::sync::atomic::{AtomicU64, Ordering};
11use std::sync::{Mutex, OnceLock};
12use std::sync::Arc;
13use std::time::{SystemTime, UNIX_EPOCH};
14
15use crate::builtins::try_pure_builtin;
16use crate::policy::Policy;
17
/// Destination for the text written by `io.print`.
///
/// Production code prints to stdout; tests inject an in-memory buffer
/// instead. Every implementor must be `Send`.
pub trait IoSink: Send {
    /// Emits `s` as one line of output.
    fn print_line(&mut self, s: &str);
}
23
24pub struct StdoutSink;
25impl IoSink for StdoutSink {
26    fn print_line(&mut self, s: &str) {
27        println!("{s}");
28    }
29}
30
31#[derive(Default)]
32pub struct CapturedSink { pub lines: Vec<String> }
33impl IoSink for CapturedSink {
34    fn print_line(&mut self, s: &str) { self.lines.push(s.to_string()); }
35}
36
37pub struct DefaultHandler {
38    policy: Policy,
39    pub sink: Box<dyn IoSink>,
40    /// Optional read root for `io.read` — when set, `io.read("p")` resolves
41    /// to `read_root.join(p)`. Lets tests run without touching the real fs.
42    pub read_root: Option<PathBuf>,
43    /// Per-run budget pool (#225). `Arc<AtomicU64>` so parallel
44    /// branches share one counter without locking. Initialized to
45    /// the policy ceiling at handler construction; each call to a
46    /// function with declared `[budget(N)]` deducts N atomically
47    /// via `note_call_budget`. Cloning the handler is intentional
48    /// for net.serve / chat handlers — they share the same pool.
49    pub budget_remaining: Arc<AtomicU64>,
50    /// The original ceiling that `budget_remaining` started at, kept
51    /// for diagnostics so a `BudgetExceeded` error can report
52    /// `(used, ceiling)` rather than just "exceeded by N".
53    pub budget_ceiling: Option<u64>,
54    /// Shared reference to the program, needed by `net.serve` so the
55    /// handler can spin up fresh VMs to dispatch incoming requests.
56    /// `None` if the handler was constructed without a program.
57    pub program: Option<Arc<Program>>,
58    /// Chat registry; populated by `net.serve_ws`'s per-message
59    /// dispatch so `chat.broadcast` / `chat.send` work from inside
60    /// a handler invocation.
61    pub chat_registry: Option<Arc<crate::ws::ChatRegistry>>,
62    /// LRU cache of `agent.call_mcp` clients keyed by the
63    /// command-line string (#197). Avoids spawn-per-call cost
64    /// when an agent invokes the same MCP server in tight loops.
65    /// Capped — when the cache is full, the least-recently-used
66    /// entry is dropped (its subprocess is reaped on Drop).
67    pub mcp_clients: crate::mcp_client::McpClientCache,
68}
69
70impl DefaultHandler {
71    pub fn new(policy: Policy) -> Self {
72        // If the caller supplied a ceiling, the pool starts at that
73        // ceiling and counts down. No ceiling = `u64::MAX` so calls
74        // never refuse on budget grounds (existing behavior).
75        let ceiling = policy.budget;
76        let initial = ceiling.unwrap_or(u64::MAX);
77        Self {
78            policy,
79            sink: Box::new(StdoutSink),
80            read_root: None,
81            budget_remaining: Arc::new(AtomicU64::new(initial)),
82            budget_ceiling: ceiling,
83            program: None,
84            chat_registry: None,
85            mcp_clients: crate::mcp_client::McpClientCache::with_capacity(16),
86        }
87    }
88
89    pub fn with_program(mut self, program: Arc<Program>) -> Self {
90        self.program = Some(program); self
91    }
92
93    pub fn with_chat_registry(mut self, registry: Arc<crate::ws::ChatRegistry>) -> Self {
94        self.chat_registry = Some(registry); self
95    }
96
97    pub fn with_sink(mut self, sink: Box<dyn IoSink>) -> Self {
98        self.sink = sink; self
99    }
100
101    pub fn with_read_root(mut self, root: PathBuf) -> Self {
102        self.read_root = Some(root); self
103    }
104
105    fn ensure_kind_allowed(&self, kind: &str) -> Result<(), String> {
106        if self.policy.allow_effects.contains(kind) {
107            Ok(())
108        } else {
109            Err(format!("effect `{kind}` not in --allow-effects"))
110        }
111    }
112
113    fn resolve_read_path(&self, p: &str) -> PathBuf {
114        match &self.read_root {
115            Some(root) => root.join(p.trim_start_matches('/')),
116            None => PathBuf::from(p),
117        }
118    }
119
120    fn dispatch_log(&mut self, op: &str, args: Vec<Value>) -> Result<Value, String> {
121        match op {
122            "debug" | "info" | "warn" | "error" => {
123                let msg = expect_str(args.first())?;
124                let level = match op {
125                    "debug" => LogLevel::Debug,
126                    "info" => LogLevel::Info,
127                    "warn" => LogLevel::Warn,
128                    _ => LogLevel::Error,
129                };
130                emit_log(level, msg);
131                Ok(Value::Unit)
132            }
133            "set_level" => {
134                let s = expect_str(args.first())?;
135                match parse_log_level(s) {
136                    Some(l) => {
137                        log_state().lock().unwrap().level = l;
138                        Ok(ok(Value::Unit))
139                    }
140                    None => Ok(err(Value::Str(format!(
141                        "log.set_level: unknown level `{s}`; expected debug|info|warn|error")))),
142                }
143            }
144            "set_format" => {
145                let s = expect_str(args.first())?;
146                let fmt = match s {
147                    "text" => LogFormat::Text,
148                    "json" => LogFormat::Json,
149                    other => return Ok(err(Value::Str(format!(
150                        "log.set_format: unknown format `{other}`; expected text|json")))),
151                };
152                log_state().lock().unwrap().format = fmt;
153                Ok(ok(Value::Unit))
154            }
155            "set_sink" => {
156                let path = expect_str(args.first())?;
157                if path == "-" {
158                    log_state().lock().unwrap().sink = LogSink::Stderr;
159                    return Ok(ok(Value::Unit));
160                }
161                if let Err(e) = self.ensure_fs_write_path(path) {
162                    return Ok(err(Value::Str(e)));
163                }
164                match std::fs::OpenOptions::new()
165                    .create(true).append(true).open(path)
166                {
167                    Ok(f) => {
168                        log_state().lock().unwrap().sink = LogSink::File(std::sync::Arc::new(Mutex::new(f)));
169                        Ok(ok(Value::Unit))
170                    }
171                    Err(e) => Ok(err(Value::Str(format!("log.set_sink `{path}`: {e}")))),
172                }
173            }
174            other => Err(format!("unsupported log.{other}")),
175        }
176    }
177
178    fn dispatch_process(&mut self, op: &str, args: Vec<Value>) -> Result<Value, String> {
179        match op {
180            "spawn" => {
181                let cmd = expect_str(args.first())?.to_string();
182                let raw_args = match args.get(1) {
183                    Some(Value::List(items)) => items.clone(),
184                    _ => return Err("process.spawn: args must be List[Str]".into()),
185                };
186                let str_args: Result<Vec<String>, String> = raw_args.iter().map(|v| match v {
187                    Value::Str(s) => Ok(s.clone()),
188                    other => Err(format!("process.spawn: arg must be Str, got {other:?}")),
189                }).collect();
190                let str_args = str_args?;
191                let opts = match args.get(2) {
192                    Some(Value::Record(r)) => r.clone(),
193                    _ => return Err("process.spawn: missing or invalid opts record".into()),
194                };
195
196                // Allow-list check, mirroring the existing proc.spawn.
197                if !self.policy.allow_proc.is_empty() {
198                    let basename = std::path::Path::new(&cmd)
199                        .file_name()
200                        .and_then(|s| s.to_str())
201                        .unwrap_or(&cmd);
202                    if !self.policy.allow_proc.iter().any(|a| a == basename) {
203                        return Ok(err(Value::Str(format!(
204                            "process.spawn: `{cmd}` not in --allow-proc {:?}",
205                            self.policy.allow_proc
206                        ))));
207                    }
208                }
209
210                let mut command = std::process::Command::new(&cmd);
211                command.args(&str_args);
212                command.stdin(std::process::Stdio::piped());
213                command.stdout(std::process::Stdio::piped());
214                command.stderr(std::process::Stdio::piped());
215
216                if let Some(Value::Variant { name, args: vargs }) = opts.get("cwd") {
217                    if name == "Some" {
218                        if let Some(Value::Str(s)) = vargs.first() {
219                            command.current_dir(s);
220                        }
221                    }
222                }
223                if let Some(Value::Map(env)) = opts.get("env") {
224                    for (k, v) in env {
225                        if let (lex_bytecode::MapKey::Str(ks), Value::Str(vs)) = (k, v) {
226                            command.env(ks, vs);
227                        }
228                    }
229                }
230
231                let stdin_payload: Option<Vec<u8>> = match opts.get("stdin") {
232                    Some(Value::Variant { name, args: vargs }) if name == "Some" => {
233                        match vargs.first() {
234                            Some(Value::Bytes(b)) => Some(b.clone()),
235                            _ => None,
236                        }
237                    }
238                    _ => None,
239                };
240
241                let mut child = match command.spawn() {
242                    Ok(c) => c,
243                    Err(e) => return Ok(err(Value::Str(format!("process.spawn `{cmd}`: {e}")))),
244                };
245
246                if let Some(payload) = stdin_payload {
247                    if let Some(mut stdin) = child.stdin.take() {
248                        use std::io::Write;
249                        let _ = stdin.write_all(&payload);
250                        // Drop closes stdin; the child sees EOF.
251                    }
252                }
253
254                let stdout = child.stdout.take().map(std::io::BufReader::new);
255                let stderr = child.stderr.take().map(std::io::BufReader::new);
256                let handle = next_process_handle();
257                process_registry().lock().unwrap().insert(handle, ProcessState {
258                    child,
259                    stdout,
260                    stderr,
261                });
262                Ok(ok(Value::Int(handle as i64)))
263            }
264            "read_stdout_line" => Self::read_line_op(args, true),
265            "read_stderr_line" => Self::read_line_op(args, false),
266            "wait" => {
267                let h = expect_process_handle(args.first())?;
268                // Look up the per-handle Arc, then release the global
269                // lock before the (slow) wait so unrelated handles
270                // can dispatch concurrently.
271                let arc = process_registry().lock().unwrap()
272                    .touch_get(h)
273                    .ok_or_else(|| "process.wait: closed or unknown ProcessHandle".to_string())?;
274                let status = {
275                    let mut state = arc.lock().unwrap();
276                    state.child.wait().map_err(|e| format!("process.wait: {e}"))?
277                };
278                // Wait completion makes the handle terminal; drop it
279                // from the registry so the cap doesn't fill up with
280                // exited children.
281                process_registry().lock().unwrap().remove(h);
282                let mut rec = indexmap::IndexMap::new();
283                rec.insert("code".into(), Value::Int(status.code().unwrap_or(-1) as i64));
284                #[cfg(unix)]
285                {
286                    use std::os::unix::process::ExitStatusExt;
287                    rec.insert("signaled".into(), Value::Bool(status.signal().is_some()));
288                }
289                #[cfg(not(unix))]
290                {
291                    rec.insert("signaled".into(), Value::Bool(false));
292                }
293                Ok(Value::Record(rec))
294            }
295            "kill" => {
296                let h = expect_process_handle(args.first())?;
297                let _signal = expect_str(args.get(1))?;
298                let arc = process_registry().lock().unwrap()
299                    .touch_get(h)
300                    .ok_or_else(|| "process.kill: closed or unknown ProcessHandle".to_string())?;
301                let mut state = arc.lock().unwrap();
302                // Cross-platform: only `kill` (SIGKILL-equivalent on
303                // Windows). Signal-name dispatch is a v1.5 follow-up.
304                match state.child.kill() {
305                    Ok(_) => Ok(ok(Value::Unit)),
306                    Err(e) => Ok(err(Value::Str(format!("process.kill: {e}")))),
307                }
308            }
309            "run" => {
310                let cmd = expect_str(args.first())?.to_string();
311                let raw_args = match args.get(1) {
312                    Some(Value::List(items)) => items.clone(),
313                    _ => return Err("process.run: args must be List[Str]".into()),
314                };
315                let str_args: Result<Vec<String>, String> = raw_args.iter().map(|v| match v {
316                    Value::Str(s) => Ok(s.clone()),
317                    other => Err(format!("process.run: arg must be Str, got {other:?}")),
318                }).collect();
319                let str_args = str_args?;
320                if !self.policy.allow_proc.is_empty() {
321                    let basename = std::path::Path::new(&cmd)
322                        .file_name()
323                        .and_then(|s| s.to_str())
324                        .unwrap_or(&cmd);
325                    if !self.policy.allow_proc.iter().any(|a| a == basename) {
326                        return Ok(err(Value::Str(format!(
327                            "process.run: `{cmd}` not in --allow-proc {:?}",
328                            self.policy.allow_proc
329                        ))));
330                    }
331                }
332                match std::process::Command::new(&cmd).args(&str_args).output() {
333                    Ok(o) => {
334                        let mut rec = indexmap::IndexMap::new();
335                        rec.insert("stdout".into(), Value::Str(
336                            String::from_utf8_lossy(&o.stdout).to_string()));
337                        rec.insert("stderr".into(), Value::Str(
338                            String::from_utf8_lossy(&o.stderr).to_string()));
339                        rec.insert("exit_code".into(), Value::Int(
340                            o.status.code().unwrap_or(-1) as i64));
341                        Ok(ok(Value::Record(rec)))
342                    }
343                    Err(e) => Ok(err(Value::Str(format!("process.run `{cmd}`: {e}")))),
344                }
345            }
346            other => Err(format!("unsupported process.{other}")),
347        }
348    }
349
350    /// Read one line from the child's stdout (`is_stdout = true`) or
351    /// stderr. Returns `None` (Lex `Option`) at EOF; subsequent calls
352    /// keep returning `None`. Holds only the per-handle mutex during
353    /// the (potentially blocking) read, so reads on one handle don't
354    /// block reads/waits on a different handle.
355    fn read_line_op(args: Vec<Value>, is_stdout: bool) -> Result<Value, String> {
356        let h = expect_process_handle(args.first())?;
357        let arc = process_registry().lock().unwrap()
358            .touch_get(h)
359            .ok_or_else(|| format!(
360                "process.read_{}_line: closed or unknown ProcessHandle",
361                if is_stdout { "stdout" } else { "stderr" }))?;
362        let mut state = arc.lock().unwrap();
363        let reader_opt = if is_stdout {
364            state.stdout.as_mut().map(|r| -> &mut dyn std::io::BufRead { r })
365        } else {
366            state.stderr.as_mut().map(|r| -> &mut dyn std::io::BufRead { r })
367        };
368        let reader = match reader_opt {
369            Some(r) => r,
370            None => return Ok(none()),
371        };
372        let mut line = String::new();
373        match reader.read_line(&mut line) {
374            Ok(0) => Ok(none()),
375            Ok(_) => {
376                if line.ends_with('\n') { line.pop(); }
377                if line.ends_with('\r') { line.pop(); }
378                Ok(some(Value::Str(line)))
379            }
380            Err(e) => Err(format!("process.read_*_line: {e}")),
381        }
382    }
383
384    fn dispatch_fs(&mut self, op: &str, args: Vec<Value>) -> Result<Value, String> {
385        match op {
386            "exists" => {
387                let path = expect_str(args.first())?.to_string();
388                if let Err(e) = self.ensure_fs_walk_path(&path) {
389                    return Ok(err(Value::Str(e)));
390                }
391                Ok(Value::Bool(std::path::Path::new(&path).exists()))
392            }
393            "is_file" => {
394                let path = expect_str(args.first())?.to_string();
395                if let Err(e) = self.ensure_fs_walk_path(&path) {
396                    return Ok(err(Value::Str(e)));
397                }
398                Ok(Value::Bool(std::path::Path::new(&path).is_file()))
399            }
400            "is_dir" => {
401                let path = expect_str(args.first())?.to_string();
402                if let Err(e) = self.ensure_fs_walk_path(&path) {
403                    return Ok(err(Value::Str(e)));
404                }
405                Ok(Value::Bool(std::path::Path::new(&path).is_dir()))
406            }
407            "stat" => {
408                let path = expect_str(args.first())?.to_string();
409                if let Err(e) = self.ensure_fs_walk_path(&path) {
410                    return Ok(err(Value::Str(e)));
411                }
412                match std::fs::metadata(&path) {
413                    Ok(md) => {
414                        let mtime = md.modified()
415                            .ok()
416                            .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
417                            .map(|d| d.as_secs() as i64)
418                            .unwrap_or(0);
419                        let mut rec = indexmap::IndexMap::new();
420                        rec.insert("size".into(), Value::Int(md.len() as i64));
421                        rec.insert("mtime".into(), Value::Int(mtime));
422                        rec.insert("is_dir".into(), Value::Bool(md.is_dir()));
423                        rec.insert("is_file".into(), Value::Bool(md.is_file()));
424                        Ok(ok(Value::Record(rec)))
425                    }
426                    Err(e) => Ok(err(Value::Str(format!("fs.stat `{path}`: {e}")))),
427                }
428            }
429            "list_dir" => {
430                let path = expect_str(args.first())?.to_string();
431                if let Err(e) = self.ensure_fs_walk_path(&path) {
432                    return Ok(err(Value::Str(e)));
433                }
434                match std::fs::read_dir(&path) {
435                    Ok(rd) => {
436                        let mut entries: Vec<Value> = Vec::new();
437                        for ent in rd {
438                            match ent {
439                                Ok(e) => {
440                                    let p = e.path();
441                                    entries.push(Value::Str(p.to_string_lossy().into_owned()));
442                                }
443                                Err(e) => return Ok(err(Value::Str(format!("fs.list_dir: {e}")))),
444                            }
445                        }
446                        Ok(ok(Value::List(entries)))
447                    }
448                    Err(e) => Ok(err(Value::Str(format!("fs.list_dir `{path}`: {e}")))),
449                }
450            }
451            "walk" => {
452                let path = expect_str(args.first())?.to_string();
453                if let Err(e) = self.ensure_fs_walk_path(&path) {
454                    return Ok(err(Value::Str(e)));
455                }
456                let mut paths: Vec<Value> = Vec::new();
457                for ent in walkdir::WalkDir::new(&path) {
458                    match ent {
459                        Ok(e) => paths.push(Value::Str(
460                            e.path().to_string_lossy().into_owned())),
461                        Err(e) => return Ok(err(Value::Str(format!("fs.walk: {e}")))),
462                    }
463                }
464                Ok(ok(Value::List(paths)))
465            }
466            "glob" => {
467                let pattern = expect_str(args.first())?.to_string();
468                // Glob patterns can't be path-scoped at parse time
469                // (`**/*.rs` doesn't pin a directory); we filter the
470                // per-result paths after expansion against
471                // `--allow-fs-read`.
472                let entries = match glob::glob(&pattern) {
473                    Ok(e) => e,
474                    Err(e) => return Ok(err(Value::Str(format!("fs.glob: {e}")))),
475                };
476                let mut paths: Vec<Value> = Vec::new();
477                for ent in entries {
478                    match ent {
479                        Ok(p) => {
480                            let s = p.to_string_lossy().into_owned();
481                            if self.policy.allow_fs_read.is_empty()
482                                || self.policy.allow_fs_read.iter().any(|root| p.starts_with(root))
483                            {
484                                paths.push(Value::Str(s));
485                            }
486                        }
487                        Err(e) => return Ok(err(Value::Str(format!("fs.glob: {e}")))),
488                    }
489                }
490                Ok(ok(Value::List(paths)))
491            }
492            "mkdir_p" => {
493                let path = expect_str(args.first())?.to_string();
494                if let Err(e) = self.ensure_fs_write_path(&path) {
495                    return Ok(err(Value::Str(e)));
496                }
497                match std::fs::create_dir_all(&path) {
498                    Ok(_) => Ok(ok(Value::Unit)),
499                    Err(e) => Ok(err(Value::Str(format!("fs.mkdir_p `{path}`: {e}")))),
500                }
501            }
502            "remove" => {
503                let path = expect_str(args.first())?.to_string();
504                if let Err(e) = self.ensure_fs_write_path(&path) {
505                    return Ok(err(Value::Str(e)));
506                }
507                let p = std::path::Path::new(&path);
508                let result = if p.is_dir() {
509                    std::fs::remove_dir_all(p)
510                } else {
511                    std::fs::remove_file(p)
512                };
513                match result {
514                    Ok(_) => Ok(ok(Value::Unit)),
515                    Err(e) => Ok(err(Value::Str(format!("fs.remove `{path}`: {e}")))),
516                }
517            }
518            "copy" => {
519                let src = expect_str(args.first())?.to_string();
520                let dst = expect_str(args.get(1))?.to_string();
521                if let Err(e) = self.ensure_fs_walk_path(&src) {
522                    return Ok(err(Value::Str(e)));
523                }
524                if let Err(e) = self.ensure_fs_write_path(&dst) {
525                    return Ok(err(Value::Str(e)));
526                }
527                match std::fs::copy(&src, &dst) {
528                    Ok(_) => Ok(ok(Value::Unit)),
529                    Err(e) => Ok(err(Value::Str(format!("fs.copy {src} -> {dst}: {e}")))),
530                }
531            }
532            other => Err(format!("unsupported fs.{other}")),
533        }
534    }
535
536    /// Path scope for walk-style operations. `[fs_walk]` reuses the
537    /// `--allow-fs-read` allowlist — listing a directory is an
538    /// information disclosure on the same path tree as reading file
539    /// content, so the same scope applies. Empty allowlist = any path.
540    fn ensure_fs_walk_path(&self, path: &str) -> Result<(), String> {
541        if self.policy.allow_fs_read.is_empty() {
542            return Ok(());
543        }
544        let p = std::path::Path::new(path);
545        if self.policy.allow_fs_read.iter().any(|a| p.starts_with(a)) {
546            Ok(())
547        } else {
548            Err(format!("fs path `{path}` outside --allow-fs-read"))
549        }
550    }
551
552    /// Path scope for mutating operations. `[fs_write]` uses the
553    /// existing `--allow-fs-write` allowlist.
554    fn ensure_fs_write_path(&self, path: &str) -> Result<(), String> {
555        if self.policy.allow_fs_write.is_empty() {
556            return Ok(());
557        }
558        let p = std::path::Path::new(path);
559        if self.policy.allow_fs_write.iter().any(|a| p.starts_with(a)) {
560            Ok(())
561        } else {
562            Err(format!("fs path `{path}` outside --allow-fs-write"))
563        }
564    }
565
566    /// Enforce `--allow-net-host` against an outgoing URL. Empty
567    /// allowlist = any host. Non-empty = the URL's host must match
568    /// (substring; port-agnostic) at least one entry.
569    fn ensure_host_allowed(&self, url: &str) -> Result<(), String> {
570        if self.policy.allow_net_host.is_empty() { return Ok(()); }
571        let host = extract_host(url).unwrap_or("");
572        if self.policy.allow_net_host.iter().any(|h| host == h) {
573            Ok(())
574        } else {
575            Err(format!(
576                "net call to host `{host}` not in --allow-net-host {:?}",
577                self.policy.allow_net_host,
578            ))
579        }
580    }
581}
582
/// Extracts the host of an `http://` or `https://` URL, without port.
///
/// Returns `None` for any other scheme. The authority ends at the
/// first `/`, `?`, `#` or `\`. Any userinfo (`user:pass@`) is dropped,
/// so `http://allowed.com:x@evil.com/` yields `evil.com`, the host
/// actually contacted, not `allowed.com`. A bracketed IPv6 literal is
/// returned with its brackets, e.g. `[::1]`.
fn extract_host(url: &str) -> Option<&str> {
    let rest = url
        .strip_prefix("http://")
        .or_else(|| url.strip_prefix("https://"))?;

    // Cut the authority off before path, query, or fragment, so an `@`
    // or `:` appearing later in the URL cannot influence the host.
    // `\` is included because some URL parsers treat it like `/`.
    let authority_end = rest
        .find(|c: char| matches!(c, '/' | '?' | '#' | '\\'))
        .unwrap_or(rest.len());
    let authority = &rest[..authority_end];

    // Everything up to the last `@` is userinfo, not host.
    let host_port = match authority.rsplit_once('@') {
        Some((_, after)) => after,
        None => authority,
    };

    // IPv6 literals contain `:` themselves; take the bracketed part.
    if host_port.starts_with('[') {
        if let Some(close) = host_port.find(']') {
            return Some(&host_port[..=close]);
        }
    }

    Some(match host_port.rsplit_once(':') {
        Some((host, _port)) => host,
        None => host_port,
    })
}
594
595impl EffectHandler for DefaultHandler {
596    /// Per-call budget enforcement (#225). VM calls this before
597    /// invoking any function whose signature declares `[budget(N)]`.
598    /// The cost N is deducted atomically from the shared pool;
599    /// returning `Err` aborts the call before any frame is pushed.
600    fn note_call_budget(&mut self, cost: u64) -> Result<(), String> {
601        // Skip the work entirely when no ceiling is configured —
602        // the pool is `u64::MAX` and would never trip.
603        let Some(ceiling) = self.budget_ceiling else { return Ok(()); };
604        // Compare-and-swap: speculatively subtract; if we'd
605        // underflow, return BudgetExceeded without mutating.
606        // Use SeqCst because parallel branches may race here and
607        // the relative ordering of "used so far" vs. "this call's
608        // cost" needs to be deterministic across threads.
609        loop {
610            let cur = self.budget_remaining.load(Ordering::SeqCst);
611            if cost > cur {
612                let used = ceiling.saturating_sub(cur);
613                return Err(format!(
614                    "budget exceeded: requested {cost}, used so far {used}, ceiling {ceiling}"));
615            }
616            let next = cur - cost;
617            // Conservative accounting: if the CAS races and loses,
618            // re-read and try again. No refund-on-failure path.
619            if self.budget_remaining.compare_exchange(cur, next,
620                Ordering::SeqCst, Ordering::SeqCst).is_ok() {
621                return Ok(());
622            }
623        }
624    }
625
626    fn dispatch(&mut self, kind: &str, op: &str, args: Vec<Value>) -> Result<Value, String> {
627        // Pure stdlib builtins (str, list, json, ...) bypass the policy
628        // gate — they have no observable side effects and aren't tracked
629        // by the type system as effects.
630        if let Some(r) = try_pure_builtin(kind, op, &args) {
631            return r;
632        }
633        // `std.fs` ops use the fine-grained `[fs_walk]` and `[fs_write]`
634        // effect kinds (distinct from the module name `fs`); the
635        // policy check uses the per-op kind, not the module's.
636        if kind == "process" {
637            self.ensure_kind_allowed("proc")?;
638            return self.dispatch_process(op, args);
639        }
640        if kind == "log" {
641            // Emit ops are [log]; config ops are [io] (set_sink also
642            // [fs_write]). The dispatch picks the right kind per op.
643            let effect_kind = match op {
644                "debug" | "info" | "warn" | "error" => "log",
645                "set_level" | "set_format" => "io",
646                "set_sink" => {
647                    self.ensure_kind_allowed("io")?;
648                    self.ensure_kind_allowed("fs_write")?;
649                    return self.dispatch_log(op, args);
650                }
651                other => return Err(format!("unsupported log.{other}")),
652            };
653            self.ensure_kind_allowed(effect_kind)?;
654            return self.dispatch_log(op, args);
655        }
656        if kind == "fs" {
657            let effect_kind = match op {
658                "exists" | "is_file" | "is_dir" | "stat"
659                | "list_dir" | "walk" | "glob" => "fs_walk",
660                "mkdir_p" | "remove" => "fs_write",
661                "copy" => {
662                    self.ensure_kind_allowed("fs_walk")?;
663                    self.ensure_kind_allowed("fs_write")?;
664                    return self.dispatch_fs(op, args);
665                }
666                other => return Err(format!("unsupported fs.{other}")),
667            };
668            self.ensure_kind_allowed(effect_kind)?;
669            return self.dispatch_fs(op, args);
670        }
671        // `crypto.random` is the lone effectful op in `std.crypto`. Its
672        // declared effect kind is `random` (fine-grained on purpose so
673        // `lex audit --effect random` flags every token-generating
674        // call), distinct from the `crypto` module name.
675        // datetime.now is the only effectful op in std.datetime;
676        // declared kind is `time`, matching the existing `time.now`.
677        if kind == "datetime" && op == "now" {
678            self.ensure_kind_allowed("time")?;
679            let now = chrono::Utc::now();
680            let nanos = now.timestamp_nanos_opt().unwrap_or(i64::MAX);
681            return Ok(Value::Int(nanos));
682        }
683        if kind == "crypto" && op == "random" {
684            self.ensure_kind_allowed("random")?;
685            let n = expect_int(args.first())?;
686            if !(0..=1_048_576).contains(&n) {
687                return Err("crypto.random: n must be in 0..=1048576".into());
688            }
689            use rand::{rngs::SysRng, TryRng};
690            let mut buf = vec![0u8; n as usize];
691            SysRng.try_fill_bytes(&mut buf)
692                .map_err(|e| format!("crypto.random: OS RNG: {e}"))?;
693            return Ok(Value::Bytes(buf));
694        }
695        // `std.http` wire ops (send/get/post) gate on the `net`
696        // effect kind, not the module name. This matches the
697        // declared signature (`http.get :: Str -> [net] ...`) and
698        // keeps `--allow-effects net` doing the obvious thing for
699        // both `net.*` and `http.*` callers.
700        // `std.agent` (#184): the four runtime effects added for
701        // agent-style programs (`llm_local`, `llm_cloud`, `a2a`,
702        // `mcp`). The handlers are stubs — they enforce the
703        // declared-effect gate, return a sentinel `Ok` so traces
704        // record the call, and defer the real wire formats to
705        // downstream crates (`soft-agent` for `llm_*` and `a2a`)
706        // and #185 (MCP client wrapper).
707        if kind == "agent" {
708            let effect_kind = match op {
709                "local_complete" => "llm_local",
710                "cloud_complete" => "llm_cloud",
711                "send_a2a"       => "a2a",
712                "call_mcp"       => "mcp",
713                other => return Err(format!("unsupported agent.{other}")),
714            };
715            self.ensure_kind_allowed(effect_kind)?;
716            // `call_mcp` runs through the LRU client cache
717            // (#197). `local_complete` / `cloud_complete` hit
718            // Ollama / OpenAI via env-var-driven configuration
719            // (#196); custom backends override at the
720            // EffectHandler layer rather than via a config file.
721            // `send_a2a` keeps its stub — that wire format
722            // lives in downstream `soft-a2a`.
723            return match op {
724                "call_mcp"       => Ok(self.dispatch_call_mcp(args)),
725                "local_complete" => Ok(dispatch_llm_local(args)),
726                "cloud_complete" => Ok(dispatch_llm_cloud(args)),
727                _ => Ok(ok(Value::Str(format!("<{effect_kind} stub>")))),
728            };
729        }
730        if kind == "http" && matches!(op, "send" | "get" | "post") {
731            self.ensure_kind_allowed("net")?;
732            return match op {
733                "send" => {
734                    let req = expect_record(args.first())?;
735                    Ok(http_send_record(self, req))
736                }
737                "get" => {
738                    let url = expect_str(args.first())?.to_string();
739                    self.ensure_host_allowed(&url)?;
740                    Ok(http_send_simple("GET", &url, None, "", None))
741                }
742                "post" => {
743                    let url = expect_str(args.first())?.to_string();
744                    let body = expect_bytes(args.get(1))?.clone();
745                    let content_type = expect_str(args.get(2))?.to_string();
746                    self.ensure_host_allowed(&url)?;
747                    Ok(http_send_simple("POST", &url, Some(body), &content_type, None))
748                }
749                _ => unreachable!(),
750            };
751        }
752        self.ensure_kind_allowed(kind)?;
753        match (kind, op) {
754            ("io", "print") => {
755                let line = expect_str(args.first())?;
756                self.sink.print_line(line);
757                Ok(Value::Unit)
758            }
759            ("io", "read") => {
760                let path = expect_str(args.first())?.to_string();
761                let resolved = self.resolve_read_path(&path);
762                // Honor read-allowlist if any. Symmetric with io.write.
763                // The path argument is checked as-given (resolved-against-
764                // read_root for tests); a tool granted [io] cannot escape
765                // the configured prefix even though the effect itself is
766                // permitted. This is the per-path scope the bench's case
767                // #6 ("[io] granted, body reads /etc/passwd") needed.
768                if !self.policy.allow_fs_read.is_empty()
769                    && !self.policy.allow_fs_read.iter().any(|a| resolved.starts_with(a))
770                {
771                    return Err(format!("read of `{path}` outside --allow-fs-read"));
772                }
773                match std::fs::read_to_string(&resolved) {
774                    Ok(s) => Ok(ok(Value::Str(s))),
775                    Err(e) => Ok(err(Value::Str(format!("{e}")))),
776                }
777            }
778            ("io", "write") => {
779                let path = expect_str(args.first())?.to_string();
780                let contents = expect_str(args.get(1))?.to_string();
781                // Honor write-allowlist if any.
782                if !self.policy.allow_fs_write.is_empty() {
783                    let p = std::path::Path::new(&path);
784                    if !self.policy.allow_fs_write.iter().any(|a| p.starts_with(a)) {
785                        return Err(format!("write to `{path}` outside --allow-fs-write"));
786                    }
787                }
788                match std::fs::write(&path, contents) {
789                    Ok(_) => Ok(ok(Value::Unit)),
790                    Err(e) => Ok(err(Value::Str(format!("{e}")))),
791                }
792            }
793            ("time", "now") => {
794                let secs = SystemTime::now().duration_since(UNIX_EPOCH)
795                    .map_err(|e| format!("time: {e}"))?.as_secs();
796                Ok(Value::Int(secs as i64))
797            }
798            ("time", "sleep_ms") => {
799                // Block the current thread for `n` ms (#226). Used
800                // by `flow.retry_with_backoff`'s exponential delay.
801                // Negative or zero is a no-op. Bounded at 60s in the
802                // runtime to avoid pathological agent-emitted loops
803                // wedging the host — anything legitimate beyond
804                // that should use process-level scheduling, not a
805                // blocking sleep.
806                let n = expect_int(args.first())?;
807                if n > 0 {
808                    let ms = (n as u64).min(60_000);
809                    std::thread::sleep(std::time::Duration::from_millis(ms));
810                }
811                Ok(Value::Unit)
812            }
813            ("rand", "int_in") => {
814                // Deterministic stub: midpoint of [lo, hi].
815                let lo = expect_int(args.first())?;
816                let hi = expect_int(args.get(1))?;
817                Ok(Value::Int((lo + hi) / 2))
818            }
819            // `env.get` returns `Option[Str]` — `None` for unset vars.
820            // Per-var scoping (`[env(NAME)]`) arrives with #207's
821            // per-capability effect parameterization; today the flat
822            // `[env]` grants access to the entire process environment.
823            ("env", "get") => {
824                let name = expect_str(args.first())?;
825                Ok(match std::env::var(name) {
826                    Ok(v) => Value::Variant {
827                        name: "Some".into(),
828                        args: vec![Value::Str(v)],
829                    },
830                    Err(_) => Value::Variant { name: "None".into(), args: Vec::new() },
831                })
832            }
833            ("budget", _) => {
834                // Budget calls are nominally tracked here; budget itself is
835                // enforced statically in `policy::check_program`.
836                Ok(Value::Unit)
837            }
838            ("net", "get") => {
839                let url = expect_str(args.first())?.to_string();
840                self.ensure_host_allowed(&url)?;
841                Ok(http_request("GET", &url, None))
842            }
843            ("net", "post") => {
844                let url = expect_str(args.first())?.to_string();
845                let body = expect_str(args.get(1))?.to_string();
846                self.ensure_host_allowed(&url)?;
847                Ok(http_request("POST", &url, Some(&body)))
848            }
849            ("net", "serve") => {
850                let port = match args.first() {
851                    Some(Value::Int(n)) if (0..=65535).contains(n) => *n as u16,
852                    _ => return Err("net.serve(port, handler): port must be Int 0..=65535".into()),
853                };
854                let handler_name = expect_str(args.get(1))?.to_string();
855                let program = self.program.clone()
856                    .ok_or_else(|| "net.serve requires a Program reference; use DefaultHandler::with_program".to_string())?;
857                let policy = self.policy.clone();
858                serve_http(port, handler_name, program, policy, None)
859            }
860            ("net", "serve_tls") => {
861                let port = match args.first() {
862                    Some(Value::Int(n)) if (0..=65535).contains(n) => *n as u16,
863                    _ => return Err("net.serve_tls(port, cert, key, handler): port must be Int 0..=65535".into()),
864                };
865                let cert_path = expect_str(args.get(1))?.to_string();
866                let key_path = expect_str(args.get(2))?.to_string();
867                let handler_name = expect_str(args.get(3))?.to_string();
868                let program = self.program.clone()
869                    .ok_or_else(|| "net.serve_tls requires a Program reference".to_string())?;
870                let policy = self.policy.clone();
871                let cert = std::fs::read(&cert_path)
872                    .map_err(|e| format!("net.serve_tls: read cert {cert_path}: {e}"))?;
873                let key = std::fs::read(&key_path)
874                    .map_err(|e| format!("net.serve_tls: read key {key_path}: {e}"))?;
875                serve_http(port, handler_name, program, policy, Some(TlsConfig { cert, key }))
876            }
877            ("net", "serve_ws") => {
878                let port = match args.first() {
879                    Some(Value::Int(n)) if (0..=65535).contains(n) => *n as u16,
880                    _ => return Err("net.serve_ws(port, on_message): port must be Int 0..=65535".into()),
881                };
882                let handler_name = expect_str(args.get(1))?.to_string();
883                let program = self.program.clone()
884                    .ok_or_else(|| "net.serve_ws requires a Program reference".to_string())?;
885                let policy = self.policy.clone();
886                let registry = Arc::new(crate::ws::ChatRegistry::default());
887                crate::ws::serve_ws(port, handler_name, program, policy, registry)
888            }
889            ("chat", "broadcast") => {
890                let registry = self.chat_registry.as_ref()
891                    .ok_or_else(|| "chat.broadcast called outside a net.serve_ws handler".to_string())?;
892                let room = expect_str(args.first())?;
893                let body = expect_str(args.get(1))?;
894                crate::ws::chat_broadcast(registry, room, body);
895                Ok(Value::Unit)
896            }
897            ("chat", "send") => {
898                let registry = self.chat_registry.as_ref()
899                    .ok_or_else(|| "chat.send called outside a net.serve_ws handler".to_string())?;
900                let conn_id = match args.first() {
901                    Some(Value::Int(n)) if *n >= 0 => *n as u64,
902                    _ => return Err("chat.send: conn_id must be non-negative Int".into()),
903                };
904                let body = expect_str(args.get(1))?;
905                Ok(Value::Bool(crate::ws::chat_send(registry, conn_id, body)))
906            }
907            ("kv", "open") => {
908                let path = expect_str(args.first())?.to_string();
909                // Honor write-allowlist: opening a Kv writes its
910                // backing files at `path`, so the same scoping that
911                // applies to `io.write` applies here.
912                if !self.policy.allow_fs_write.is_empty() {
913                    let p = std::path::Path::new(&path);
914                    if !self.policy.allow_fs_write.iter().any(|a| p.starts_with(a)) {
915                        return Ok(err(Value::Str(format!(
916                            "kv.open: `{path}` outside --allow-fs-write"))));
917                    }
918                }
919                match sled::open(&path) {
920                    Ok(db) => {
921                        let handle = next_kv_handle();
922                        kv_registry().lock().unwrap().insert(handle, db);
923                        Ok(ok(Value::Int(handle as i64)))
924                    }
925                    Err(e) => Ok(err(Value::Str(format!("kv.open: {e}")))),
926                }
927            }
928            ("kv", "close") => {
929                let h = expect_kv_handle(args.first())?;
930                kv_registry().lock().unwrap().remove(h);
931                Ok(Value::Unit)
932            }
933            ("kv", "get") => {
934                let h = expect_kv_handle(args.first())?;
935                let key = expect_str(args.get(1))?;
936                let mut reg = kv_registry().lock().unwrap();
937                let db = reg.touch_get(h).ok_or_else(|| "kv.get: closed or unknown Kv handle".to_string())?;
938                match db.get(key.as_bytes()) {
939                    Ok(Some(ivec)) => Ok(some(Value::Bytes(ivec.to_vec()))),
940                    Ok(None) => Ok(none()),
941                    Err(e) => Err(format!("kv.get: {e}")),
942                }
943            }
944            ("kv", "put") => {
945                let h = expect_kv_handle(args.first())?;
946                let key = expect_str(args.get(1))?.to_string();
947                let val = expect_bytes(args.get(2))?.clone();
948                let mut reg = kv_registry().lock().unwrap();
949                let db = reg.touch_get(h).ok_or_else(|| "kv.put: closed or unknown Kv handle".to_string())?;
950                match db.insert(key.as_bytes(), val) {
951                    Ok(_) => Ok(ok(Value::Unit)),
952                    Err(e) => Ok(err(Value::Str(format!("kv.put: {e}")))),
953                }
954            }
955            ("kv", "delete") => {
956                let h = expect_kv_handle(args.first())?;
957                let key = expect_str(args.get(1))?;
958                let mut reg = kv_registry().lock().unwrap();
959                let db = reg.touch_get(h).ok_or_else(|| "kv.delete: closed or unknown Kv handle".to_string())?;
960                match db.remove(key.as_bytes()) {
961                    Ok(_) => Ok(ok(Value::Unit)),
962                    Err(e) => Ok(err(Value::Str(format!("kv.delete: {e}")))),
963                }
964            }
965            ("kv", "contains") => {
966                let h = expect_kv_handle(args.first())?;
967                let key = expect_str(args.get(1))?;
968                let mut reg = kv_registry().lock().unwrap();
969                let db = reg.touch_get(h).ok_or_else(|| "kv.contains: closed or unknown Kv handle".to_string())?;
970                match db.contains_key(key.as_bytes()) {
971                    Ok(present) => Ok(Value::Bool(present)),
972                    Err(e) => Err(format!("kv.contains: {e}")),
973                }
974            }
975            ("kv", "list_prefix") => {
976                let h = expect_kv_handle(args.first())?;
977                let prefix = expect_str(args.get(1))?;
978                let mut reg = kv_registry().lock().unwrap();
979                let db = reg.touch_get(h).ok_or_else(|| "kv.list_prefix: closed or unknown Kv handle".to_string())?;
980                let mut keys: Vec<Value> = Vec::new();
981                for kv in db.scan_prefix(prefix.as_bytes()) {
982                    let (k, _) = kv.map_err(|e| format!("kv.list_prefix: {e}"))?;
983                    let s = String::from_utf8_lossy(&k).to_string();
984                    keys.push(Value::Str(s));
985                }
986                Ok(Value::List(keys))
987            }
988            ("sql", "open") => {
989                let path = expect_str(args.first())?.to_string();
990                // Same shape as `kv.open`: opening creates the
991                // SQLite file, so the fs-write allowlist applies
992                // (in-memory paths are exempt).
993                if path != ":memory:" && !self.policy.allow_fs_write.is_empty() {
994                    let p = std::path::Path::new(&path);
995                    if !self.policy.allow_fs_write.iter().any(|a| p.starts_with(a)) {
996                        return Ok(err(Value::Str(format!(
997                            "sql.open: `{path}` outside --allow-fs-write"))));
998                    }
999                }
1000                match rusqlite::Connection::open(&path) {
1001                    Ok(conn) => {
1002                        let handle = next_sql_handle();
1003                        sql_registry().lock().unwrap().insert(handle, conn);
1004                        Ok(ok(Value::Int(handle as i64)))
1005                    }
1006                    Err(e) => Ok(err(Value::Str(format!("sql.open: {e}")))),
1007                }
1008            }
1009            ("sql", "close") => {
1010                let h = expect_sql_handle(args.first())?;
1011                sql_registry().lock().unwrap().remove(h);
1012                Ok(Value::Unit)
1013            }
1014            ("sql", "exec") => {
1015                let h = expect_sql_handle(args.first())?;
1016                let stmt = expect_str(args.get(1))?.to_string();
1017                let params = expect_str_list(args.get(2))?;
1018                let arc = sql_registry().lock().unwrap()
1019                    .touch_get(h)
1020                    .ok_or_else(|| "sql.exec: closed or unknown Db handle".to_string())?;
1021                let conn = arc.lock().unwrap();
1022                let bind: Vec<&dyn rusqlite::ToSql> = params.iter()
1023                    .map(|s| s as &dyn rusqlite::ToSql)
1024                    .collect();
1025                match conn.execute(&stmt, rusqlite::params_from_iter(bind.iter())) {
1026                    Ok(n)  => Ok(ok(Value::Int(n as i64))),
1027                    Err(e) => Ok(err(Value::Str(format!("sql.exec: {e}")))),
1028                }
1029            }
1030            ("sql", "query") => {
1031                let h = expect_sql_handle(args.first())?;
1032                let stmt_str = expect_str(args.get(1))?.to_string();
1033                let params = expect_str_list(args.get(2))?;
1034                let arc = sql_registry().lock().unwrap()
1035                    .touch_get(h)
1036                    .ok_or_else(|| "sql.query: closed or unknown Db handle".to_string())?;
1037                let conn = arc.lock().unwrap();
1038                Ok(sql_run_query(&conn, &stmt_str, &params))
1039            }
1040            ("proc", "spawn") => {
1041                // The escape hatch effect. Spawns a child process,
1042                // collects its stdout/stderr, returns a structured
1043                // record. Allow-list is the binary basename: anything
1044                // outside `--allow-proc` is rejected pre-spawn.
1045                //
1046                // What this does NOT validate (per SECURITY.md):
1047                // - per-arg content (a script-like CLI invoked via
1048                //   --eval=... can run anything)
1049                // - environment variables (inherited from the parent)
1050                // - working directory (the parent's)
1051                //
1052                // For untrusted input, layer with OS-level
1053                // sandboxing — gVisor / nsjail / a container.
1054                let cmd = expect_str(args.first())?.to_string();
1055                let raw_args = match args.get(1) {
1056                    Some(Value::List(items)) => items,
1057                    Some(other) => return Err(format!(
1058                        "proc.spawn: args must be List[Str], got {other:?}")),
1059                    None => return Err("proc.spawn: missing args list".into()),
1060                };
1061                let str_args: Vec<String> = raw_args.iter().map(|v| match v {
1062                    Value::Str(s) => Ok(s.clone()),
1063                    other => Err(format!("proc.spawn: arg must be Str, got {other:?}")),
1064                }).collect::<Result<Vec<_>, _>>()?;
1065
1066                // Allow-list check: empty list = any binary (escape
1067                // hatch); non-empty = basename of cmd must match an
1068                // entry exactly.
1069                if !self.policy.allow_proc.is_empty() {
1070                    let basename = std::path::Path::new(&cmd)
1071                        .file_name()
1072                        .and_then(|s| s.to_str())
1073                        .unwrap_or(&cmd);
1074                    if !self.policy.allow_proc.iter().any(|a| a == basename) {
1075                        return Ok(err(Value::Str(format!(
1076                            "proc.spawn: `{cmd}` not in --allow-proc {:?}",
1077                            self.policy.allow_proc
1078                        ))));
1079                    }
1080                }
1081
1082                // Hard caps: the spec doesn't pin numbers, but
1083                // unbounded argv is a DoS vector.
1084                if str_args.len() > 1024 {
1085                    return Ok(err(Value::Str(
1086                        "proc.spawn: arg-count exceeds 1024".into())));
1087                }
1088                if str_args.iter().any(|a| a.len() > 65_536) {
1089                    return Ok(err(Value::Str(
1090                        "proc.spawn: per-arg length exceeds 64 KiB".into())));
1091                }
1092
1093                let output = std::process::Command::new(&cmd)
1094                    .args(&str_args)
1095                    .output();
1096                match output {
1097                    Ok(o) => {
1098                        let mut rec = indexmap::IndexMap::new();
1099                        rec.insert("stdout".into(), Value::Str(
1100                            String::from_utf8_lossy(&o.stdout).to_string()));
1101                        rec.insert("stderr".into(), Value::Str(
1102                            String::from_utf8_lossy(&o.stderr).to_string()));
1103                        rec.insert("exit_code".into(), Value::Int(
1104                            o.status.code().unwrap_or(-1) as i64));
1105                        Ok(ok(Value::Record(rec)))
1106                    }
1107                    Err(e) => Ok(err(Value::Str(format!("spawn `{cmd}`: {e}")))),
1108                }
1109            }
1110            other => Err(format!("unsupported effect {}.{}", other.0, other.1)),
1111        }
1112    }
1113}
1114
1115/// Blocks the calling thread, accepts incoming HTTP requests on
1116/// `127.0.0.1:port`, and dispatches each through the named Lex
1117/// stage. Each request gets a fresh `Vm`; the program and policy
1118/// are shared.
1119///
1120/// Handler signature in Lex (by convention):
1121///   fn <name>(req :: Record { method :: Str, path :: Str, body :: Str })
1122///        -> Record { status :: Int, body :: Str }
1123/// PEM-encoded certificate + private key, both as raw bytes.
1124pub struct TlsConfig {
1125    pub cert: Vec<u8>,
1126    pub key: Vec<u8>,
1127}
1128
1129fn serve_http(
1130    port: u16,
1131    handler_name: String,
1132    program: Arc<Program>,
1133    policy: Policy,
1134    tls: Option<TlsConfig>,
1135) -> Result<Value, String> {
1136    let (server, scheme) = match tls {
1137        None => (
1138            tiny_http::Server::http(("127.0.0.1", port))
1139                .map_err(|e| format!("net.serve bind {port}: {e}"))?,
1140            "http",
1141        ),
1142        Some(cfg) => {
1143            let ssl = tiny_http::SslConfig {
1144                certificate: cfg.cert,
1145                private_key: cfg.key,
1146            };
1147            (
1148                tiny_http::Server::https(("127.0.0.1", port), ssl)
1149                    .map_err(|e| format!("net.serve_tls bind {port}: {e}"))?,
1150                "https",
1151            )
1152        }
1153    };
1154    eprintln!("net.serve: listening on {scheme}://127.0.0.1:{port}");
1155    // Thread-per-request: the main loop accepts; each request runs on
1156    // its own worker thread with its own fresh Vm. The Program is
1157    // shared via Arc; Policy and handler_name are cloned per request.
1158    // Lex's immutability means there's no shared mutable state at the
1159    // language level — workers don't race.
1160    for req in server.incoming_requests() {
1161        let program = Arc::clone(&program);
1162        let policy = policy.clone();
1163        let handler_name = handler_name.clone();
1164        std::thread::spawn(move || handle_request(req, program, policy, handler_name));
1165    }
1166    Ok(Value::Unit)
1167}
1168
1169fn handle_request(
1170    mut req: tiny_http::Request,
1171    program: Arc<Program>,
1172    policy: Policy,
1173    handler_name: String,
1174) {
1175    let lex_req = build_request_value(&mut req);
1176    let handler = DefaultHandler::new(policy).with_program(Arc::clone(&program));
1177    let mut vm = Vm::with_handler(&program, Box::new(handler));
1178    match vm.call(&handler_name, vec![lex_req]) {
1179        Ok(resp) => {
1180            let (status, body) = unpack_response(&resp);
1181            let response = tiny_http::Response::from_string(body).with_status_code(status);
1182            let _ = req.respond(response);
1183        }
1184        Err(e) => {
1185            let response = tiny_http::Response::from_string(format!("internal error: {e}"))
1186                .with_status_code(500);
1187            let _ = req.respond(response);
1188        }
1189    }
1190}
1191
1192fn build_request_value(req: &mut tiny_http::Request) -> Value {
1193    let method = format!("{:?}", req.method()).to_uppercase();
1194    let url = req.url().to_string();
1195    let (path, query) = match url.split_once('?') {
1196        Some((p, q)) => (p.to_string(), q.to_string()),
1197        None => (url, String::new()),
1198    };
1199    let mut body = String::new();
1200    let _ = req.as_reader().read_to_string(&mut body);
1201    let mut rec = indexmap::IndexMap::new();
1202    rec.insert("method".into(), Value::Str(method));
1203    rec.insert("path".into(), Value::Str(path));
1204    rec.insert("query".into(), Value::Str(query));
1205    rec.insert("body".into(), Value::Str(body));
1206    Value::Record(rec)
1207}
1208
1209fn unpack_response(v: &Value) -> (u16, String) {
1210    if let Value::Record(rec) = v {
1211        let status = rec.get("status").and_then(|s| match s {
1212            Value::Int(n) => Some(*n as u16),
1213            _ => None,
1214        }).unwrap_or(200);
1215        let body = rec.get("body").and_then(|b| match b {
1216            Value::Str(s) => Some(s.clone()),
1217            _ => None,
1218        }).unwrap_or_default();
1219        return (status, body);
1220    }
1221    (500, format!("handler returned non-record: {v:?}"))
1222}
1223
1224/// HTTP/1.1 client backed by `ureq` + `rustls`. Accepts both
1225/// `http://` and `https://` URLs. Returns `Result[Str, Str]` as a
1226/// Lex `Value::Variant`. The earlier hand-rolled HTTP/1.0 client
1227/// was plain-TCP only — most public APIs are HTTPS, so the demo
1228/// could fetch `example.com` but not `wttr.in` or `api.github.com`.
1229fn http_request(method: &str, url: &str, body: Option<&str>) -> Value {
1230    use std::time::Duration;
1231    // ureq 3 puts 4xx/5xx behind `Error::StatusCode(code)` and consumes
1232    // the response, so the body would be lost. Disabling
1233    // `http_status_as_error` lets us check the status manually and
1234    // surface `Err("status 404: <body>")` like the old code did.
1235    let agent: ureq::Agent = ureq::Agent::config_builder()
1236        .timeout_connect(Some(Duration::from_secs(10)))
1237        .timeout_recv_body(Some(Duration::from_secs(30)))
1238        .timeout_send_body(Some(Duration::from_secs(10)))
1239        .http_status_as_error(false)
1240        .build()
1241        .into();
1242    let resp = match (method, body) {
1243        ("GET", _) => agent.get(url).call(),
1244        ("POST", Some(b)) => agent.post(url).send(b),
1245        ("POST", None) => agent.post(url).send(""),
1246        (m, _) => return err_value(format!("unsupported method: {m}")),
1247    };
1248    match resp {
1249        Ok(mut r) => {
1250            let status = r.status().as_u16();
1251            let body = r.body_mut().read_to_string().unwrap_or_default();
1252            if (200..300).contains(&status) {
1253                Value::Variant { name: "Ok".into(), args: vec![Value::Str(body)] }
1254            } else {
1255                err_value(format!("status {status}: {body}"))
1256            }
1257        }
1258        Err(e) => err_value(format!("transport: {e}")),
1259    }
1260}
1261
1262/// Build a ureq agent for `std.http.{send,get,post}` with the given
1263/// timeout (None → use the same defaults as the legacy `net.{get,post}`
1264/// path). Separate from `http_request` so the rich `http.send` flow
1265/// can supply per-request overrides.
1266fn http_agent(timeout_ms: Option<u64>) -> ureq::Agent {
1267    use std::time::Duration;
1268    let mut b = ureq::Agent::config_builder()
1269        .timeout_connect(Some(Duration::from_secs(10)))
1270        .timeout_recv_body(Some(Duration::from_secs(30)))
1271        .timeout_send_body(Some(Duration::from_secs(10)))
1272        .http_status_as_error(false);
1273    if let Some(ms) = timeout_ms {
1274        let d = Duration::from_millis(ms);
1275        b = b.timeout_global(Some(d));
1276    }
1277    b.build().into()
1278}
1279
1280/// Map ureq's transport error to the structured `HttpError` variant
1281/// std.http exposes to user code. Anything not specifically a
1282/// timeout / TLS error funnels into `NetworkError`.
1283fn http_error_value(e: ureq::Error) -> Value {
1284    let (ctor, payload): (&str, Option<String>) = match &e {
1285        ureq::Error::Timeout(_) => ("TimeoutError", None),
1286        ureq::Error::Tls(s) => ("TlsError", Some((*s).into())),
1287        ureq::Error::Pem(p) => ("TlsError", Some(format!("{p}"))),
1288        ureq::Error::Rustls(r) => ("TlsError", Some(format!("{r}"))),
1289        _ => ("NetworkError", Some(format!("{e}"))),
1290    };
1291    let args = match payload { Some(s) => vec![Value::Str(s)], None => vec![] };
1292    let inner = Value::Variant { name: ctor.into(), args };
1293    Value::Variant { name: "Err".into(), args: vec![inner] }
1294}
1295
1296fn http_decode_err(msg: String) -> Value {
1297    let inner = Value::Variant {
1298        name: "DecodeError".into(),
1299        args: vec![Value::Str(msg)],
1300    };
1301    Value::Variant { name: "Err".into(), args: vec![inner] }
1302}
1303
1304/// Run a request and pack the ureq response into the
1305/// `{ status, headers, body }` Lex record (or the structured
1306/// `HttpError` on failure). `headers_extra` pairs are appended to the
1307/// outgoing request after `content_type` is applied.
1308fn http_send_simple(
1309    method: &str,
1310    url: &str,
1311    body: Option<Vec<u8>>,
1312    content_type: &str,
1313    timeout_ms: Option<u64>,
1314) -> Value {
1315    http_send_full(method, url, body, content_type, &[], timeout_ms)
1316}
1317
1318fn http_send_full(
1319    method: &str,
1320    url: &str,
1321    body: Option<Vec<u8>>,
1322    content_type: &str,
1323    headers: &[(String, String)],
1324    timeout_ms: Option<u64>,
1325) -> Value {
1326    let agent = http_agent(timeout_ms);
1327    let resp = match method {
1328        "GET" => {
1329            let mut req = agent.get(url);
1330            if !content_type.is_empty() { req = req.header("content-type", content_type); }
1331            for (k, v) in headers { req = req.header(k.as_str(), v.as_str()); }
1332            req.call()
1333        }
1334        "POST" => {
1335            let body = body.unwrap_or_default();
1336            let mut req = agent.post(url);
1337            if !content_type.is_empty() { req = req.header("content-type", content_type); }
1338            for (k, v) in headers { req = req.header(k.as_str(), v.as_str()); }
1339            req.send(&body[..])
1340        }
1341        m => {
1342            // Other methods (PUT, DELETE, PATCH, ...) fall through
1343            // here in v1.5; for now surface a structured DecodeError
1344            // so the caller can match it.
1345            return http_decode_err(format!("unsupported method: {m}"));
1346        }
1347    };
1348    match resp {
1349        Ok(mut r) => {
1350            let status = r.status().as_u16() as i64;
1351            let headers_map = collect_response_headers(r.headers());
1352            let body_bytes = match r.body_mut().with_config().limit(10 * 1024 * 1024).read_to_vec() {
1353                Ok(b) => b,
1354                Err(e) => return http_decode_err(format!("body read: {e}")),
1355            };
1356            let mut rec = indexmap::IndexMap::new();
1357            rec.insert("status".into(), Value::Int(status));
1358            rec.insert("headers".into(), Value::Map(headers_map));
1359            rec.insert("body".into(), Value::Bytes(body_bytes));
1360            Value::Variant { name: "Ok".into(), args: vec![Value::Record(rec)] }
1361        }
1362        Err(e) => http_error_value(e),
1363    }
1364}
1365
1366fn collect_response_headers(
1367    headers: &ureq::http::HeaderMap,
1368) -> std::collections::BTreeMap<lex_bytecode::MapKey, Value> {
1369    let mut out = std::collections::BTreeMap::new();
1370    for (name, value) in headers.iter() {
1371        let v = value.to_str().unwrap_or("").to_string();
1372        out.insert(lex_bytecode::MapKey::Str(name.as_str().to_string()), Value::Str(v));
1373    }
1374    out
1375}
1376
1377/// Pull the standard `HttpRequest` shape out of a `Value::Record`
1378/// and dispatch through `http_send_full`. The handler verifies
1379/// `--allow-net-host` for the URL before sending.
1380fn http_send_record(handler: &DefaultHandler, req: &indexmap::IndexMap<String, Value>) -> Value {
1381    let method = match req.get("method") {
1382        Some(Value::Str(s)) => s.clone(),
1383        _ => return http_decode_err("HttpRequest.method must be Str".into()),
1384    };
1385    let url = match req.get("url") {
1386        Some(Value::Str(s)) => s.clone(),
1387        _ => return http_decode_err("HttpRequest.url must be Str".into()),
1388    };
1389    if let Err(e) = handler.ensure_host_allowed(&url) {
1390        return http_decode_err(e);
1391    }
1392    let body = match req.get("body") {
1393        Some(Value::Variant { name, args }) if name == "None" => None,
1394        Some(Value::Variant { name, args }) if name == "Some" => match args.as_slice() {
1395            [Value::Bytes(b)] => Some(b.clone()),
1396            _ => return http_decode_err("HttpRequest.body Some payload must be Bytes".into()),
1397        },
1398        _ => return http_decode_err("HttpRequest.body must be Option[Bytes]".into()),
1399    };
1400    let timeout_ms = match req.get("timeout_ms") {
1401        Some(Value::Variant { name, .. }) if name == "None" => None,
1402        Some(Value::Variant { name, args }) if name == "Some" => match args.as_slice() {
1403            [Value::Int(n)] if *n >= 0 => Some(*n as u64),
1404            _ => return http_decode_err(
1405                "HttpRequest.timeout_ms Some payload must be a non-negative Int".into()),
1406        },
1407        _ => return http_decode_err("HttpRequest.timeout_ms must be Option[Int]".into()),
1408    };
1409    let headers: Vec<(String, String)> = match req.get("headers") {
1410        Some(Value::Map(m)) => m.iter().filter_map(|(k, v)| {
1411            let kk = match k { lex_bytecode::MapKey::Str(s) => s.clone(), _ => return None };
1412            let vv = match v { Value::Str(s) => s.clone(), _ => return None };
1413            Some((kk, vv))
1414        }).collect(),
1415        _ => return http_decode_err("HttpRequest.headers must be Map[Str, Str]".into()),
1416    };
1417    http_send_full(&method, &url, body, "", &headers, timeout_ms)
1418}
1419
1420fn expect_record(v: Option<&Value>) -> Result<&indexmap::IndexMap<String, Value>, String> {
1421    match v {
1422        Some(Value::Record(r)) => Ok(r),
1423        Some(other) => Err(format!("expected Record, got {other:?}")),
1424        None => Err("missing Record argument".into()),
1425    }
1426}
1427
1428fn err_value(msg: String) -> Value {
1429    Value::Variant { name: "Err".into(), args: vec![Value::Str(msg)] }
1430}
1431
1432fn expect_str(v: Option<&Value>) -> Result<&str, String> {
1433    match v {
1434        Some(Value::Str(s)) => Ok(s),
1435        Some(other) => Err(format!("expected Str arg, got {other:?}")),
1436        None => Err("missing argument".into()),
1437    }
1438}
1439
1440fn expect_int(v: Option<&Value>) -> Result<i64, String> {
1441    match v {
1442        Some(Value::Int(n)) => Ok(*n),
1443        Some(other) => Err(format!("expected Int arg, got {other:?}")),
1444        None => Err("missing argument".into()),
1445    }
1446}
1447
1448fn ok(v: Value) -> Value {
1449    Value::Variant { name: "Ok".into(), args: vec![v] }
1450}
1451fn err(v: Value) -> Value {
1452    Value::Variant { name: "Err".into(), args: vec![v] }
1453}
1454
1455impl DefaultHandler {
1456    /// Implementation of `agent.call_mcp(server, tool, args_json)`.
1457    /// Goes through the LRU client cache (#197): the named server
1458    /// is spawned on first use and reused on subsequent calls.
1459    /// On failure the offending client is dropped so the next
1460    /// call respawns rather than silently failing forever.
1461    fn dispatch_call_mcp(&mut self, args: Vec<Value>) -> Value {
1462        let server = match args.first() {
1463            Some(Value::Str(s)) => s.clone(),
1464            _ => return err(Value::Str(
1465                "agent.call_mcp(server, tool, args_json): server must be Str".into())),
1466        };
1467        let tool = match args.get(1) {
1468            Some(Value::Str(s)) => s.clone(),
1469            _ => return err(Value::Str(
1470                "agent.call_mcp(server, tool, args_json): tool must be Str".into())),
1471        };
1472        let args_json = match args.get(2) {
1473            Some(Value::Str(s)) => s.clone(),
1474            _ => return err(Value::Str(
1475                "agent.call_mcp(server, tool, args_json): args_json must be Str".into())),
1476        };
1477        let parsed: serde_json::Value = match serde_json::from_str(&args_json) {
1478            Ok(v) => v,
1479            Err(e) => return err(Value::Str(format!(
1480                "agent.call_mcp: args_json is not valid JSON: {e}"))),
1481        };
1482        match self.mcp_clients.call(&server, &tool, parsed) {
1483            Ok(result) => ok(Value::Str(
1484                serde_json::to_string(&result).unwrap_or_else(|_| "null".into()))),
1485            Err(e) => err(Value::Str(e)),
1486        }
1487    }
1488}
1489
1490/// Implementation of `agent.local_complete(prompt)` (#196).
1491/// Hits Ollama (or any compatible HTTP service via `OLLAMA_HOST`)
1492/// and returns the completion text. Override at the
1493/// `EffectHandler` layer if you need a different transport.
1494fn dispatch_llm_local(args: Vec<Value>) -> Value {
1495    let prompt = match args.first() {
1496        Some(Value::Str(s)) => s.clone(),
1497        _ => return err(Value::Str(
1498            "agent.local_complete(prompt): prompt must be Str".into())),
1499    };
1500    match crate::llm::local_complete(&prompt) {
1501        Ok(text) => ok(Value::Str(text)),
1502        Err(e) => err(Value::Str(e)),
1503    }
1504}
1505
1506/// Implementation of `agent.cloud_complete(prompt)` (#196).
1507/// Hits OpenAI's chat-completions API (or any compatible
1508/// service via `OPENAI_BASE_URL`) and returns the assistant
1509/// message. Requires `OPENAI_API_KEY`. Override at the
1510/// `EffectHandler` layer for custom auth, batching, or other
1511/// providers.
1512fn dispatch_llm_cloud(args: Vec<Value>) -> Value {
1513    let prompt = match args.first() {
1514        Some(Value::Str(s)) => s.clone(),
1515        _ => return err(Value::Str(
1516            "agent.cloud_complete(prompt): prompt must be Str".into())),
1517    };
1518    match crate::llm::cloud_complete(&prompt) {
1519        Ok(text) => ok(Value::Str(text)),
1520        Err(e) => err(Value::Str(e)),
1521    }
1522}
1523
1524fn some(v: Value) -> Value {
1525    Value::Variant { name: "Some".into(), args: vec![v] }
1526}
1527fn none() -> Value {
1528    Value::Variant { name: "None".into(), args: vec![] }
1529}
1530
1531fn expect_bytes(v: Option<&Value>) -> Result<&Vec<u8>, String> {
1532    match v {
1533        Some(Value::Bytes(b)) => Ok(b),
1534        Some(other) => Err(format!("expected Bytes arg, got {other:?}")),
1535        None => Err("missing argument".into()),
1536    }
1537}
1538
1539fn expect_kv_handle(v: Option<&Value>) -> Result<u64, String> {
1540    match v {
1541        Some(Value::Int(n)) if *n >= 0 => Ok(*n as u64),
1542        Some(other) => Err(format!("expected Kv handle (Int), got {other:?}")),
1543        None => Err("missing Kv argument".into()),
1544    }
1545}
1546
1547fn expect_sql_handle(v: Option<&Value>) -> Result<u64, String> {
1548    match v {
1549        Some(Value::Int(n)) if *n >= 0 => Ok(*n as u64),
1550        Some(other) => Err(format!("expected Db handle (Int), got {other:?}")),
1551        None => Err("missing Db argument".into()),
1552    }
1553}
1554
1555fn expect_str_list(v: Option<&Value>) -> Result<Vec<String>, String> {
1556    match v {
1557        Some(Value::List(items)) => items.iter().map(|x| match x {
1558            Value::Str(s) => Ok(s.clone()),
1559            other => Err(format!("expected List[Str] element, got {other:?}")),
1560        }).collect(),
1561        Some(other) => Err(format!("expected List[Str], got {other:?}")),
1562        None => Err("missing List[Str] argument".into()),
1563    }
1564}
1565
1566/// Run a `SELECT` (or any returning statement) and pack the rows
1567/// into `Value::List(Value::Record(...))` shape — column-name keys,
1568/// SQLite-typed values mapped one-for-one to Lex value variants
1569/// (Null → Unit, Integer → Int, Real → Float, Text → Str, Blob →
1570/// Bytes). Returns the standard `Result[List[T], Str]` Lex shape.
1571fn sql_run_query(
1572    conn: &rusqlite::Connection,
1573    stmt_str: &str,
1574    params: &[String],
1575) -> Value {
1576    let mut stmt = match conn.prepare(stmt_str) {
1577        Ok(s)  => s,
1578        Err(e) => return err(Value::Str(format!("sql.query: {e}"))),
1579    };
1580    let column_count = stmt.column_count();
1581    let column_names: Vec<String> = (0..column_count)
1582        .map(|i| stmt.column_name(i).unwrap_or("").to_string())
1583        .collect();
1584    let bind: Vec<&dyn rusqlite::ToSql> = params.iter()
1585        .map(|s| s as &dyn rusqlite::ToSql)
1586        .collect();
1587    let mut rows = match stmt.query(rusqlite::params_from_iter(bind.iter())) {
1588        Ok(r)  => r,
1589        Err(e) => return err(Value::Str(format!("sql.query: {e}"))),
1590    };
1591    let mut out: Vec<Value> = Vec::new();
1592    loop {
1593        let row = match rows.next() {
1594            Ok(Some(r)) => r,
1595            Ok(None)    => break,
1596            Err(e)      => return err(Value::Str(format!("sql.query: {e}"))),
1597        };
1598        let mut rec = indexmap::IndexMap::new();
1599        for (i, name) in column_names.iter().enumerate() {
1600            let cell = match row.get_ref(i) {
1601                Ok(c)  => sql_value_ref_to_lex(c),
1602                Err(e) => return err(Value::Str(format!("sql.query: column {i}: {e}"))),
1603            };
1604            rec.insert(name.clone(), cell);
1605        }
1606        out.push(Value::Record(rec));
1607    }
1608    ok(Value::List(out))
1609}
1610
1611fn sql_value_ref_to_lex(v: rusqlite::types::ValueRef<'_>) -> Value {
1612    use rusqlite::types::ValueRef;
1613    match v {
1614        ValueRef::Null       => Value::Unit,
1615        ValueRef::Integer(n) => Value::Int(n),
1616        ValueRef::Real(f)    => Value::Float(f),
1617        ValueRef::Text(s)    => Value::Str(String::from_utf8_lossy(s).into_owned()),
1618        ValueRef::Blob(b)    => Value::Bytes(b.to_vec()),
1619    }
1620}
1621
1622// -- log state (process-wide; configurable via log.set_*) --
1623
/// Severity of a log line. Variants are declared from least to most
/// severe, so the derived ordering can be used as a threshold check.
#[derive(Clone, Copy, PartialEq, PartialOrd)]
enum LogLevel {
    Debug,
    Info,
    Warn,
    Error,
}
1626
/// Output encoding for log lines: human-readable text or one JSON object
/// per line.
#[derive(Clone, Copy, PartialEq)]
enum LogFormat {
    Text,
    Json,
}
1629
/// Destination for log lines. Cloning a `File` sink shares the same
/// underlying file handle through the `Arc`.
#[derive(Clone)]
enum LogSink {
    /// Write to the process's standard error stream.
    Stderr,
    /// Write to a shared, mutex-guarded file.
    File(Arc<Mutex<std::fs::File>>),
}
1635
1636struct LogState {
1637    level: LogLevel,
1638    format: LogFormat,
1639    sink: LogSink,
1640}
1641
1642fn log_state() -> &'static Mutex<LogState> {
1643    static STATE: OnceLock<Mutex<LogState>> = OnceLock::new();
1644    STATE.get_or_init(|| Mutex::new(LogState {
1645        level: LogLevel::Info,
1646        format: LogFormat::Text,
1647        sink: LogSink::Stderr,
1648    }))
1649}
1650
1651fn parse_log_level(s: &str) -> Option<LogLevel> {
1652    match s {
1653        "debug" => Some(LogLevel::Debug),
1654        "info" => Some(LogLevel::Info),
1655        "warn" => Some(LogLevel::Warn),
1656        "error" => Some(LogLevel::Error),
1657        _ => None,
1658    }
1659}
1660
1661fn level_label(l: LogLevel) -> &'static str {
1662    match l {
1663        LogLevel::Debug => "debug",
1664        LogLevel::Info => "info",
1665        LogLevel::Warn => "warn",
1666        LogLevel::Error => "error",
1667    }
1668}
1669
1670fn emit_log(level: LogLevel, msg: &str) {
1671    let state = log_state().lock().unwrap();
1672    if level < state.level {
1673        return;
1674    }
1675    let ts = chrono::Utc::now().to_rfc3339();
1676    let line = match state.format {
1677        LogFormat::Text => format!("[{}] {}: {}\n", ts, level_label(level), msg),
1678        LogFormat::Json => {
1679            // Hand-rolled JSON to avoid pulling serde_json into the
1680            // hot path; msg gets minimal escaping (the four common
1681            // cases that break a JSON line).
1682            let escaped = msg
1683                .replace('\\', "\\\\")
1684                .replace('"',  "\\\"")
1685                .replace('\n', "\\n")
1686                .replace('\r', "\\r");
1687            format!(
1688                "{{\"ts\":\"{ts}\",\"level\":\"{}\",\"msg\":\"{escaped}\"}}\n",
1689                level_label(level),
1690            )
1691        }
1692    };
1693    let sink = state.sink.clone();
1694    drop(state);
1695    match sink {
1696        LogSink::Stderr => {
1697            use std::io::Write;
1698            let _ = std::io::stderr().write_all(line.as_bytes());
1699        }
1700        LogSink::File(f) => {
1701            use std::io::Write;
1702            if let Ok(mut g) = f.lock() {
1703                let _ = g.write_all(line.as_bytes());
1704            }
1705        }
1706    }
1707}
1708
/// Everything the registry keeps for one spawned child process.
pub(crate) struct ProcessState {
    /// Handle to the spawned child.
    child: std::process::Child,
    /// Buffered reader over the child's stdout, when one is held.
    stdout: Option<std::io::BufReader<std::process::ChildStdout>>,
    /// Buffered reader over the child's stderr, when one is held.
    stderr: Option<std::io::BufReader<std::process::ChildStderr>>,
}
1714
1715/// Process-wide registry of live `process.spawn` handles. Capped at
1716/// [`MAX_PROCESS_HANDLES`] to bound long-running programs that spawn
1717/// many short-lived children: on each `spawn` past the cap, the
1718/// least-recently-used entry is dropped (which `Drop`s its
1719/// `ProcessState`, leaving the child orphaned but the registry
1720/// bounded). `process.wait` also drops the entry on completion since
1721/// the handle becomes terminal once the child exits.
1722///
1723/// Each entry is wrapped in `Arc<Mutex<ProcessState>>` so the global
1724/// lookup mutex is held only briefly during dispatch — once we have
1725/// the per-handle `Arc`, the global lock is released and the slow
1726/// op (`wait`, `read_*_line`) only contends on its own handle's
1727/// mutex. Reads on different handles no longer block each other.
1728fn process_registry() -> &'static Mutex<ProcessRegistry> {
1729    static REGISTRY: OnceLock<Mutex<ProcessRegistry>> = OnceLock::new();
1730    REGISTRY.get_or_init(|| Mutex::new(ProcessRegistry::with_capacity(MAX_PROCESS_HANDLES)))
1731}
1732
1733const MAX_PROCESS_HANDLES: usize = 256;
1734
1735type SharedProcessState = Arc<Mutex<ProcessState>>;
1736
1737pub(crate) struct ProcessRegistry {
1738    entries: indexmap::IndexMap<u64, SharedProcessState>,
1739    cap: usize,
1740}
1741
1742impl ProcessRegistry {
1743    pub(crate) fn with_capacity(cap: usize) -> Self {
1744        Self { entries: indexmap::IndexMap::new(), cap }
1745    }
1746
1747    /// Insert a freshly-spawned child. If at cap, evict the LRU entry
1748    /// first; the dropped `ProcessState`'s child stays alive (orphaned)
1749    /// but its file descriptors are released.
1750    pub(crate) fn insert(&mut self, handle: u64, state: ProcessState) {
1751        if self.entries.len() >= self.cap {
1752            self.entries.shift_remove_index(0);
1753        }
1754        self.entries.insert(handle, Arc::new(Mutex::new(state)));
1755    }
1756
1757    /// Look up a handle, marking it most-recently-used on hit. Returns
1758    /// a clone of the shared `Arc` — callers should release the global
1759    /// registry lock before locking the per-handle mutex.
1760    pub(crate) fn touch_get(&mut self, handle: u64) -> Option<SharedProcessState> {
1761        let idx = self.entries.get_index_of(&handle)?;
1762        self.entries.move_index(idx, self.entries.len() - 1);
1763        self.entries.get(&handle).cloned()
1764    }
1765
1766    /// Drop the registry entry. The underlying `Arc` may outlive the
1767    /// removal if another op still holds it; that's intentional — the
1768    /// in-flight op finishes against the existing `ProcessState`, and
1769    /// only fresh lookups start failing.
1770    pub(crate) fn remove(&mut self, handle: u64) {
1771        self.entries.shift_remove(&handle);
1772    }
1773
1774    #[cfg(test)]
1775    pub(crate) fn len(&self) -> usize { self.entries.len() }
1776}
1777
/// Hand out the next process handle. Handles start at 1 and increase by
/// one per call across the whole process.
fn next_process_handle() -> u64 {
    static NEXT_HANDLE: AtomicU64 = AtomicU64::new(1);
    NEXT_HANDLE.fetch_add(1, Ordering::SeqCst)
}
1782
#[cfg(all(test, unix))]
mod process_registry_tests {
    use super::{ProcessRegistry, ProcessState};
    use std::process::{Command, Stdio};

    /// Produce a registry payload backed by a real child process. The
    /// `true` binary exits right away; the tests never interact with the
    /// child, they only need a valid `std::process::Child`.
    fn fresh_state() -> ProcessState {
        let mut cmd = Command::new("true");
        cmd.stdout(Stdio::null()).stderr(Stdio::null());
        let child = cmd.spawn().expect("spawn `true`");
        ProcessState { child, stdout: None, stderr: None }
    }

    #[test]
    fn insert_and_get_round_trip() {
        let mut reg = ProcessRegistry::with_capacity(4);
        reg.insert(1, fresh_state());
        assert!(reg.touch_get(1).is_some());
        assert!(reg.touch_get(2).is_none());
    }

    #[test]
    fn touch_get_returns_distinct_arcs_for_distinct_handles() {
        let mut reg = ProcessRegistry::with_capacity(4);
        for handle in [1, 2] {
            reg.insert(handle, fresh_state());
        }
        let first = reg.touch_get(1).unwrap();
        let second = reg.touch_get(2).unwrap();
        // Pointer equality: each handle owns its own allocation.
        assert!(!std::sync::Arc::ptr_eq(&first, &second));
    }

    #[test]
    fn cap_evicts_lru_on_overflow() {
        // cap=2: after touching 1, handle 2 is the eviction candidate.
        let mut reg = ProcessRegistry::with_capacity(2);
        reg.insert(1, fresh_state());
        reg.insert(2, fresh_state());
        let _ = reg.touch_get(1);
        reg.insert(3, fresh_state());
        assert!(reg.touch_get(1).is_some(), "1 was MRU, should survive");
        assert!(reg.touch_get(2).is_none(), "2 was LRU, should be evicted");
        assert!(reg.touch_get(3).is_some(), "3 just inserted, should survive");
        assert_eq!(reg.len(), 2);
    }

    #[test]
    fn cap_with_no_touches_evicts_in_insertion_order() {
        // Without touches the oldest insert goes first.
        let mut reg = ProcessRegistry::with_capacity(2);
        for handle in [10, 20, 30] {
            reg.insert(handle, fresh_state());
        }
        assert!(reg.touch_get(10).is_none());
        assert!(reg.touch_get(20).is_some());
        assert!(reg.touch_get(30).is_some());
    }

    #[test]
    fn remove_drops_entry() {
        let mut reg = ProcessRegistry::with_capacity(4);
        reg.insert(1, fresh_state());
        reg.remove(1);
        assert!(reg.touch_get(1).is_none());
        assert_eq!(reg.len(), 0);
    }

    #[test]
    fn many_inserts_stay_bounded_at_cap() {
        let cap = 8;
        let mut reg = ProcessRegistry::with_capacity(cap);
        let total = cap as u64 * 3;
        for handle in 0..total {
            reg.insert(handle, fresh_state());
            assert!(reg.len() <= cap);
        }
        assert_eq!(reg.len(), cap);
    }

    #[test]
    fn outstanding_arc_outlives_remove() {
        // An operation that already holds the per-handle Arc must keep
        // working after the entry is removed, as happens when a wait
        // completes and then removes its own handle.
        let mut reg = ProcessRegistry::with_capacity(4);
        reg.insert(1, fresh_state());
        let held = reg.touch_get(1).expect("entry exists");
        reg.remove(1);
        // The registry no longer knows the handle, but the Arc is usable.
        assert!(reg.touch_get(1).is_none());
        let _guard = held.lock().unwrap();
    }
}
1876
1877fn expect_process_handle(v: Option<&Value>) -> Result<u64, String> {
1878    match v {
1879        Some(Value::Int(n)) if *n >= 0 => Ok(*n as u64),
1880        Some(other) => Err(format!("expected ProcessHandle (Int), got {other:?}")),
1881        None => Err("missing ProcessHandle argument".into()),
1882    }
1883}
1884
1885/// Process-wide registry of open `Kv` handles. Each `kv.open` allocates
1886/// a new u64 handle via [`next_kv_handle`] and stores the `sled::Db`
1887/// here; subsequent ops fetch by handle. `kv.close` removes the entry.
1888///
1889/// Capped at [`MAX_KV_HANDLES`] to prevent leaks from long-running
1890/// programs that open many short-lived stores without calling
1891/// `kv.close`. On insert at cap, the least-recently-used entry is
1892/// dropped (closing its `sled::Db`); subsequent ops on the evicted
1893/// handle return the standard "closed or unknown Kv handle" error.
1894/// Any access (`get`, `put`, `delete`, `contains`, `list_prefix`)
1895/// touches the LRU order.
1896fn kv_registry() -> &'static Mutex<KvRegistry> {
1897    static REGISTRY: OnceLock<Mutex<KvRegistry>> = OnceLock::new();
1898    REGISTRY.get_or_init(|| Mutex::new(KvRegistry::with_capacity(MAX_KV_HANDLES)))
1899}
1900
1901/// Maximum number of `kv.open` handles kept alive at once. Past this
1902/// cap, the least-recently-used handle is evicted on each new open.
1903/// Sized so that pathological "open and forget" programs are bounded
1904/// without breaking real-world programs that intentionally keep one or
1905/// two long-lived stores open.
1906const MAX_KV_HANDLES: usize = 256;
1907
1908/// LRU-bounded set of open `sled::Db` instances keyed by `u64` handle.
1909/// Built on `IndexMap` for O(1) insert / remove / lookup with
1910/// insertion-order traversal — touching an entry just shift-moves it
1911/// to the back, evictions pop from the front.
1912pub(crate) struct KvRegistry {
1913    entries: indexmap::IndexMap<u64, sled::Db>,
1914    cap: usize,
1915}
1916
1917impl KvRegistry {
1918    pub(crate) fn with_capacity(cap: usize) -> Self {
1919        Self { entries: indexmap::IndexMap::new(), cap }
1920    }
1921
1922    /// Insert a freshly-opened db. If we're already at cap, evict the
1923    /// LRU entry first; the dropped `sled::Db` closes its files.
1924    pub(crate) fn insert(&mut self, handle: u64, db: sled::Db) {
1925        if self.entries.len() >= self.cap {
1926            self.entries.shift_remove_index(0);
1927        }
1928        self.entries.insert(handle, db);
1929    }
1930
1931    /// Look up a handle, marking it most-recently-used on hit.
1932    pub(crate) fn touch_get(&mut self, handle: u64) -> Option<&sled::Db> {
1933        let idx = self.entries.get_index_of(&handle)?;
1934        self.entries.move_index(idx, self.entries.len() - 1);
1935        self.entries.get(&handle)
1936    }
1937
1938    /// Explicit `kv.close`: drop the handle if present.
1939    pub(crate) fn remove(&mut self, handle: u64) {
1940        self.entries.shift_remove(&handle);
1941    }
1942
1943    #[cfg(test)]
1944    pub(crate) fn len(&self) -> usize { self.entries.len() }
1945}
1946
/// Hand out the next `Kv` handle. Handles start at 1 and increase by one
/// per call across the whole process.
fn next_kv_handle() -> u64 {
    static NEXT_HANDLE: AtomicU64 = AtomicU64::new(1);
    NEXT_HANDLE.fetch_add(1, Ordering::SeqCst)
}
1951
1952/// Process-wide registry of open `Db` handles. Same shape as the kv
1953/// and process registries: per-handle `Arc<Mutex<…>>` so dispatch
1954/// only briefly holds the global lock and ops on different
1955/// connections don't serialize. LRU-bounded at
1956/// [`MAX_SQL_HANDLES`] to avoid leaks from long-running programs
1957/// that open many short-lived databases.
1958fn sql_registry() -> &'static Mutex<SqlRegistry> {
1959    static REGISTRY: OnceLock<Mutex<SqlRegistry>> = OnceLock::new();
1960    REGISTRY.get_or_init(|| Mutex::new(SqlRegistry::with_capacity(MAX_SQL_HANDLES)))
1961}
1962
1963const MAX_SQL_HANDLES: usize = 256;
1964
1965type SharedConn = Arc<Mutex<rusqlite::Connection>>;
1966
1967pub(crate) struct SqlRegistry {
1968    entries: indexmap::IndexMap<u64, SharedConn>,
1969    cap: usize,
1970}
1971
1972impl SqlRegistry {
1973    pub(crate) fn with_capacity(cap: usize) -> Self {
1974        Self { entries: indexmap::IndexMap::new(), cap }
1975    }
1976
1977    pub(crate) fn insert(&mut self, handle: u64, conn: rusqlite::Connection) {
1978        if self.entries.len() >= self.cap {
1979            self.entries.shift_remove_index(0);
1980        }
1981        self.entries.insert(handle, Arc::new(Mutex::new(conn)));
1982    }
1983
1984    /// Look up a handle, marking it MRU on hit. Returns a clone of
1985    /// the shared `Arc` so callers release the global registry
1986    /// lock before locking the per-handle mutex.
1987    pub(crate) fn touch_get(&mut self, handle: u64) -> Option<SharedConn> {
1988        let idx = self.entries.get_index_of(&handle)?;
1989        self.entries.move_index(idx, self.entries.len() - 1);
1990        self.entries.get(&handle).cloned()
1991    }
1992
1993    pub(crate) fn remove(&mut self, handle: u64) {
1994        self.entries.shift_remove(&handle);
1995    }
1996
1997    #[cfg(test)]
1998    pub(crate) fn len(&self) -> usize { self.entries.len() }
1999}
2000
/// Hand out the next `Db` handle. Handles start at 1 and increase by one
/// per call across the whole process.
fn next_sql_handle() -> u64 {
    static NEXT_HANDLE: AtomicU64 = AtomicU64::new(1);
    NEXT_HANDLE.fetch_add(1, Ordering::SeqCst)
}
2005
#[cfg(test)]
mod sql_registry_tests {
    use super::SqlRegistry;

    /// Open a throwaway in-memory SQLite connection to use as payload.
    fn fresh() -> rusqlite::Connection {
        let opened = rusqlite::Connection::open_in_memory();
        opened.expect("open in-memory sqlite")
    }

    #[test]
    fn insert_and_get_round_trip() {
        let mut reg = SqlRegistry::with_capacity(4);
        reg.insert(1, fresh());
        assert!(reg.touch_get(1).is_some());
        assert!(reg.touch_get(2).is_none());
    }

    #[test]
    fn cap_evicts_lru_on_overflow() {
        // cap=2: after touching 1, handle 2 is the eviction candidate.
        let mut reg = SqlRegistry::with_capacity(2);
        reg.insert(1, fresh());
        reg.insert(2, fresh());
        let _ = reg.touch_get(1);
        reg.insert(3, fresh());
        assert!(reg.touch_get(1).is_some(), "1 was MRU, should survive");
        assert!(reg.touch_get(2).is_none(), "2 was LRU, should be evicted");
        assert!(reg.touch_get(3).is_some(), "3 just inserted");
        assert_eq!(reg.len(), 2);
    }

    #[test]
    fn remove_drops_entry() {
        let mut reg = SqlRegistry::with_capacity(4);
        reg.insert(1, fresh());
        reg.remove(1);
        assert!(reg.touch_get(1).is_none());
        assert_eq!(reg.len(), 0);
    }

    #[test]
    fn many_inserts_stay_bounded_at_cap() {
        let cap = 8;
        let mut reg = SqlRegistry::with_capacity(cap);
        let total = cap as u64 * 3;
        for handle in 0..total {
            reg.insert(handle, fresh());
            assert!(reg.len() <= cap);
        }
        assert_eq!(reg.len(), cap);
    }
}
2055
#[cfg(test)]
mod kv_registry_tests {
    use super::KvRegistry;

    /// Open an isolated `sled::Db` under the system temp directory. The
    /// path combines the process id, `tag`, and a nanosecond timestamp so
    /// that concurrently running tests do not collide on the lockfile.
    fn fresh_db(tag: &str) -> sled::Db {
        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        let pid = std::process::id();
        let dir = std::env::temp_dir().join(format!("lex-kv-reg-{}-{}-{}", pid, tag, nanos));
        sled::open(&dir).expect("sled open")
    }

    #[test]
    fn insert_and_get_round_trip() {
        let mut reg = KvRegistry::with_capacity(4);
        reg.insert(1, fresh_db("a"));
        assert!(reg.touch_get(1).is_some());
        assert!(reg.touch_get(2).is_none());
    }

    #[test]
    fn cap_evicts_lru_on_overflow() {
        // cap=2: insert 1 and 2, touch 1 so it becomes MRU, then insert
        // 3 — handle 2 must be the one evicted.
        let mut reg = KvRegistry::with_capacity(2);
        reg.insert(1, fresh_db("c1"));
        reg.insert(2, fresh_db("c2"));
        let _ = reg.touch_get(1);
        reg.insert(3, fresh_db("c3"));
        assert!(reg.touch_get(1).is_some(), "1 was MRU, should survive");
        assert!(reg.touch_get(2).is_none(), "2 was LRU, should be evicted");
        assert!(reg.touch_get(3).is_some(), "3 just inserted, should survive");
        assert_eq!(reg.len(), 2);
    }

    #[test]
    fn cap_with_no_touches_evicts_in_insertion_order() {
        // cap=2 and no touches: the first insert is evicted (FIFO).
        let mut reg = KvRegistry::with_capacity(2);
        for (handle, tag) in [(10, "f1"), (20, "f2"), (30, "f3")] {
            reg.insert(handle, fresh_db(tag));
        }
        assert!(reg.touch_get(10).is_none());
        assert!(reg.touch_get(20).is_some());
        assert!(reg.touch_get(30).is_some());
    }

    #[test]
    fn remove_drops_entry() {
        let mut reg = KvRegistry::with_capacity(4);
        reg.insert(1, fresh_db("r1"));
        reg.remove(1);
        assert!(reg.touch_get(1).is_none());
        assert_eq!(reg.len(), 0);
    }

    #[test]
    fn remove_unknown_handle_is_noop() {
        let mut reg = KvRegistry::with_capacity(4);
        reg.insert(1, fresh_db("u1"));
        reg.remove(999);
        assert!(reg.touch_get(1).is_some());
    }

    #[test]
    fn many_inserts_stay_bounded_at_cap() {
        // Insert three times the cap and check the registry never grows
        // past it at any point.
        let cap = 8;
        let mut reg = KvRegistry::with_capacity(cap);
        let total = cap as u64 * 3;
        for i in 0..total {
            reg.insert(i, fresh_db(&format!("b{i}")));
            assert!(reg.len() <= cap);
        }
        assert_eq!(reg.len(), cap);
    }
}