Skip to main content

lex_runtime/
handler.rs

//! Native effect handlers, dispatched at runtime through the VM's
//! `EffectHandler` trait. Per spec §7.4 the handler also re-checks the
//! runtime policy at dispatch time. The static check is necessary but not
//! sufficient: a fn declared `[fs_read("/data")]` that is allowed at
//! startup must still pass the path check at the point of dispatch.
6
7use lex_bytecode::vm::{EffectHandler, Vm};
8use lex_bytecode::{Program, Value};
9use std::path::PathBuf;
10use std::sync::atomic::{AtomicU64, Ordering};
11use std::sync::{Mutex, OnceLock};
12use std::sync::Arc;
13use std::time::{SystemTime, UNIX_EPOCH};
14
15use crate::builtins::try_pure_builtin;
16use crate::policy::Policy;
17
/// Destination for the lines written by `io.print`.
///
/// Production code uses [`StdoutSink`]; tests swap in a [`CapturedSink`]
/// so they can inspect exactly what was printed.
pub trait IoSink: Send {
    /// Emit one line of output. Sinks that write to a stream add the
    /// line terminator themselves (as [`StdoutSink`] does).
    fn print_line(&mut self, line: &str);
}

/// [`IoSink`] that writes each line to the process's standard output.
pub struct StdoutSink;

impl IoSink for StdoutSink {
    fn print_line(&mut self, line: &str) {
        println!("{}", line);
    }
}

/// [`IoSink`] that keeps every printed line in memory, in arrival order.
#[derive(Default)]
pub struct CapturedSink {
    /// Lines received so far, oldest first.
    pub lines: Vec<String>,
}

impl IoSink for CapturedSink {
    fn print_line(&mut self, line: &str) {
        self.lines.push(line.to_owned());
    }
}
36
/// Producers behind `agent.cloud_stream` handles (#305 slice 3).
///
/// Maps each opaque handle id to the iterator that yields that stream's
/// items.
pub type StreamRegistry = std::collections::HashMap<
    String,
    Box<dyn Iterator<Item = String> + Send>,
>;
41
42pub struct DefaultHandler {
43    policy: Policy,
44    pub sink: Box<dyn IoSink>,
45    /// Optional read root for `io.read` — when set, `io.read("p")` resolves
46    /// to `read_root.join(p)`. Lets tests run without touching the real fs.
47    pub read_root: Option<PathBuf>,
48    /// Per-run budget pool (#225). `Arc<AtomicU64>` so parallel
49    /// branches share one counter without locking. Initialized to
50    /// the policy ceiling at handler construction; each call to a
51    /// function with declared `[budget(N)]` deducts N atomically
52    /// via `note_call_budget`. Cloning the handler is intentional
53    /// for net.serve / chat handlers — they share the same pool.
54    pub budget_remaining: Arc<AtomicU64>,
55    /// The original ceiling that `budget_remaining` started at, kept
56    /// for diagnostics so a `BudgetExceeded` error can report
57    /// `(used, ceiling)` rather than just "exceeded by N".
58    pub budget_ceiling: Option<u64>,
59    /// Shared reference to the program, needed by `net.serve` so the
60    /// handler can spin up fresh VMs to dispatch incoming requests.
61    /// `None` if the handler was constructed without a program.
62    pub program: Option<Arc<Program>>,
63    /// Chat registry; populated by `net.serve_ws`'s per-message
64    /// dispatch so `chat.broadcast` / `chat.send` work from inside
65    /// a handler invocation.
66    pub chat_registry: Option<Arc<crate::ws::ChatRegistry>>,
67    /// LRU cache of `agent.call_mcp` clients keyed by the
68    /// command-line string (#197). Avoids spawn-per-call cost
69    /// when an agent invokes the same MCP server in tight loops.
70    /// Capped — when the cache is full, the least-recently-used
71    /// entry is dropped (its subprocess is reaped on Drop).
72    pub mcp_clients: crate::mcp_client::McpClientCache,
73    /// Stream registry for `agent.cloud_stream` / `stream.next` /
74    /// `stream.collect` (#305 slice 3). Keyed by an opaque handle
75    /// id; values are the producer iterators. Wrapped in
76    /// `Arc<Mutex<…>>` so par_map workers can share the same
77    /// stream pool (when slice-2's per-worker handler split chains
78    /// the registry through).
79    pub streams: Arc<std::sync::Mutex<StreamRegistry>>,
80    /// Monotonic counter for handing out fresh stream handle ids.
81    pub next_stream_id: Arc<std::sync::atomic::AtomicU64>,
82}
83
84impl DefaultHandler {
85    pub fn new(policy: Policy) -> Self {
86        // If the caller supplied a ceiling, the pool starts at that
87        // ceiling and counts down. No ceiling = `u64::MAX` so calls
88        // never refuse on budget grounds (existing behavior).
89        let ceiling = policy.budget;
90        let initial = ceiling.unwrap_or(u64::MAX);
91        Self {
92            policy,
93            sink: Box::new(StdoutSink),
94            read_root: None,
95            budget_remaining: Arc::new(AtomicU64::new(initial)),
96            budget_ceiling: ceiling,
97            program: None,
98            chat_registry: None,
99            mcp_clients: crate::mcp_client::McpClientCache::with_capacity(16),
100            streams: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
101            next_stream_id: Arc::new(std::sync::atomic::AtomicU64::new(0)),
102        }
103    }
104
105    pub fn with_program(mut self, program: Arc<Program>) -> Self {
106        self.program = Some(program); self
107    }
108
109    pub fn with_chat_registry(mut self, registry: Arc<crate::ws::ChatRegistry>) -> Self {
110        self.chat_registry = Some(registry); self
111    }
112
113    pub fn with_sink(mut self, sink: Box<dyn IoSink>) -> Self {
114        self.sink = sink; self
115    }
116
117    pub fn with_read_root(mut self, root: PathBuf) -> Self {
118        self.read_root = Some(root); self
119    }
120
121    fn ensure_kind_allowed(&self, kind: &str) -> Result<(), String> {
122        if self.policy.allow_effects.contains(kind) {
123            Ok(())
124        } else {
125            Err(format!("effect `{kind}` not in --allow-effects"))
126        }
127    }
128
129    fn resolve_read_path(&self, p: &str) -> PathBuf {
130        match &self.read_root {
131            Some(root) => root.join(p.trim_start_matches('/')),
132            None => PathBuf::from(p),
133        }
134    }
135
136    fn dispatch_log(&mut self, op: &str, args: Vec<Value>) -> Result<Value, String> {
137        match op {
138            "debug" | "info" | "warn" | "error" => {
139                let msg = expect_str(args.first())?;
140                let level = match op {
141                    "debug" => LogLevel::Debug,
142                    "info" => LogLevel::Info,
143                    "warn" => LogLevel::Warn,
144                    _ => LogLevel::Error,
145                };
146                emit_log(level, msg);
147                Ok(Value::Unit)
148            }
149            "set_level" => {
150                let s = expect_str(args.first())?;
151                match parse_log_level(s) {
152                    Some(l) => {
153                        log_state().lock().unwrap().level = l;
154                        Ok(ok(Value::Unit))
155                    }
156                    None => Ok(err(Value::Str(format!(
157                        "log.set_level: unknown level `{s}`; expected debug|info|warn|error")))),
158                }
159            }
160            "set_format" => {
161                let s = expect_str(args.first())?;
162                let fmt = match s {
163                    "text" => LogFormat::Text,
164                    "json" => LogFormat::Json,
165                    other => return Ok(err(Value::Str(format!(
166                        "log.set_format: unknown format `{other}`; expected text|json")))),
167                };
168                log_state().lock().unwrap().format = fmt;
169                Ok(ok(Value::Unit))
170            }
171            "set_sink" => {
172                let path = expect_str(args.first())?;
173                if path == "-" {
174                    log_state().lock().unwrap().sink = LogSink::Stderr;
175                    return Ok(ok(Value::Unit));
176                }
177                if let Err(e) = self.ensure_fs_write_path(path) {
178                    return Ok(err(Value::Str(e)));
179                }
180                match std::fs::OpenOptions::new()
181                    .create(true).append(true).open(path)
182                {
183                    Ok(f) => {
184                        log_state().lock().unwrap().sink = LogSink::File(std::sync::Arc::new(Mutex::new(f)));
185                        Ok(ok(Value::Unit))
186                    }
187                    Err(e) => Ok(err(Value::Str(format!("log.set_sink `{path}`: {e}")))),
188                }
189            }
190            other => Err(format!("unsupported log.{other}")),
191        }
192    }
193
194    fn dispatch_process(&mut self, op: &str, args: Vec<Value>) -> Result<Value, String> {
195        match op {
196            "spawn" => {
197                let cmd = expect_str(args.first())?.to_string();
198                let raw_args = match args.get(1) {
199                    Some(Value::List(items)) => items.clone(),
200                    _ => return Err("process.spawn: args must be List[Str]".into()),
201                };
202                let str_args: Result<Vec<String>, String> = raw_args.iter().map(|v| match v {
203                    Value::Str(s) => Ok(s.clone()),
204                    other => Err(format!("process.spawn: arg must be Str, got {other:?}")),
205                }).collect();
206                let str_args = str_args?;
207                let opts = match args.get(2) {
208                    Some(Value::Record(r)) => r.clone(),
209                    _ => return Err("process.spawn: missing or invalid opts record".into()),
210                };
211
212                // Allow-list check, mirroring the existing proc.spawn.
213                if !self.policy.allow_proc.is_empty() {
214                    let basename = std::path::Path::new(&cmd)
215                        .file_name()
216                        .and_then(|s| s.to_str())
217                        .unwrap_or(&cmd);
218                    if !self.policy.allow_proc.iter().any(|a| a == basename) {
219                        return Ok(err(Value::Str(format!(
220                            "process.spawn: `{cmd}` not in --allow-proc {:?}",
221                            self.policy.allow_proc
222                        ))));
223                    }
224                }
225
226                let mut command = std::process::Command::new(&cmd);
227                command.args(&str_args);
228                command.stdin(std::process::Stdio::piped());
229                command.stdout(std::process::Stdio::piped());
230                command.stderr(std::process::Stdio::piped());
231
232                if let Some(Value::Variant { name, args: vargs }) = opts.get("cwd") {
233                    if name == "Some" {
234                        if let Some(Value::Str(s)) = vargs.first() {
235                            command.current_dir(s);
236                        }
237                    }
238                }
239                if let Some(Value::Map(env)) = opts.get("env") {
240                    for (k, v) in env {
241                        if let (lex_bytecode::MapKey::Str(ks), Value::Str(vs)) = (k, v) {
242                            command.env(ks, vs);
243                        }
244                    }
245                }
246
247                let stdin_payload: Option<Vec<u8>> = match opts.get("stdin") {
248                    Some(Value::Variant { name, args: vargs }) if name == "Some" => {
249                        match vargs.first() {
250                            Some(Value::Bytes(b)) => Some(b.clone()),
251                            _ => None,
252                        }
253                    }
254                    _ => None,
255                };
256
257                let mut child = match command.spawn() {
258                    Ok(c) => c,
259                    Err(e) => return Ok(err(Value::Str(format!("process.spawn `{cmd}`: {e}")))),
260                };
261
262                if let Some(payload) = stdin_payload {
263                    if let Some(mut stdin) = child.stdin.take() {
264                        use std::io::Write;
265                        let _ = stdin.write_all(&payload);
266                        // Drop closes stdin; the child sees EOF.
267                    }
268                }
269
270                let stdout = child.stdout.take().map(std::io::BufReader::new);
271                let stderr = child.stderr.take().map(std::io::BufReader::new);
272                let handle = next_process_handle();
273                process_registry().lock().unwrap().insert(handle, ProcessState {
274                    child,
275                    stdout,
276                    stderr,
277                });
278                Ok(ok(Value::Int(handle as i64)))
279            }
280            "read_stdout_line" => Self::read_line_op(args, true),
281            "read_stderr_line" => Self::read_line_op(args, false),
282            "wait" => {
283                let h = expect_process_handle(args.first())?;
284                // Look up the per-handle Arc, then release the global
285                // lock before the (slow) wait so unrelated handles
286                // can dispatch concurrently.
287                let arc = process_registry().lock().unwrap()
288                    .touch_get(h)
289                    .ok_or_else(|| "process.wait: closed or unknown ProcessHandle".to_string())?;
290                let status = {
291                    let mut state = arc.lock().unwrap();
292                    state.child.wait().map_err(|e| format!("process.wait: {e}"))?
293                };
294                // Wait completion makes the handle terminal; drop it
295                // from the registry so the cap doesn't fill up with
296                // exited children.
297                process_registry().lock().unwrap().remove(h);
298                let mut rec = indexmap::IndexMap::new();
299                rec.insert("code".into(), Value::Int(status.code().unwrap_or(-1) as i64));
300                #[cfg(unix)]
301                {
302                    use std::os::unix::process::ExitStatusExt;
303                    rec.insert("signaled".into(), Value::Bool(status.signal().is_some()));
304                }
305                #[cfg(not(unix))]
306                {
307                    rec.insert("signaled".into(), Value::Bool(false));
308                }
309                Ok(Value::Record(rec))
310            }
311            "kill" => {
312                let h = expect_process_handle(args.first())?;
313                let _signal = expect_str(args.get(1))?;
314                let arc = process_registry().lock().unwrap()
315                    .touch_get(h)
316                    .ok_or_else(|| "process.kill: closed or unknown ProcessHandle".to_string())?;
317                let mut state = arc.lock().unwrap();
318                // Cross-platform: only `kill` (SIGKILL-equivalent on
319                // Windows). Signal-name dispatch is a v1.5 follow-up.
320                match state.child.kill() {
321                    Ok(_) => Ok(ok(Value::Unit)),
322                    Err(e) => Ok(err(Value::Str(format!("process.kill: {e}")))),
323                }
324            }
325            "run" => {
326                let cmd = expect_str(args.first())?.to_string();
327                let raw_args = match args.get(1) {
328                    Some(Value::List(items)) => items.clone(),
329                    _ => return Err("process.run: args must be List[Str]".into()),
330                };
331                let str_args: Result<Vec<String>, String> = raw_args.iter().map(|v| match v {
332                    Value::Str(s) => Ok(s.clone()),
333                    other => Err(format!("process.run: arg must be Str, got {other:?}")),
334                }).collect();
335                let str_args = str_args?;
336                if !self.policy.allow_proc.is_empty() {
337                    let basename = std::path::Path::new(&cmd)
338                        .file_name()
339                        .and_then(|s| s.to_str())
340                        .unwrap_or(&cmd);
341                    if !self.policy.allow_proc.iter().any(|a| a == basename) {
342                        return Ok(err(Value::Str(format!(
343                            "process.run: `{cmd}` not in --allow-proc {:?}",
344                            self.policy.allow_proc
345                        ))));
346                    }
347                }
348                match std::process::Command::new(&cmd).args(&str_args).output() {
349                    Ok(o) => {
350                        let mut rec = indexmap::IndexMap::new();
351                        rec.insert("stdout".into(), Value::Str(
352                            String::from_utf8_lossy(&o.stdout).to_string()));
353                        rec.insert("stderr".into(), Value::Str(
354                            String::from_utf8_lossy(&o.stderr).to_string()));
355                        rec.insert("exit_code".into(), Value::Int(
356                            o.status.code().unwrap_or(-1) as i64));
357                        Ok(ok(Value::Record(rec)))
358                    }
359                    Err(e) => Ok(err(Value::Str(format!("process.run `{cmd}`: {e}")))),
360                }
361            }
362            other => Err(format!("unsupported process.{other}")),
363        }
364    }
365
366    /// Read one line from the child's stdout (`is_stdout = true`) or
367    /// stderr. Returns `None` (Lex `Option`) at EOF; subsequent calls
368    /// keep returning `None`. Holds only the per-handle mutex during
369    /// the (potentially blocking) read, so reads on one handle don't
370    /// block reads/waits on a different handle.
371    fn read_line_op(args: Vec<Value>, is_stdout: bool) -> Result<Value, String> {
372        let h = expect_process_handle(args.first())?;
373        let arc = process_registry().lock().unwrap()
374            .touch_get(h)
375            .ok_or_else(|| format!(
376                "process.read_{}_line: closed or unknown ProcessHandle",
377                if is_stdout { "stdout" } else { "stderr" }))?;
378        let mut state = arc.lock().unwrap();
379        let reader_opt = if is_stdout {
380            state.stdout.as_mut().map(|r| -> &mut dyn std::io::BufRead { r })
381        } else {
382            state.stderr.as_mut().map(|r| -> &mut dyn std::io::BufRead { r })
383        };
384        let reader = match reader_opt {
385            Some(r) => r,
386            None => return Ok(none()),
387        };
388        let mut line = String::new();
389        match reader.read_line(&mut line) {
390            Ok(0) => Ok(none()),
391            Ok(_) => {
392                if line.ends_with('\n') { line.pop(); }
393                if line.ends_with('\r') { line.pop(); }
394                Ok(some(Value::Str(line)))
395            }
396            Err(e) => Err(format!("process.read_*_line: {e}")),
397        }
398    }
399
400    fn dispatch_fs(&mut self, op: &str, args: Vec<Value>) -> Result<Value, String> {
401        match op {
402            "exists" => {
403                let path = expect_str(args.first())?.to_string();
404                if let Err(e) = self.ensure_fs_walk_path(&path) {
405                    return Ok(err(Value::Str(e)));
406                }
407                Ok(Value::Bool(std::path::Path::new(&path).exists()))
408            }
409            "is_file" => {
410                let path = expect_str(args.first())?.to_string();
411                if let Err(e) = self.ensure_fs_walk_path(&path) {
412                    return Ok(err(Value::Str(e)));
413                }
414                Ok(Value::Bool(std::path::Path::new(&path).is_file()))
415            }
416            "is_dir" => {
417                let path = expect_str(args.first())?.to_string();
418                if let Err(e) = self.ensure_fs_walk_path(&path) {
419                    return Ok(err(Value::Str(e)));
420                }
421                Ok(Value::Bool(std::path::Path::new(&path).is_dir()))
422            }
423            "stat" => {
424                let path = expect_str(args.first())?.to_string();
425                if let Err(e) = self.ensure_fs_walk_path(&path) {
426                    return Ok(err(Value::Str(e)));
427                }
428                match std::fs::metadata(&path) {
429                    Ok(md) => {
430                        let mtime = md.modified()
431                            .ok()
432                            .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
433                            .map(|d| d.as_secs() as i64)
434                            .unwrap_or(0);
435                        let mut rec = indexmap::IndexMap::new();
436                        rec.insert("size".into(), Value::Int(md.len() as i64));
437                        rec.insert("mtime".into(), Value::Int(mtime));
438                        rec.insert("is_dir".into(), Value::Bool(md.is_dir()));
439                        rec.insert("is_file".into(), Value::Bool(md.is_file()));
440                        Ok(ok(Value::Record(rec)))
441                    }
442                    Err(e) => Ok(err(Value::Str(format!("fs.stat `{path}`: {e}")))),
443                }
444            }
445            "list_dir" => {
446                let path = expect_str(args.first())?.to_string();
447                if let Err(e) = self.ensure_fs_walk_path(&path) {
448                    return Ok(err(Value::Str(e)));
449                }
450                match std::fs::read_dir(&path) {
451                    Ok(rd) => {
452                        let mut entries: Vec<Value> = Vec::new();
453                        for ent in rd {
454                            match ent {
455                                Ok(e) => {
456                                    let p = e.path();
457                                    entries.push(Value::Str(p.to_string_lossy().into_owned()));
458                                }
459                                Err(e) => return Ok(err(Value::Str(format!("fs.list_dir: {e}")))),
460                            }
461                        }
462                        Ok(ok(Value::List(entries)))
463                    }
464                    Err(e) => Ok(err(Value::Str(format!("fs.list_dir `{path}`: {e}")))),
465                }
466            }
467            "walk" => {
468                let path = expect_str(args.first())?.to_string();
469                if let Err(e) = self.ensure_fs_walk_path(&path) {
470                    return Ok(err(Value::Str(e)));
471                }
472                let mut paths: Vec<Value> = Vec::new();
473                for ent in walkdir::WalkDir::new(&path) {
474                    match ent {
475                        Ok(e) => paths.push(Value::Str(
476                            e.path().to_string_lossy().into_owned())),
477                        Err(e) => return Ok(err(Value::Str(format!("fs.walk: {e}")))),
478                    }
479                }
480                Ok(ok(Value::List(paths)))
481            }
482            "glob" => {
483                let pattern = expect_str(args.first())?.to_string();
484                // Glob patterns can't be path-scoped at parse time
485                // (`**/*.rs` doesn't pin a directory); we filter the
486                // per-result paths after expansion against
487                // `--allow-fs-read`.
488                let entries = match glob::glob(&pattern) {
489                    Ok(e) => e,
490                    Err(e) => return Ok(err(Value::Str(format!("fs.glob: {e}")))),
491                };
492                let mut paths: Vec<Value> = Vec::new();
493                for ent in entries {
494                    match ent {
495                        Ok(p) => {
496                            let s = p.to_string_lossy().into_owned();
497                            if self.policy.allow_fs_read.is_empty()
498                                || self.policy.allow_fs_read.iter().any(|root| p.starts_with(root))
499                            {
500                                paths.push(Value::Str(s));
501                            }
502                        }
503                        Err(e) => return Ok(err(Value::Str(format!("fs.glob: {e}")))),
504                    }
505                }
506                Ok(ok(Value::List(paths)))
507            }
508            "mkdir_p" => {
509                let path = expect_str(args.first())?.to_string();
510                if let Err(e) = self.ensure_fs_write_path(&path) {
511                    return Ok(err(Value::Str(e)));
512                }
513                match std::fs::create_dir_all(&path) {
514                    Ok(_) => Ok(ok(Value::Unit)),
515                    Err(e) => Ok(err(Value::Str(format!("fs.mkdir_p `{path}`: {e}")))),
516                }
517            }
518            "remove" => {
519                let path = expect_str(args.first())?.to_string();
520                if let Err(e) = self.ensure_fs_write_path(&path) {
521                    return Ok(err(Value::Str(e)));
522                }
523                let p = std::path::Path::new(&path);
524                let result = if p.is_dir() {
525                    std::fs::remove_dir_all(p)
526                } else {
527                    std::fs::remove_file(p)
528                };
529                match result {
530                    Ok(_) => Ok(ok(Value::Unit)),
531                    Err(e) => Ok(err(Value::Str(format!("fs.remove `{path}`: {e}")))),
532                }
533            }
534            "copy" => {
535                let src = expect_str(args.first())?.to_string();
536                let dst = expect_str(args.get(1))?.to_string();
537                if let Err(e) = self.ensure_fs_walk_path(&src) {
538                    return Ok(err(Value::Str(e)));
539                }
540                if let Err(e) = self.ensure_fs_write_path(&dst) {
541                    return Ok(err(Value::Str(e)));
542                }
543                match std::fs::copy(&src, &dst) {
544                    Ok(_) => Ok(ok(Value::Unit)),
545                    Err(e) => Ok(err(Value::Str(format!("fs.copy {src} -> {dst}: {e}")))),
546                }
547            }
548            other => Err(format!("unsupported fs.{other}")),
549        }
550    }
551
552    /// Path scope for walk-style operations. `[fs_walk]` reuses the
553    /// `--allow-fs-read` allowlist — listing a directory is an
554    /// information disclosure on the same path tree as reading file
555    /// content, so the same scope applies. Empty allowlist = any path.
556    fn ensure_fs_walk_path(&self, path: &str) -> Result<(), String> {
557        if self.policy.allow_fs_read.is_empty() {
558            return Ok(());
559        }
560        let p = std::path::Path::new(path);
561        if self.policy.allow_fs_read.iter().any(|a| p.starts_with(a)) {
562            Ok(())
563        } else {
564            Err(format!("fs path `{path}` outside --allow-fs-read"))
565        }
566    }
567
568    /// Path scope for mutating operations. `[fs_write]` uses the
569    /// existing `--allow-fs-write` allowlist.
570    fn ensure_fs_write_path(&self, path: &str) -> Result<(), String> {
571        if self.policy.allow_fs_write.is_empty() {
572            return Ok(());
573        }
574        let p = std::path::Path::new(path);
575        if self.policy.allow_fs_write.iter().any(|a| p.starts_with(a)) {
576            Ok(())
577        } else {
578            Err(format!("fs path `{path}` outside --allow-fs-write"))
579        }
580    }
581
582    /// Enforce `--allow-net-host` against an outgoing URL. Empty
583    /// allowlist = any host. Non-empty = the URL's host must match
584    /// (substring; port-agnostic) at least one entry.
585    fn ensure_host_allowed(&self, url: &str) -> Result<(), String> {
586        if self.policy.allow_net_host.is_empty() { return Ok(()); }
587        let host = extract_host(url).unwrap_or("");
588        if self.policy.allow_net_host.iter().any(|h| host == h) {
589            Ok(())
590        } else {
591            Err(format!(
592                "net call to host `{host}` not in --allow-net-host {:?}",
593                self.policy.allow_net_host,
594            ))
595        }
596    }
597}
598
/// Extract the host of an `http://` or `https://` URL for the
/// `--allow-net-host` check. Returns `None` for any other scheme.
///
/// The authority ends at the first `/`, `\`, `?` or `#`. WHATWG URL
/// parsing treats `\` like `/` for http(s), so stopping there keeps
/// `http://evil.com\@ok.com` from being read as `ok.com`.
///
/// Any `userinfo@` prefix is dropped, so `http://ok.com:x@evil.com/`
/// yields `evil.com`, the host the request actually reaches. A trailing
/// `:port` is removed. Bracketed IPv6 literals are kept whole, so
/// `http://[::1]:8080` yields `[::1]`.
fn extract_host(url: &str) -> Option<&str> {
    let rest = url
        .strip_prefix("http://")
        .or_else(|| url.strip_prefix("https://"))?;
    let authority_end = rest
        .find(|c: char| matches!(c, '/' | '\\' | '?' | '#'))
        .unwrap_or(rest.len());
    let authority = &rest[..authority_end];
    // Userinfo is delimited by the LAST `@` of the authority.
    let host_port = match authority.rfind('@') {
        Some(at) => &authority[at + 1..],
        None => authority,
    };
    if host_port.starts_with('[') {
        // IPv6 literal: the host runs through the closing bracket.
        let host = match host_port.find(']') {
            Some(close) => &host_port[..=close],
            None => host_port,
        };
        return Some(host);
    }
    Some(match host_port.rsplit_once(':') {
        Some((host, _port)) => host,
        None => host_port,
    })
}
610
611impl EffectHandler for DefaultHandler {
612    /// Per-call budget enforcement (#225). VM calls this before
613    /// invoking any function whose signature declares `[budget(N)]`.
614    /// The cost N is deducted atomically from the shared pool;
615    /// returning `Err` aborts the call before any frame is pushed.
616    fn note_call_budget(&mut self, cost: u64) -> Result<(), String> {
617        // Skip the work entirely when no ceiling is configured —
618        // the pool is `u64::MAX` and would never trip.
619        let Some(ceiling) = self.budget_ceiling else { return Ok(()); };
620        // Compare-and-swap: speculatively subtract; if we'd
621        // underflow, return BudgetExceeded without mutating.
622        // Use SeqCst because parallel branches may race here and
623        // the relative ordering of "used so far" vs. "this call's
624        // cost" needs to be deterministic across threads.
625        loop {
626            let cur = self.budget_remaining.load(Ordering::SeqCst);
627            if cost > cur {
628                let used = ceiling.saturating_sub(cur);
629                return Err(format!(
630                    "budget exceeded: requested {cost}, used so far {used}, ceiling {ceiling}"));
631            }
632            let next = cur - cost;
633            // Conservative accounting: if the CAS races and loses,
634            // re-read and try again. No refund-on-failure path.
635            if self.budget_remaining.compare_exchange(cur, next,
636                Ordering::SeqCst, Ordering::SeqCst).is_ok() {
637                return Ok(());
638            }
639        }
640    }
641
642    fn dispatch(&mut self, kind: &str, op: &str, args: Vec<Value>) -> Result<Value, String> {
643        // Pure stdlib builtins (str, list, json, ...) bypass the policy
644        // gate — they have no observable side effects and aren't tracked
645        // by the type system as effects.
646        if let Some(r) = try_pure_builtin(kind, op, &args) {
647            return r;
648        }
649        // `std.fs` ops use the fine-grained `[fs_walk]` and `[fs_write]`
650        // effect kinds (distinct from the module name `fs`); the
651        // policy check uses the per-op kind, not the module's.
652        if kind == "process" {
653            self.ensure_kind_allowed("proc")?;
654            return self.dispatch_process(op, args);
655        }
656        if kind == "log" {
657            // Emit ops are [log]; config ops are [io] (set_sink also
658            // [fs_write]). The dispatch picks the right kind per op.
659            let effect_kind = match op {
660                "debug" | "info" | "warn" | "error" => "log",
661                "set_level" | "set_format" => "io",
662                "set_sink" => {
663                    self.ensure_kind_allowed("io")?;
664                    self.ensure_kind_allowed("fs_write")?;
665                    return self.dispatch_log(op, args);
666                }
667                other => return Err(format!("unsupported log.{other}")),
668            };
669            self.ensure_kind_allowed(effect_kind)?;
670            return self.dispatch_log(op, args);
671        }
672        if kind == "fs" {
673            let effect_kind = match op {
674                "exists" | "is_file" | "is_dir" | "stat"
675                | "list_dir" | "walk" | "glob" => "fs_walk",
676                "mkdir_p" | "remove" => "fs_write",
677                "copy" => {
678                    self.ensure_kind_allowed("fs_walk")?;
679                    self.ensure_kind_allowed("fs_write")?;
680                    return self.dispatch_fs(op, args);
681                }
682                other => return Err(format!("unsupported fs.{other}")),
683            };
684            self.ensure_kind_allowed(effect_kind)?;
685            return self.dispatch_fs(op, args);
686        }
687        // `crypto.random` is the lone effectful op in `std.crypto`. Its
688        // declared effect kind is `random` (fine-grained on purpose so
689        // `lex audit --effect random` flags every token-generating
690        // call), distinct from the `crypto` module name.
691        // datetime.now is the only effectful op in std.datetime;
692        // declared kind is `time`, matching the existing `time.now`.
693        if kind == "datetime" && op == "now" {
694            self.ensure_kind_allowed("time")?;
695            let now = chrono::Utc::now();
696            let nanos = now.timestamp_nanos_opt().unwrap_or(i64::MAX);
697            return Ok(Value::Int(nanos));
698        }
699        if kind == "crypto" && op == "random" {
700            self.ensure_kind_allowed("random")?;
701            let n = expect_int(args.first())?;
702            if !(0..=1_048_576).contains(&n) {
703                return Err("crypto.random: n must be in 0..=1048576".into());
704            }
705            use rand::{rngs::SysRng, TryRng};
706            let mut buf = vec![0u8; n as usize];
707            SysRng.try_fill_bytes(&mut buf)
708                .map_err(|e| format!("crypto.random: OS RNG: {e}"))?;
709            return Ok(Value::Bytes(buf));
710        }
711        // `std.http` wire ops (send/get/post) gate on the `net`
712        // effect kind, not the module name. This matches the
713        // declared signature (`http.get :: Str -> [net] ...`) and
714        // keeps `--allow-effects net` doing the obvious thing for
715        // both `net.*` and `http.*` callers.
716        // `std.agent` (#184): the four runtime effects added for
717        // agent-style programs (`llm_local`, `llm_cloud`, `a2a`,
718        // `mcp`). The handlers are stubs — they enforce the
719        // declared-effect gate, return a sentinel `Ok` so traces
720        // record the call, and defer the real wire formats to
721        // downstream crates (`soft-agent` for `llm_*` and `a2a`)
722        // and #185 (MCP client wrapper).
723        if kind == "agent" {
724            let effect_kind = match op {
725                "local_complete" => "llm_local",
726                "cloud_complete" => "llm_cloud",
727                "cloud_stream"   => "llm_cloud",
728                "send_a2a"       => "a2a",
729                "call_mcp"       => "mcp",
730                other => return Err(format!("unsupported agent.{other}")),
731            };
732            self.ensure_kind_allowed(effect_kind)?;
733            // `call_mcp` runs through the LRU client cache
734            // (#197). `local_complete` / `cloud_complete` hit
735            // Ollama / OpenAI via env-var-driven configuration
736            // (#196); custom backends override at the
737            // EffectHandler layer rather than via a config file.
738            // `send_a2a` keeps its stub — that wire format
739            // lives in downstream `soft-a2a`.
740            return match op {
741                "call_mcp"       => Ok(self.dispatch_call_mcp(args)),
742                "local_complete" => Ok(dispatch_llm_local(args)),
743                "cloud_complete" => Ok(dispatch_llm_cloud(args)),
744                "cloud_stream"   => Ok(self.dispatch_cloud_stream(args)),
745                _ => Ok(ok(Value::Str(format!("<{effect_kind} stub>")))),
746            };
747        }
748        if kind == "stream" {
749            // #305 slice 3: consumer-side stream operations. Each
750            // op resolves the opaque handle in the parent handler's
751            // stream registry and pulls one or all items. The
752            // `stream` effect must be granted by policy; default
753            // policies for agent runs grant it alongside the
754            // producer effect (e.g. `llm_cloud`).
755            self.ensure_kind_allowed("stream")?;
756            return match op {
757                "next"    => Ok(self.dispatch_stream_next(args)),
758                "collect" => Ok(self.dispatch_stream_collect(args)),
759                other => Err(format!("unsupported stream.{other}")),
760            };
761        }
762        if kind == "http" && matches!(op, "send" | "get" | "post") {
763            self.ensure_kind_allowed("net")?;
764            return match op {
765                "send" => {
766                    let req = expect_record(args.first())?;
767                    Ok(http_send_record(self, req))
768                }
769                "get" => {
770                    let url = expect_str(args.first())?.to_string();
771                    self.ensure_host_allowed(&url)?;
772                    Ok(http_send_simple("GET", &url, None, "", None))
773                }
774                "post" => {
775                    let url = expect_str(args.first())?.to_string();
776                    let body = expect_bytes(args.get(1))?.clone();
777                    let content_type = expect_str(args.get(2))?.to_string();
778                    self.ensure_host_allowed(&url)?;
779                    Ok(http_send_simple("POST", &url, Some(body), &content_type, None))
780                }
781                _ => unreachable!(),
782            };
783        }
784        self.ensure_kind_allowed(kind)?;
785        match (kind, op) {
786            ("io", "print") => {
787                let line = expect_str(args.first())?;
788                self.sink.print_line(line);
789                Ok(Value::Unit)
790            }
791            ("io", "read") => {
792                let path = expect_str(args.first())?.to_string();
793                let resolved = self.resolve_read_path(&path);
794                // Honor read-allowlist if any. Symmetric with io.write.
795                // The path argument is checked as-given (resolved-against-
796                // read_root for tests); a tool granted [io] cannot escape
797                // the configured prefix even though the effect itself is
798                // permitted. This is the per-path scope the bench's case
799                // #6 ("[io] granted, body reads /etc/passwd") needed.
800                if !self.policy.allow_fs_read.is_empty()
801                    && !self.policy.allow_fs_read.iter().any(|a| resolved.starts_with(a))
802                {
803                    return Err(format!("read of `{path}` outside --allow-fs-read"));
804                }
805                match std::fs::read_to_string(&resolved) {
806                    Ok(s) => Ok(ok(Value::Str(s))),
807                    Err(e) => Ok(err(Value::Str(format!("{e}")))),
808                }
809            }
810            ("io", "write") => {
811                let path = expect_str(args.first())?.to_string();
812                let contents = expect_str(args.get(1))?.to_string();
813                // Honor write-allowlist if any.
814                if !self.policy.allow_fs_write.is_empty() {
815                    let p = std::path::Path::new(&path);
816                    if !self.policy.allow_fs_write.iter().any(|a| p.starts_with(a)) {
817                        return Err(format!("write to `{path}` outside --allow-fs-write"));
818                    }
819                }
820                match std::fs::write(&path, contents) {
821                    Ok(_) => Ok(ok(Value::Unit)),
822                    Err(e) => Ok(err(Value::Str(format!("{e}")))),
823                }
824            }
825            ("time", "now") => {
826                let secs = SystemTime::now().duration_since(UNIX_EPOCH)
827                    .map_err(|e| format!("time: {e}"))?.as_secs();
828                Ok(Value::Int(secs as i64))
829            }
830            ("time", "sleep_ms") => {
831                // Block the current thread for `n` ms (#226). Used
832                // by `flow.retry_with_backoff`'s exponential delay.
833                // Negative or zero is a no-op. Bounded at 60s in the
834                // runtime to avoid pathological agent-emitted loops
835                // wedging the host — anything legitimate beyond
836                // that should use process-level scheduling, not a
837                // blocking sleep.
838                let n = expect_int(args.first())?;
839                if n > 0 {
840                    let ms = (n as u64).min(60_000);
841                    std::thread::sleep(std::time::Duration::from_millis(ms));
842                }
843                Ok(Value::Unit)
844            }
845            ("rand", "int_in") => {
846                // Deterministic stub: midpoint of [lo, hi].
847                let lo = expect_int(args.first())?;
848                let hi = expect_int(args.get(1))?;
849                Ok(Value::Int((lo + hi) / 2))
850            }
851            // `env.get` returns `Option[Str]` — `None` for unset vars.
852            // Per-var scoping (`[env(NAME)]`) arrives with #207's
853            // per-capability effect parameterization; today the flat
854            // `[env]` grants access to the entire process environment.
855            ("env", "get") => {
856                let name = expect_str(args.first())?;
857                Ok(match std::env::var(name) {
858                    Ok(v) => Value::Variant {
859                        name: "Some".into(),
860                        args: vec![Value::Str(v)],
861                    },
862                    Err(_) => Value::Variant { name: "None".into(), args: Vec::new() },
863                })
864            }
865            ("budget", _) => {
866                // Budget calls are nominally tracked here; budget itself is
867                // enforced statically in `policy::check_program`.
868                Ok(Value::Unit)
869            }
870            ("net", "get") => {
871                let url = expect_str(args.first())?.to_string();
872                self.ensure_host_allowed(&url)?;
873                Ok(http_request("GET", &url, None))
874            }
875            ("net", "post") => {
876                let url = expect_str(args.first())?.to_string();
877                let body = expect_str(args.get(1))?.to_string();
878                self.ensure_host_allowed(&url)?;
879                Ok(http_request("POST", &url, Some(&body)))
880            }
881            ("net", "serve") => {
882                let port = match args.first() {
883                    Some(Value::Int(n)) if (0..=65535).contains(n) => *n as u16,
884                    _ => return Err("net.serve(port, handler): port must be Int 0..=65535".into()),
885                };
886                let handler_name = expect_str(args.get(1))?.to_string();
887                let program = self.program.clone()
888                    .ok_or_else(|| "net.serve requires a Program reference; use DefaultHandler::with_program".to_string())?;
889                let policy = self.policy.clone();
890                serve_http(port, handler_name, program, policy, None)
891            }
892            ("net", "serve_tls") => {
893                let port = match args.first() {
894                    Some(Value::Int(n)) if (0..=65535).contains(n) => *n as u16,
895                    _ => return Err("net.serve_tls(port, cert, key, handler): port must be Int 0..=65535".into()),
896                };
897                let cert_path = expect_str(args.get(1))?.to_string();
898                let key_path = expect_str(args.get(2))?.to_string();
899                let handler_name = expect_str(args.get(3))?.to_string();
900                let program = self.program.clone()
901                    .ok_or_else(|| "net.serve_tls requires a Program reference".to_string())?;
902                let policy = self.policy.clone();
903                let cert = std::fs::read(&cert_path)
904                    .map_err(|e| format!("net.serve_tls: read cert {cert_path}: {e}"))?;
905                let key = std::fs::read(&key_path)
906                    .map_err(|e| format!("net.serve_tls: read key {key_path}: {e}"))?;
907                serve_http(port, handler_name, program, policy, Some(TlsConfig { cert, key }))
908            }
909            ("net", "serve_ws") => {
910                let port = match args.first() {
911                    Some(Value::Int(n)) if (0..=65535).contains(n) => *n as u16,
912                    _ => return Err("net.serve_ws(port, on_message): port must be Int 0..=65535".into()),
913                };
914                let handler_name = expect_str(args.get(1))?.to_string();
915                let program = self.program.clone()
916                    .ok_or_else(|| "net.serve_ws requires a Program reference".to_string())?;
917                let policy = self.policy.clone();
918                let registry = Arc::new(crate::ws::ChatRegistry::default());
919                crate::ws::serve_ws(port, handler_name, program, policy, registry)
920            }
921            ("chat", "broadcast") => {
922                let registry = self.chat_registry.as_ref()
923                    .ok_or_else(|| "chat.broadcast called outside a net.serve_ws handler".to_string())?;
924                let room = expect_str(args.first())?;
925                let body = expect_str(args.get(1))?;
926                crate::ws::chat_broadcast(registry, room, body);
927                Ok(Value::Unit)
928            }
929            ("chat", "send") => {
930                let registry = self.chat_registry.as_ref()
931                    .ok_or_else(|| "chat.send called outside a net.serve_ws handler".to_string())?;
932                let conn_id = match args.first() {
933                    Some(Value::Int(n)) if *n >= 0 => *n as u64,
934                    _ => return Err("chat.send: conn_id must be non-negative Int".into()),
935                };
936                let body = expect_str(args.get(1))?;
937                Ok(Value::Bool(crate::ws::chat_send(registry, conn_id, body)))
938            }
939            ("kv", "open") => {
940                let path = expect_str(args.first())?.to_string();
941                // Honor write-allowlist: opening a Kv writes its
942                // backing files at `path`, so the same scoping that
943                // applies to `io.write` applies here.
944                if !self.policy.allow_fs_write.is_empty() {
945                    let p = std::path::Path::new(&path);
946                    if !self.policy.allow_fs_write.iter().any(|a| p.starts_with(a)) {
947                        return Ok(err(Value::Str(format!(
948                            "kv.open: `{path}` outside --allow-fs-write"))));
949                    }
950                }
951                match sled::open(&path) {
952                    Ok(db) => {
953                        let handle = next_kv_handle();
954                        kv_registry().lock().unwrap().insert(handle, db);
955                        Ok(ok(Value::Int(handle as i64)))
956                    }
957                    Err(e) => Ok(err(Value::Str(format!("kv.open: {e}")))),
958                }
959            }
960            ("kv", "close") => {
961                let h = expect_kv_handle(args.first())?;
962                kv_registry().lock().unwrap().remove(h);
963                Ok(Value::Unit)
964            }
965            ("kv", "get") => {
966                let h = expect_kv_handle(args.first())?;
967                let key = expect_str(args.get(1))?;
968                let mut reg = kv_registry().lock().unwrap();
969                let db = reg.touch_get(h).ok_or_else(|| "kv.get: closed or unknown Kv handle".to_string())?;
970                match db.get(key.as_bytes()) {
971                    Ok(Some(ivec)) => Ok(some(Value::Bytes(ivec.to_vec()))),
972                    Ok(None) => Ok(none()),
973                    Err(e) => Err(format!("kv.get: {e}")),
974                }
975            }
976            ("kv", "put") => {
977                let h = expect_kv_handle(args.first())?;
978                let key = expect_str(args.get(1))?.to_string();
979                let val = expect_bytes(args.get(2))?.clone();
980                let mut reg = kv_registry().lock().unwrap();
981                let db = reg.touch_get(h).ok_or_else(|| "kv.put: closed or unknown Kv handle".to_string())?;
982                match db.insert(key.as_bytes(), val) {
983                    Ok(_) => Ok(ok(Value::Unit)),
984                    Err(e) => Ok(err(Value::Str(format!("kv.put: {e}")))),
985                }
986            }
987            ("kv", "delete") => {
988                let h = expect_kv_handle(args.first())?;
989                let key = expect_str(args.get(1))?;
990                let mut reg = kv_registry().lock().unwrap();
991                let db = reg.touch_get(h).ok_or_else(|| "kv.delete: closed or unknown Kv handle".to_string())?;
992                match db.remove(key.as_bytes()) {
993                    Ok(_) => Ok(ok(Value::Unit)),
994                    Err(e) => Ok(err(Value::Str(format!("kv.delete: {e}")))),
995                }
996            }
997            ("kv", "contains") => {
998                let h = expect_kv_handle(args.first())?;
999                let key = expect_str(args.get(1))?;
1000                let mut reg = kv_registry().lock().unwrap();
1001                let db = reg.touch_get(h).ok_or_else(|| "kv.contains: closed or unknown Kv handle".to_string())?;
1002                match db.contains_key(key.as_bytes()) {
1003                    Ok(present) => Ok(Value::Bool(present)),
1004                    Err(e) => Err(format!("kv.contains: {e}")),
1005                }
1006            }
1007            ("kv", "list_prefix") => {
1008                let h = expect_kv_handle(args.first())?;
1009                let prefix = expect_str(args.get(1))?;
1010                let mut reg = kv_registry().lock().unwrap();
1011                let db = reg.touch_get(h).ok_or_else(|| "kv.list_prefix: closed or unknown Kv handle".to_string())?;
1012                let mut keys: Vec<Value> = Vec::new();
1013                for kv in db.scan_prefix(prefix.as_bytes()) {
1014                    let (k, _) = kv.map_err(|e| format!("kv.list_prefix: {e}"))?;
1015                    let s = String::from_utf8_lossy(&k).to_string();
1016                    keys.push(Value::Str(s));
1017                }
1018                Ok(Value::List(keys))
1019            }
1020            ("sql", "open") => {
1021                let path = expect_str(args.first())?.to_string();
1022                // Same shape as `kv.open`: opening creates the
1023                // SQLite file, so the fs-write allowlist applies
1024                // (in-memory paths are exempt).
1025                if path != ":memory:" && !self.policy.allow_fs_write.is_empty() {
1026                    let p = std::path::Path::new(&path);
1027                    if !self.policy.allow_fs_write.iter().any(|a| p.starts_with(a)) {
1028                        return Ok(err(Value::Str(format!(
1029                            "sql.open: `{path}` outside --allow-fs-write"))));
1030                    }
1031                }
1032                match rusqlite::Connection::open(&path) {
1033                    Ok(conn) => {
1034                        let handle = next_sql_handle();
1035                        sql_registry().lock().unwrap().insert(handle, conn);
1036                        Ok(ok(Value::Int(handle as i64)))
1037                    }
1038                    Err(e) => Ok(err(Value::Str(format!("sql.open: {e}")))),
1039                }
1040            }
1041            ("sql", "close") => {
1042                let h = expect_sql_handle(args.first())?;
1043                sql_registry().lock().unwrap().remove(h);
1044                Ok(Value::Unit)
1045            }
1046            ("sql", "exec") => {
1047                let h = expect_sql_handle(args.first())?;
1048                let stmt = expect_str(args.get(1))?.to_string();
1049                let params = expect_str_list(args.get(2))?;
1050                let arc = sql_registry().lock().unwrap()
1051                    .touch_get(h)
1052                    .ok_or_else(|| "sql.exec: closed or unknown Db handle".to_string())?;
1053                let conn = arc.lock().unwrap();
1054                let bind: Vec<&dyn rusqlite::ToSql> = params.iter()
1055                    .map(|s| s as &dyn rusqlite::ToSql)
1056                    .collect();
1057                match conn.execute(&stmt, rusqlite::params_from_iter(bind.iter())) {
1058                    Ok(n)  => Ok(ok(Value::Int(n as i64))),
1059                    Err(e) => Ok(err(Value::Str(format!("sql.exec: {e}")))),
1060                }
1061            }
1062            ("sql", "query") => {
1063                let h = expect_sql_handle(args.first())?;
1064                let stmt_str = expect_str(args.get(1))?.to_string();
1065                let params = expect_str_list(args.get(2))?;
1066                let arc = sql_registry().lock().unwrap()
1067                    .touch_get(h)
1068                    .ok_or_else(|| "sql.query: closed or unknown Db handle".to_string())?;
1069                let conn = arc.lock().unwrap();
1070                Ok(sql_run_query(&conn, &stmt_str, &params))
1071            }
1072            ("proc", "spawn") => {
1073                // The escape hatch effect. Spawns a child process,
1074                // collects its stdout/stderr, returns a structured
1075                // record. Allow-list is the binary basename: anything
1076                // outside `--allow-proc` is rejected pre-spawn.
1077                //
1078                // What this does NOT validate (per SECURITY.md):
1079                // - per-arg content (a script-like CLI invoked via
1080                //   --eval=... can run anything)
1081                // - environment variables (inherited from the parent)
1082                // - working directory (the parent's)
1083                //
1084                // For untrusted input, layer with OS-level
1085                // sandboxing — gVisor / nsjail / a container.
1086                let cmd = expect_str(args.first())?.to_string();
1087                let raw_args = match args.get(1) {
1088                    Some(Value::List(items)) => items,
1089                    Some(other) => return Err(format!(
1090                        "proc.spawn: args must be List[Str], got {other:?}")),
1091                    None => return Err("proc.spawn: missing args list".into()),
1092                };
1093                let str_args: Vec<String> = raw_args.iter().map(|v| match v {
1094                    Value::Str(s) => Ok(s.clone()),
1095                    other => Err(format!("proc.spawn: arg must be Str, got {other:?}")),
1096                }).collect::<Result<Vec<_>, _>>()?;
1097
1098                // Allow-list check: empty list = any binary (escape
1099                // hatch); non-empty = basename of cmd must match an
1100                // entry exactly.
1101                if !self.policy.allow_proc.is_empty() {
1102                    let basename = std::path::Path::new(&cmd)
1103                        .file_name()
1104                        .and_then(|s| s.to_str())
1105                        .unwrap_or(&cmd);
1106                    if !self.policy.allow_proc.iter().any(|a| a == basename) {
1107                        return Ok(err(Value::Str(format!(
1108                            "proc.spawn: `{cmd}` not in --allow-proc {:?}",
1109                            self.policy.allow_proc
1110                        ))));
1111                    }
1112                }
1113
1114                // Hard caps: the spec doesn't pin numbers, but
1115                // unbounded argv is a DoS vector.
1116                if str_args.len() > 1024 {
1117                    return Ok(err(Value::Str(
1118                        "proc.spawn: arg-count exceeds 1024".into())));
1119                }
1120                if str_args.iter().any(|a| a.len() > 65_536) {
1121                    return Ok(err(Value::Str(
1122                        "proc.spawn: per-arg length exceeds 64 KiB".into())));
1123                }
1124
1125                let output = std::process::Command::new(&cmd)
1126                    .args(&str_args)
1127                    .output();
1128                match output {
1129                    Ok(o) => {
1130                        let mut rec = indexmap::IndexMap::new();
1131                        rec.insert("stdout".into(), Value::Str(
1132                            String::from_utf8_lossy(&o.stdout).to_string()));
1133                        rec.insert("stderr".into(), Value::Str(
1134                            String::from_utf8_lossy(&o.stderr).to_string()));
1135                        rec.insert("exit_code".into(), Value::Int(
1136                            o.status.code().unwrap_or(-1) as i64));
1137                        Ok(ok(Value::Record(rec)))
1138                    }
1139                    Err(e) => Ok(err(Value::Str(format!("spawn `{cmd}`: {e}")))),
1140                }
1141            }
1142            other => Err(format!("unsupported effect {}.{}", other.0, other.1)),
1143        }
1144    }
1145
1146    /// `list.par_map` worker-handler factory (#305 slice 2).
1147    ///
1148    /// Builds a fresh `DefaultHandler` per worker that shares the
1149    /// budget pool with the parent (`Arc<AtomicU64>`) so a parallel
1150    /// batch can't escape the run-wide budget ceiling. Other state
1151    /// is intentionally split per-worker:
1152    ///
1153    /// - `sink`: a `StdoutSink` per worker. Tests that capture
1154    ///   output via a `SharedSink` wrapped in `Arc<Mutex<…>>` see
1155    ///   each worker as a fresh handler. Print interleaving on
1156    ///   stdout is acceptable; tests that need ordered capture run
1157    ///   workloads serially anyway.
1158    /// - `mcp_clients`: a fresh per-worker LRU cache. The parent's
1159    ///   subprocess handles can't be shared across threads without
1160    ///   mutex-serialising every MCP call, which would defeat the
1161    ///   parallelism. Cache hit rate is sub-optimal across the
1162    ///   first call per worker; warmed caches still amortise within
1163    ///   a worker.
1164    /// - `chat_registry`: cloned `Arc<ChatRegistry>` so all workers
1165    ///   route into the same chat dispatch layer.
1166    /// - `program`: cloned `Arc<Program>` so `net.serve` (if a
1167    ///   worker invokes it) sees the same compiled program.
1168    fn spawn_for_worker(&self) -> Option<Box<dyn lex_bytecode::vm::EffectHandler + Send>> {
1169        let mut fresh = DefaultHandler::new(self.policy.clone());
1170        // Share the budget pool atomically — slice 2's correctness
1171        // contract: parallel work counts against the same ceiling.
1172        fresh.budget_remaining = std::sync::Arc::clone(&self.budget_remaining);
1173        fresh.budget_ceiling = self.budget_ceiling;
1174        fresh.read_root = self.read_root.clone();
1175        fresh.program = self.program.clone();
1176        fresh.chat_registry = self.chat_registry.clone();
1177        // #305 slice 3: share the stream registry across workers so
1178        // a stream produced on one thread (or the parent) is
1179        // consumable on any other. The registry is already
1180        // `Arc<Mutex<…>>` so concurrent access is safe.
1181        fresh.streams = std::sync::Arc::clone(&self.streams);
1182        fresh.next_stream_id = std::sync::Arc::clone(&self.next_stream_id);
1183        Some(Box::new(fresh))
1184    }
1185}
1186
1187/// Blocks the calling thread, accepts incoming HTTP requests on
1188/// `127.0.0.1:port`, and dispatches each through the named Lex
1189/// stage. Each request gets a fresh `Vm`; the program and policy
1190/// are shared.
1191///
1192/// Handler signature in Lex (by convention):
1193///   fn <name>(req :: Record { method :: Str, path :: Str, body :: Str })
1194///        -> Record { status :: Int, body :: Str }
1195/// PEM-encoded certificate + private key, both as raw bytes.
1196pub struct TlsConfig {
1197    pub cert: Vec<u8>,
1198    pub key: Vec<u8>,
1199}
1200
1201fn serve_http(
1202    port: u16,
1203    handler_name: String,
1204    program: Arc<Program>,
1205    policy: Policy,
1206    tls: Option<TlsConfig>,
1207) -> Result<Value, String> {
1208    let (server, scheme) = match tls {
1209        None => (
1210            tiny_http::Server::http(("127.0.0.1", port))
1211                .map_err(|e| format!("net.serve bind {port}: {e}"))?,
1212            "http",
1213        ),
1214        Some(cfg) => {
1215            let ssl = tiny_http::SslConfig {
1216                certificate: cfg.cert,
1217                private_key: cfg.key,
1218            };
1219            (
1220                tiny_http::Server::https(("127.0.0.1", port), ssl)
1221                    .map_err(|e| format!("net.serve_tls bind {port}: {e}"))?,
1222                "https",
1223            )
1224        }
1225    };
1226    eprintln!("net.serve: listening on {scheme}://127.0.0.1:{port}");
1227    // Thread-per-request: the main loop accepts; each request runs on
1228    // its own worker thread with its own fresh Vm. The Program is
1229    // shared via Arc; Policy and handler_name are cloned per request.
1230    // Lex's immutability means there's no shared mutable state at the
1231    // language level — workers don't race.
1232    for req in server.incoming_requests() {
1233        let program = Arc::clone(&program);
1234        let policy = policy.clone();
1235        let handler_name = handler_name.clone();
1236        std::thread::spawn(move || handle_request(req, program, policy, handler_name));
1237    }
1238    Ok(Value::Unit)
1239}
1240
1241fn handle_request(
1242    mut req: tiny_http::Request,
1243    program: Arc<Program>,
1244    policy: Policy,
1245    handler_name: String,
1246) {
1247    let lex_req = build_request_value(&mut req);
1248    let handler = DefaultHandler::new(policy).with_program(Arc::clone(&program));
1249    let mut vm = Vm::with_handler(&program, Box::new(handler));
1250    match vm.call(&handler_name, vec![lex_req]) {
1251        Ok(resp) => {
1252            let (status, body) = unpack_response(&resp);
1253            let response = tiny_http::Response::from_string(body).with_status_code(status);
1254            let _ = req.respond(response);
1255        }
1256        Err(e) => {
1257            let response = tiny_http::Response::from_string(format!("internal error: {e}"))
1258                .with_status_code(500);
1259            let _ = req.respond(response);
1260        }
1261    }
1262}
1263
1264fn build_request_value(req: &mut tiny_http::Request) -> Value {
1265    let method = format!("{:?}", req.method()).to_uppercase();
1266    let url = req.url().to_string();
1267    let (path, query) = match url.split_once('?') {
1268        Some((p, q)) => (p.to_string(), q.to_string()),
1269        None => (url, String::new()),
1270    };
1271    let mut body = String::new();
1272    let _ = req.as_reader().read_to_string(&mut body);
1273    let mut rec = indexmap::IndexMap::new();
1274    rec.insert("method".into(), Value::Str(method));
1275    rec.insert("path".into(), Value::Str(path));
1276    rec.insert("query".into(), Value::Str(query));
1277    rec.insert("body".into(), Value::Str(body));
1278    Value::Record(rec)
1279}
1280
1281fn unpack_response(v: &Value) -> (u16, String) {
1282    if let Value::Record(rec) = v {
1283        let status = rec.get("status").and_then(|s| match s {
1284            Value::Int(n) => Some(*n as u16),
1285            _ => None,
1286        }).unwrap_or(200);
1287        let body = rec.get("body").and_then(|b| match b {
1288            Value::Str(s) => Some(s.clone()),
1289            _ => None,
1290        }).unwrap_or_default();
1291        return (status, body);
1292    }
1293    (500, format!("handler returned non-record: {v:?}"))
1294}
1295
1296/// HTTP/1.1 client backed by `ureq` + `rustls`. Accepts both
1297/// `http://` and `https://` URLs. Returns `Result[Str, Str]` as a
1298/// Lex `Value::Variant`. The earlier hand-rolled HTTP/1.0 client
1299/// was plain-TCP only — most public APIs are HTTPS, so the demo
1300/// could fetch `example.com` but not `wttr.in` or `api.github.com`.
1301fn http_request(method: &str, url: &str, body: Option<&str>) -> Value {
1302    use std::time::Duration;
1303    // ureq 3 puts 4xx/5xx behind `Error::StatusCode(code)` and consumes
1304    // the response, so the body would be lost. Disabling
1305    // `http_status_as_error` lets us check the status manually and
1306    // surface `Err("status 404: <body>")` like the old code did.
1307    let agent: ureq::Agent = ureq::Agent::config_builder()
1308        .timeout_connect(Some(Duration::from_secs(10)))
1309        .timeout_recv_body(Some(Duration::from_secs(30)))
1310        .timeout_send_body(Some(Duration::from_secs(10)))
1311        .http_status_as_error(false)
1312        .build()
1313        .into();
1314    let resp = match (method, body) {
1315        ("GET", _) => agent.get(url).call(),
1316        ("POST", Some(b)) => agent.post(url).send(b),
1317        ("POST", None) => agent.post(url).send(""),
1318        (m, _) => return err_value(format!("unsupported method: {m}")),
1319    };
1320    match resp {
1321        Ok(mut r) => {
1322            let status = r.status().as_u16();
1323            let body = r.body_mut().read_to_string().unwrap_or_default();
1324            if (200..300).contains(&status) {
1325                Value::Variant { name: "Ok".into(), args: vec![Value::Str(body)] }
1326            } else {
1327                err_value(format!("status {status}: {body}"))
1328            }
1329        }
1330        Err(e) => err_value(format!("transport: {e}")),
1331    }
1332}
1333
1334/// Build a ureq agent for `std.http.{send,get,post}` with the given
1335/// timeout (None → use the same defaults as the legacy `net.{get,post}`
1336/// path). Separate from `http_request` so the rich `http.send` flow
1337/// can supply per-request overrides.
1338fn http_agent(timeout_ms: Option<u64>) -> ureq::Agent {
1339    use std::time::Duration;
1340    let mut b = ureq::Agent::config_builder()
1341        .timeout_connect(Some(Duration::from_secs(10)))
1342        .timeout_recv_body(Some(Duration::from_secs(30)))
1343        .timeout_send_body(Some(Duration::from_secs(10)))
1344        .http_status_as_error(false);
1345    if let Some(ms) = timeout_ms {
1346        let d = Duration::from_millis(ms);
1347        b = b.timeout_global(Some(d));
1348    }
1349    b.build().into()
1350}
1351
1352/// Map ureq's transport error to the structured `HttpError` variant
1353/// std.http exposes to user code. Anything not specifically a
1354/// timeout / TLS error funnels into `NetworkError`.
1355fn http_error_value(e: ureq::Error) -> Value {
1356    let (ctor, payload): (&str, Option<String>) = match &e {
1357        ureq::Error::Timeout(_) => ("TimeoutError", None),
1358        ureq::Error::Tls(s) => ("TlsError", Some((*s).into())),
1359        ureq::Error::Pem(p) => ("TlsError", Some(format!("{p}"))),
1360        ureq::Error::Rustls(r) => ("TlsError", Some(format!("{r}"))),
1361        _ => ("NetworkError", Some(format!("{e}"))),
1362    };
1363    let args = match payload { Some(s) => vec![Value::Str(s)], None => vec![] };
1364    let inner = Value::Variant { name: ctor.into(), args };
1365    Value::Variant { name: "Err".into(), args: vec![inner] }
1366}
1367
1368fn http_decode_err(msg: String) -> Value {
1369    let inner = Value::Variant {
1370        name: "DecodeError".into(),
1371        args: vec![Value::Str(msg)],
1372    };
1373    Value::Variant { name: "Err".into(), args: vec![inner] }
1374}
1375
1376/// Run a request and pack the ureq response into the
1377/// `{ status, headers, body }` Lex record (or the structured
1378/// `HttpError` on failure). `headers_extra` pairs are appended to the
1379/// outgoing request after `content_type` is applied.
1380fn http_send_simple(
1381    method: &str,
1382    url: &str,
1383    body: Option<Vec<u8>>,
1384    content_type: &str,
1385    timeout_ms: Option<u64>,
1386) -> Value {
1387    http_send_full(method, url, body, content_type, &[], timeout_ms)
1388}
1389
1390fn http_send_full(
1391    method: &str,
1392    url: &str,
1393    body: Option<Vec<u8>>,
1394    content_type: &str,
1395    headers: &[(String, String)],
1396    timeout_ms: Option<u64>,
1397) -> Value {
1398    let agent = http_agent(timeout_ms);
1399    let resp = match method {
1400        "GET" => {
1401            let mut req = agent.get(url);
1402            if !content_type.is_empty() { req = req.header("content-type", content_type); }
1403            for (k, v) in headers { req = req.header(k.as_str(), v.as_str()); }
1404            req.call()
1405        }
1406        "POST" => {
1407            let body = body.unwrap_or_default();
1408            let mut req = agent.post(url);
1409            if !content_type.is_empty() { req = req.header("content-type", content_type); }
1410            for (k, v) in headers { req = req.header(k.as_str(), v.as_str()); }
1411            req.send(&body[..])
1412        }
1413        m => {
1414            // Other methods (PUT, DELETE, PATCH, ...) fall through
1415            // here in v1.5; for now surface a structured DecodeError
1416            // so the caller can match it.
1417            return http_decode_err(format!("unsupported method: {m}"));
1418        }
1419    };
1420    match resp {
1421        Ok(mut r) => {
1422            let status = r.status().as_u16() as i64;
1423            let headers_map = collect_response_headers(r.headers());
1424            let body_bytes = match r.body_mut().with_config().limit(10 * 1024 * 1024).read_to_vec() {
1425                Ok(b) => b,
1426                Err(e) => return http_decode_err(format!("body read: {e}")),
1427            };
1428            let mut rec = indexmap::IndexMap::new();
1429            rec.insert("status".into(), Value::Int(status));
1430            rec.insert("headers".into(), Value::Map(headers_map));
1431            rec.insert("body".into(), Value::Bytes(body_bytes));
1432            Value::Variant { name: "Ok".into(), args: vec![Value::Record(rec)] }
1433        }
1434        Err(e) => http_error_value(e),
1435    }
1436}
1437
1438fn collect_response_headers(
1439    headers: &ureq::http::HeaderMap,
1440) -> std::collections::BTreeMap<lex_bytecode::MapKey, Value> {
1441    let mut out = std::collections::BTreeMap::new();
1442    for (name, value) in headers.iter() {
1443        let v = value.to_str().unwrap_or("").to_string();
1444        out.insert(lex_bytecode::MapKey::Str(name.as_str().to_string()), Value::Str(v));
1445    }
1446    out
1447}
1448
1449/// Pull the standard `HttpRequest` shape out of a `Value::Record`
1450/// and dispatch through `http_send_full`. The handler verifies
1451/// `--allow-net-host` for the URL before sending.
1452fn http_send_record(handler: &DefaultHandler, req: &indexmap::IndexMap<String, Value>) -> Value {
1453    let method = match req.get("method") {
1454        Some(Value::Str(s)) => s.clone(),
1455        _ => return http_decode_err("HttpRequest.method must be Str".into()),
1456    };
1457    let url = match req.get("url") {
1458        Some(Value::Str(s)) => s.clone(),
1459        _ => return http_decode_err("HttpRequest.url must be Str".into()),
1460    };
1461    if let Err(e) = handler.ensure_host_allowed(&url) {
1462        return http_decode_err(e);
1463    }
1464    let body = match req.get("body") {
1465        Some(Value::Variant { name, args }) if name == "None" => None,
1466        Some(Value::Variant { name, args }) if name == "Some" => match args.as_slice() {
1467            [Value::Bytes(b)] => Some(b.clone()),
1468            _ => return http_decode_err("HttpRequest.body Some payload must be Bytes".into()),
1469        },
1470        _ => return http_decode_err("HttpRequest.body must be Option[Bytes]".into()),
1471    };
1472    let timeout_ms = match req.get("timeout_ms") {
1473        Some(Value::Variant { name, .. }) if name == "None" => None,
1474        Some(Value::Variant { name, args }) if name == "Some" => match args.as_slice() {
1475            [Value::Int(n)] if *n >= 0 => Some(*n as u64),
1476            _ => return http_decode_err(
1477                "HttpRequest.timeout_ms Some payload must be a non-negative Int".into()),
1478        },
1479        _ => return http_decode_err("HttpRequest.timeout_ms must be Option[Int]".into()),
1480    };
1481    let headers: Vec<(String, String)> = match req.get("headers") {
1482        Some(Value::Map(m)) => m.iter().filter_map(|(k, v)| {
1483            let kk = match k { lex_bytecode::MapKey::Str(s) => s.clone(), _ => return None };
1484            let vv = match v { Value::Str(s) => s.clone(), _ => return None };
1485            Some((kk, vv))
1486        }).collect(),
1487        _ => return http_decode_err("HttpRequest.headers must be Map[Str, Str]".into()),
1488    };
1489    http_send_full(&method, &url, body, "", &headers, timeout_ms)
1490}
1491
1492fn expect_record(v: Option<&Value>) -> Result<&indexmap::IndexMap<String, Value>, String> {
1493    match v {
1494        Some(Value::Record(r)) => Ok(r),
1495        Some(other) => Err(format!("expected Record, got {other:?}")),
1496        None => Err("missing Record argument".into()),
1497    }
1498}
1499
1500fn err_value(msg: String) -> Value {
1501    Value::Variant { name: "Err".into(), args: vec![Value::Str(msg)] }
1502}
1503
1504fn expect_str(v: Option<&Value>) -> Result<&str, String> {
1505    match v {
1506        Some(Value::Str(s)) => Ok(s),
1507        Some(other) => Err(format!("expected Str arg, got {other:?}")),
1508        None => Err("missing argument".into()),
1509    }
1510}
1511
1512fn expect_int(v: Option<&Value>) -> Result<i64, String> {
1513    match v {
1514        Some(Value::Int(n)) => Ok(*n),
1515        Some(other) => Err(format!("expected Int arg, got {other:?}")),
1516        None => Err("missing argument".into()),
1517    }
1518}
1519
1520fn ok(v: Value) -> Value {
1521    Value::Variant { name: "Ok".into(), args: vec![v] }
1522}
1523fn err(v: Value) -> Value {
1524    Value::Variant { name: "Err".into(), args: vec![v] }
1525}
1526
1527impl DefaultHandler {
1528    /// Implementation of `agent.call_mcp(server, tool, args_json)`.
1529    /// Goes through the LRU client cache (#197): the named server
1530    /// is spawned on first use and reused on subsequent calls.
1531    /// On failure the offending client is dropped so the next
1532    /// call respawns rather than silently failing forever.
1533    fn dispatch_call_mcp(&mut self, args: Vec<Value>) -> Value {
1534        let server = match args.first() {
1535            Some(Value::Str(s)) => s.clone(),
1536            _ => return err(Value::Str(
1537                "agent.call_mcp(server, tool, args_json): server must be Str".into())),
1538        };
1539        let tool = match args.get(1) {
1540            Some(Value::Str(s)) => s.clone(),
1541            _ => return err(Value::Str(
1542                "agent.call_mcp(server, tool, args_json): tool must be Str".into())),
1543        };
1544        let args_json = match args.get(2) {
1545            Some(Value::Str(s)) => s.clone(),
1546            _ => return err(Value::Str(
1547                "agent.call_mcp(server, tool, args_json): args_json must be Str".into())),
1548        };
1549        let parsed: serde_json::Value = match serde_json::from_str(&args_json) {
1550            Ok(v) => v,
1551            Err(e) => return err(Value::Str(format!(
1552                "agent.call_mcp: args_json is not valid JSON: {e}"))),
1553        };
1554        match self.mcp_clients.call(&server, &tool, parsed) {
1555            Ok(result) => ok(Value::Str(
1556                serde_json::to_string(&result).unwrap_or_else(|_| "null".into()))),
1557            Err(e) => err(Value::Str(e)),
1558        }
1559    }
1560
1561    /// Implementation of `agent.cloud_stream(prompt) -> Result[Stream[Str], Str]`
1562    /// (#305 slice 3). The fixture path (`LEX_LLM_STREAM_FIXTURE`)
1563    /// splits the env-var value on `|` and yields each segment as
1564    /// one chunk; it's the load-bearing test hook. Live HTTP
1565    /// chunked-response support is deferred to a follow-up slice.
1566    fn dispatch_cloud_stream(&mut self, args: Vec<Value>) -> Value {
1567        let _prompt = match args.first() {
1568            Some(Value::Str(s)) => s.clone(),
1569            _ => return err(Value::Str(
1570                "agent.cloud_stream(prompt): prompt must be Str".into())),
1571        };
1572        let chunks: Vec<String> = match std::env::var("LEX_LLM_STREAM_FIXTURE") {
1573            Ok(v) => v.split('|').map(|s| s.to_string()).collect(),
1574            Err(_) => return err(Value::Str(
1575                "agent.cloud_stream: live streaming not yet implemented; \
1576                 set LEX_LLM_STREAM_FIXTURE='chunk1|chunk2|…' for tests".into())),
1577        };
1578        let handle = self.register_stream(chunks.into_iter());
1579        ok(stream_handle_value(handle))
1580    }
1581
1582    /// Implementation of `stream.next(s) -> Option[T]` (#305 slice 3).
1583    /// Returns `Some(chunk)` for each producer yield and `None` once
1584    /// the producer is exhausted. Unknown handle ids return `None`
1585    /// rather than erroring so streams can be safely consumed past
1586    /// the end (matches the semantics of `Iterator::next`).
1587    fn dispatch_stream_next(&mut self, args: Vec<Value>) -> Value {
1588        let handle = match args.first().and_then(stream_handle_id) {
1589            Some(h) => h,
1590            None => return Value::Variant { name: "None".into(), args: vec![] },
1591        };
1592        let mut streams = match self.streams.lock() {
1593            Ok(g) => g,
1594            Err(_) => return Value::Variant { name: "None".into(), args: vec![] },
1595        };
1596        match streams.get_mut(&handle).and_then(|it| it.next()) {
1597            Some(chunk) => some(Value::Str(chunk)),
1598            None => {
1599                streams.remove(&handle);
1600                Value::Variant { name: "None".into(), args: vec![] }
1601            }
1602        }
1603    }
1604
1605    /// Implementation of `stream.collect(s) -> List[T]` (#305 slice 3).
1606    /// Drains the producer eagerly. Unknown handles drain to an
1607    /// empty list so the contract is `collect ∘ collect = []`
1608    /// (idempotent on a closed stream).
1609    fn dispatch_stream_collect(&mut self, args: Vec<Value>) -> Value {
1610        let handle = match args.first().and_then(stream_handle_id) {
1611            Some(h) => h,
1612            None => return Value::List(Vec::new()),
1613        };
1614        let mut iter = {
1615            let mut streams = match self.streams.lock() {
1616                Ok(g) => g,
1617                Err(_) => return Value::List(Vec::new()),
1618            };
1619            match streams.remove(&handle) {
1620                Some(it) => it,
1621                None => return Value::List(Vec::new()),
1622            }
1623        };
1624        let mut out: Vec<Value> = Vec::new();
1625        for chunk in iter.by_ref() {
1626            out.push(Value::Str(chunk));
1627        }
1628        Value::List(out)
1629    }
1630
1631    /// Register a producer iterator and return its handle id. The
1632    /// handle is monotonic-counter-based so two streams created in
1633    /// quick succession get distinct ids.
1634    fn register_stream<I>(&self, iter: I) -> String
1635    where
1636        I: Iterator<Item = String> + Send + 'static,
1637    {
1638        let id = self
1639            .next_stream_id
1640            .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
1641        let handle = format!("stream_{id}");
1642        if let Ok(mut streams) = self.streams.lock() {
1643            streams.insert(handle.clone(), Box::new(iter));
1644        }
1645        handle
1646    }
1647}
1648
1649/// Build the runtime representation of a `Stream[T]` value:
1650/// `Variant("__StreamHandle", [Str(handle_id)])`. The opaque tag is
1651/// prefixed with `__` so it can't collide with a user-declared
1652/// variant.
1653fn stream_handle_value(handle: String) -> Value {
1654    Value::Variant {
1655        name: "__StreamHandle".into(),
1656        args: vec![Value::Str(handle)],
1657    }
1658}
1659
1660/// Inverse of [`stream_handle_value`] — extract the handle id from
1661/// a Stream value, or `None` if the input doesn't have the
1662/// expected shape.
1663fn stream_handle_id(v: &Value) -> Option<String> {
1664    match v {
1665        Value::Variant { name, args } if name == "__StreamHandle" => match args.first() {
1666            Some(Value::Str(h)) => Some(h.clone()),
1667            _ => None,
1668        },
1669        _ => None,
1670    }
1671}
1672
1673/// Implementation of `agent.local_complete(prompt)` (#196).
1674/// Hits Ollama (or any compatible HTTP service via `OLLAMA_HOST`)
1675/// and returns the completion text. Override at the
1676/// `EffectHandler` layer if you need a different transport.
1677fn dispatch_llm_local(args: Vec<Value>) -> Value {
1678    let prompt = match args.first() {
1679        Some(Value::Str(s)) => s.clone(),
1680        _ => return err(Value::Str(
1681            "agent.local_complete(prompt): prompt must be Str".into())),
1682    };
1683    match crate::llm::local_complete(&prompt) {
1684        Ok(text) => ok(Value::Str(text)),
1685        Err(e) => err(Value::Str(e)),
1686    }
1687}
1688
1689/// Implementation of `agent.cloud_complete(prompt)` (#196).
1690/// Hits OpenAI's chat-completions API (or any compatible
1691/// service via `OPENAI_BASE_URL`) and returns the assistant
1692/// message. Requires `OPENAI_API_KEY`. Override at the
1693/// `EffectHandler` layer for custom auth, batching, or other
1694/// providers.
1695fn dispatch_llm_cloud(args: Vec<Value>) -> Value {
1696    let prompt = match args.first() {
1697        Some(Value::Str(s)) => s.clone(),
1698        _ => return err(Value::Str(
1699            "agent.cloud_complete(prompt): prompt must be Str".into())),
1700    };
1701    match crate::llm::cloud_complete(&prompt) {
1702        Ok(text) => ok(Value::Str(text)),
1703        Err(e) => err(Value::Str(e)),
1704    }
1705}
1706
1707fn some(v: Value) -> Value {
1708    Value::Variant { name: "Some".into(), args: vec![v] }
1709}
1710fn none() -> Value {
1711    Value::Variant { name: "None".into(), args: vec![] }
1712}
1713
1714fn expect_bytes(v: Option<&Value>) -> Result<&Vec<u8>, String> {
1715    match v {
1716        Some(Value::Bytes(b)) => Ok(b),
1717        Some(other) => Err(format!("expected Bytes arg, got {other:?}")),
1718        None => Err("missing argument".into()),
1719    }
1720}
1721
1722fn expect_kv_handle(v: Option<&Value>) -> Result<u64, String> {
1723    match v {
1724        Some(Value::Int(n)) if *n >= 0 => Ok(*n as u64),
1725        Some(other) => Err(format!("expected Kv handle (Int), got {other:?}")),
1726        None => Err("missing Kv argument".into()),
1727    }
1728}
1729
1730fn expect_sql_handle(v: Option<&Value>) -> Result<u64, String> {
1731    match v {
1732        Some(Value::Int(n)) if *n >= 0 => Ok(*n as u64),
1733        Some(other) => Err(format!("expected Db handle (Int), got {other:?}")),
1734        None => Err("missing Db argument".into()),
1735    }
1736}
1737
1738fn expect_str_list(v: Option<&Value>) -> Result<Vec<String>, String> {
1739    match v {
1740        Some(Value::List(items)) => items.iter().map(|x| match x {
1741            Value::Str(s) => Ok(s.clone()),
1742            other => Err(format!("expected List[Str] element, got {other:?}")),
1743        }).collect(),
1744        Some(other) => Err(format!("expected List[Str], got {other:?}")),
1745        None => Err("missing List[Str] argument".into()),
1746    }
1747}
1748
1749/// Run a `SELECT` (or any returning statement) and pack the rows
1750/// into `Value::List(Value::Record(...))` shape — column-name keys,
1751/// SQLite-typed values mapped one-for-one to Lex value variants
1752/// (Null → Unit, Integer → Int, Real → Float, Text → Str, Blob →
1753/// Bytes). Returns the standard `Result[List[T], Str]` Lex shape.
1754fn sql_run_query(
1755    conn: &rusqlite::Connection,
1756    stmt_str: &str,
1757    params: &[String],
1758) -> Value {
1759    let mut stmt = match conn.prepare(stmt_str) {
1760        Ok(s)  => s,
1761        Err(e) => return err(Value::Str(format!("sql.query: {e}"))),
1762    };
1763    let column_count = stmt.column_count();
1764    let column_names: Vec<String> = (0..column_count)
1765        .map(|i| stmt.column_name(i).unwrap_or("").to_string())
1766        .collect();
1767    let bind: Vec<&dyn rusqlite::ToSql> = params.iter()
1768        .map(|s| s as &dyn rusqlite::ToSql)
1769        .collect();
1770    let mut rows = match stmt.query(rusqlite::params_from_iter(bind.iter())) {
1771        Ok(r)  => r,
1772        Err(e) => return err(Value::Str(format!("sql.query: {e}"))),
1773    };
1774    let mut out: Vec<Value> = Vec::new();
1775    loop {
1776        let row = match rows.next() {
1777            Ok(Some(r)) => r,
1778            Ok(None)    => break,
1779            Err(e)      => return err(Value::Str(format!("sql.query: {e}"))),
1780        };
1781        let mut rec = indexmap::IndexMap::new();
1782        for (i, name) in column_names.iter().enumerate() {
1783            let cell = match row.get_ref(i) {
1784                Ok(c)  => sql_value_ref_to_lex(c),
1785                Err(e) => return err(Value::Str(format!("sql.query: column {i}: {e}"))),
1786            };
1787            rec.insert(name.clone(), cell);
1788        }
1789        out.push(Value::Record(rec));
1790    }
1791    ok(Value::List(out))
1792}
1793
1794fn sql_value_ref_to_lex(v: rusqlite::types::ValueRef<'_>) -> Value {
1795    use rusqlite::types::ValueRef;
1796    match v {
1797        ValueRef::Null       => Value::Unit,
1798        ValueRef::Integer(n) => Value::Int(n),
1799        ValueRef::Real(f)    => Value::Float(f),
1800        ValueRef::Text(s)    => Value::Str(String::from_utf8_lossy(s).into_owned()),
1801        ValueRef::Blob(b)    => Value::Bytes(b.to_vec()),
1802    }
1803}
1804
1805// -- log state (process-wide; configurable via log.set_*) --
1806
/// Severity of a log record.
///
/// Variant order is significant: the derived `PartialOrd` ranks
/// `Debug < Info < Warn < Error`, and `emit_log` drops any record whose
/// level compares below the configured minimum. Don't reorder the variants.
#[derive(Clone, Copy, PartialEq, PartialOrd)]
enum LogLevel { Debug, Info, Warn, Error }
1809
/// How `emit_log` renders a line. `Text` gives `[ts] level: msg`; `Json`
/// gives one `{"ts":..,"level":..,"msg":..}` object per line.
#[derive(Clone, Copy, PartialEq)]
enum LogFormat { Text, Json }
1812
/// Destination for rendered log lines.
///
/// `File` shares the handle behind `Arc<Mutex<_>>`. That way `emit_log` can
/// clone the sink out of the global state, release the state lock, and then
/// write.
#[derive(Clone)]
enum LogSink {
    Stderr,
    File(std::sync::Arc<Mutex<std::fs::File>>),
}
1818
/// Process-wide logging configuration, stored behind `log_state()`.
struct LogState {
    // Minimum severity that gets emitted.
    level: LogLevel,
    // How each line is rendered.
    format: LogFormat,
    // Where rendered lines are written.
    sink: LogSink,
}
1824
1825fn log_state() -> &'static Mutex<LogState> {
1826    static STATE: OnceLock<Mutex<LogState>> = OnceLock::new();
1827    STATE.get_or_init(|| Mutex::new(LogState {
1828        level: LogLevel::Info,
1829        format: LogFormat::Text,
1830        sink: LogSink::Stderr,
1831    }))
1832}
1833
1834fn parse_log_level(s: &str) -> Option<LogLevel> {
1835    match s {
1836        "debug" => Some(LogLevel::Debug),
1837        "info" => Some(LogLevel::Info),
1838        "warn" => Some(LogLevel::Warn),
1839        "error" => Some(LogLevel::Error),
1840        _ => None,
1841    }
1842}
1843
1844fn level_label(l: LogLevel) -> &'static str {
1845    match l {
1846        LogLevel::Debug => "debug",
1847        LogLevel::Info => "info",
1848        LogLevel::Warn => "warn",
1849        LogLevel::Error => "error",
1850    }
1851}
1852
1853fn emit_log(level: LogLevel, msg: &str) {
1854    let state = log_state().lock().unwrap();
1855    if level < state.level {
1856        return;
1857    }
1858    let ts = chrono::Utc::now().to_rfc3339();
1859    let line = match state.format {
1860        LogFormat::Text => format!("[{}] {}: {}\n", ts, level_label(level), msg),
1861        LogFormat::Json => {
1862            // Hand-rolled JSON to avoid pulling serde_json into the
1863            // hot path; msg gets minimal escaping (the four common
1864            // cases that break a JSON line).
1865            let escaped = msg
1866                .replace('\\', "\\\\")
1867                .replace('"',  "\\\"")
1868                .replace('\n', "\\n")
1869                .replace('\r', "\\r");
1870            format!(
1871                "{{\"ts\":\"{ts}\",\"level\":\"{}\",\"msg\":\"{escaped}\"}}\n",
1872                level_label(level),
1873            )
1874        }
1875    };
1876    let sink = state.sink.clone();
1877    drop(state);
1878    match sink {
1879        LogSink::Stderr => {
1880            use std::io::Write;
1881            let _ = std::io::stderr().write_all(line.as_bytes());
1882        }
1883        LogSink::File(f) => {
1884            use std::io::Write;
1885            if let Ok(mut g) = f.lock() {
1886                let _ = g.write_all(line.as_bytes());
1887            }
1888        }
1889    }
1890}
1891
/// One child started by `process.spawn`, plus optional buffered readers
/// over its stdout/stderr.
pub(crate) struct ProcessState {
    child: std::process::Child,
    // Buffered readers for the child's output streams. NOTE(review):
    // presumably `None` when the stream wasn't piped (the registry tests
    // build it with `None`); confirm against the spawn/read code.
    stdout: Option<std::io::BufReader<std::process::ChildStdout>>,
    stderr: Option<std::io::BufReader<std::process::ChildStderr>>,
}
1897
1898/// Process-wide registry of live `process.spawn` handles. Capped at
1899/// [`MAX_PROCESS_HANDLES`] to bound long-running programs that spawn
1900/// many short-lived children: on each `spawn` past the cap, the
1901/// least-recently-used entry is dropped (which `Drop`s its
1902/// `ProcessState`, leaving the child orphaned but the registry
1903/// bounded). `process.wait` also drops the entry on completion since
1904/// the handle becomes terminal once the child exits.
1905///
1906/// Each entry is wrapped in `Arc<Mutex<ProcessState>>` so the global
1907/// lookup mutex is held only briefly during dispatch — once we have
1908/// the per-handle `Arc`, the global lock is released and the slow
1909/// op (`wait`, `read_*_line`) only contends on its own handle's
1910/// mutex. Reads on different handles no longer block each other.
1911fn process_registry() -> &'static Mutex<ProcessRegistry> {
1912    static REGISTRY: OnceLock<Mutex<ProcessRegistry>> = OnceLock::new();
1913    REGISTRY.get_or_init(|| Mutex::new(ProcessRegistry::with_capacity(MAX_PROCESS_HANDLES)))
1914}
1915
/// Maximum number of `process.spawn` handles kept in the global registry.
/// Inserting beyond this evicts the least-recently-used handle.
const MAX_PROCESS_HANDLES: usize = 256;
1917
/// Per-handle state shared between the registry and in-flight ops. An op
/// clones the `Arc`, drops the global registry lock, then locks only this
/// handle's mutex.
type SharedProcessState = Arc<Mutex<ProcessState>>;
1919
/// Bounded, LRU-ordered map from process handle to its shared state.
pub(crate) struct ProcessRegistry {
    // Map order doubles as recency order: index 0 is the least recently
    // used entry and the last index the most recent (see `insert` /
    // `touch_get`).
    entries: indexmap::IndexMap<u64, SharedProcessState>,
    // Maximum entry count; `insert` evicts from the front once this is
    // reached.
    cap: usize,
}
1924
1925impl ProcessRegistry {
1926    pub(crate) fn with_capacity(cap: usize) -> Self {
1927        Self { entries: indexmap::IndexMap::new(), cap }
1928    }
1929
1930    /// Insert a freshly-spawned child. If at cap, evict the LRU entry
1931    /// first; the dropped `ProcessState`'s child stays alive (orphaned)
1932    /// but its file descriptors are released.
1933    pub(crate) fn insert(&mut self, handle: u64, state: ProcessState) {
1934        if self.entries.len() >= self.cap {
1935            self.entries.shift_remove_index(0);
1936        }
1937        self.entries.insert(handle, Arc::new(Mutex::new(state)));
1938    }
1939
1940    /// Look up a handle, marking it most-recently-used on hit. Returns
1941    /// a clone of the shared `Arc` — callers should release the global
1942    /// registry lock before locking the per-handle mutex.
1943    pub(crate) fn touch_get(&mut self, handle: u64) -> Option<SharedProcessState> {
1944        let idx = self.entries.get_index_of(&handle)?;
1945        self.entries.move_index(idx, self.entries.len() - 1);
1946        self.entries.get(&handle).cloned()
1947    }
1948
1949    /// Drop the registry entry. The underlying `Arc` may outlive the
1950    /// removal if another op still holds it; that's intentional — the
1951    /// in-flight op finishes against the existing `ProcessState`, and
1952    /// only fresh lookups start failing.
1953    pub(crate) fn remove(&mut self, handle: u64) {
1954        self.entries.shift_remove(&handle);
1955    }
1956
1957    #[cfg(test)]
1958    pub(crate) fn len(&self) -> usize { self.entries.len() }
1959}
1960
/// Allocate a fresh `process.spawn` handle.
///
/// Handles come from a process-wide counter that starts at 1 and
/// increases on every call.
fn next_process_handle() -> u64 {
    static NEXT: AtomicU64 = AtomicU64::new(1);
    NEXT.fetch_add(1, Ordering::SeqCst)
}
1965
#[cfg(all(test, unix))]
mod process_registry_tests {
    use super::{ProcessRegistry, ProcessState};
    use std::sync::Arc;

    /// Registry payload backed by a real `std::process::Child`.
    ///
    /// The child runs `true`, which exits immediately. The tests only need
    /// a valid child handle, never its output.
    fn fresh_state() -> ProcessState {
        let mut cmd = std::process::Command::new("true");
        cmd.stdout(std::process::Stdio::null())
            .stderr(std::process::Stdio::null());
        let child = cmd.spawn().expect("spawn `true`");
        ProcessState { child, stdout: None, stderr: None }
    }

    /// A registry with capacity `cap`, filled with `handles` in order, so
    /// the first handle is the least recently used.
    fn registry_with(cap: usize, handles: &[u64]) -> ProcessRegistry {
        let mut reg = ProcessRegistry::with_capacity(cap);
        for &h in handles {
            reg.insert(h, fresh_state());
        }
        reg
    }

    #[test]
    fn insert_and_get_round_trip() {
        let mut reg = registry_with(4, &[1]);
        assert!(reg.touch_get(1).is_some());
        assert!(reg.touch_get(2).is_none());
    }

    #[test]
    fn touch_get_returns_distinct_arcs_for_distinct_handles() {
        let mut reg = registry_with(4, &[1, 2]);
        let first = reg.touch_get(1).unwrap();
        let second = reg.touch_get(2).unwrap();
        // Distinct handles must not alias the same shared state.
        assert!(!Arc::ptr_eq(&first, &second));
    }

    #[test]
    fn cap_evicts_lru_on_overflow() {
        let mut reg = registry_with(2, &[1, 2]);
        let _ = reg.touch_get(1);
        reg.insert(3, fresh_state());
        assert!(reg.touch_get(1).is_some(), "1 was MRU, should survive");
        assert!(reg.touch_get(2).is_none(), "2 was LRU, should be evicted");
        assert!(reg.touch_get(3).is_some(), "3 just inserted, should survive");
        assert_eq!(reg.len(), 2);
    }

    #[test]
    fn cap_with_no_touches_evicts_in_insertion_order() {
        let mut reg = registry_with(2, &[10, 20, 30]);
        assert!(reg.touch_get(10).is_none());
        assert!(reg.touch_get(20).is_some());
        assert!(reg.touch_get(30).is_some());
    }

    #[test]
    fn remove_drops_entry() {
        let mut reg = registry_with(4, &[1]);
        reg.remove(1);
        assert!(reg.touch_get(1).is_none());
        assert_eq!(reg.len(), 0);
    }

    #[test]
    fn many_inserts_stay_bounded_at_cap() {
        let cap = 8;
        let mut reg = ProcessRegistry::with_capacity(cap);
        let total = cap as u64 * 3;
        for handle in 0..total {
            reg.insert(handle, fresh_state());
            assert!(reg.len() <= cap);
        }
        assert_eq!(reg.len(), cap);
    }

    #[test]
    fn outstanding_arc_outlives_remove() {
        // Models wait-completes-then-removes: an op still holding the
        // per-handle Arc must keep working after the entry is removed.
        let mut reg = registry_with(4, &[1]);
        let held = reg.touch_get(1).expect("entry exists");
        reg.remove(1);
        // The registry has forgotten the handle, but the Arc is still valid.
        assert!(reg.touch_get(1).is_none());
        let _state = held.lock().unwrap();
    }
}
2059
2060fn expect_process_handle(v: Option<&Value>) -> Result<u64, String> {
2061    match v {
2062        Some(Value::Int(n)) if *n >= 0 => Ok(*n as u64),
2063        Some(other) => Err(format!("expected ProcessHandle (Int), got {other:?}")),
2064        None => Err("missing ProcessHandle argument".into()),
2065    }
2066}
2067
2068/// Process-wide registry of open `Kv` handles. Each `kv.open` allocates
2069/// a new u64 handle via [`next_kv_handle`] and stores the `sled::Db`
2070/// here; subsequent ops fetch by handle. `kv.close` removes the entry.
2071///
2072/// Capped at [`MAX_KV_HANDLES`] to prevent leaks from long-running
2073/// programs that open many short-lived stores without calling
2074/// `kv.close`. On insert at cap, the least-recently-used entry is
2075/// dropped (closing its `sled::Db`); subsequent ops on the evicted
2076/// handle return the standard "closed or unknown Kv handle" error.
2077/// Any access (`get`, `put`, `delete`, `contains`, `list_prefix`)
2078/// touches the LRU order.
2079fn kv_registry() -> &'static Mutex<KvRegistry> {
2080    static REGISTRY: OnceLock<Mutex<KvRegistry>> = OnceLock::new();
2081    REGISTRY.get_or_init(|| Mutex::new(KvRegistry::with_capacity(MAX_KV_HANDLES)))
2082}
2083
2084/// Maximum number of `kv.open` handles kept alive at once. Past this
2085/// cap, the least-recently-used handle is evicted on each new open.
2086/// Sized so that pathological "open and forget" programs are bounded
2087/// without breaking real-world programs that intentionally keep one or
2088/// two long-lived stores open.
2089const MAX_KV_HANDLES: usize = 256;
2090
2091/// LRU-bounded set of open `sled::Db` instances keyed by `u64` handle.
2092/// Built on `IndexMap` for O(1) insert / remove / lookup with
2093/// insertion-order traversal — touching an entry just shift-moves it
2094/// to the back, evictions pop from the front.
2095pub(crate) struct KvRegistry {
2096    entries: indexmap::IndexMap<u64, sled::Db>,
2097    cap: usize,
2098}
2099
2100impl KvRegistry {
2101    pub(crate) fn with_capacity(cap: usize) -> Self {
2102        Self { entries: indexmap::IndexMap::new(), cap }
2103    }
2104
2105    /// Insert a freshly-opened db. If we're already at cap, evict the
2106    /// LRU entry first; the dropped `sled::Db` closes its files.
2107    pub(crate) fn insert(&mut self, handle: u64, db: sled::Db) {
2108        if self.entries.len() >= self.cap {
2109            self.entries.shift_remove_index(0);
2110        }
2111        self.entries.insert(handle, db);
2112    }
2113
2114    /// Look up a handle, marking it most-recently-used on hit.
2115    pub(crate) fn touch_get(&mut self, handle: u64) -> Option<&sled::Db> {
2116        let idx = self.entries.get_index_of(&handle)?;
2117        self.entries.move_index(idx, self.entries.len() - 1);
2118        self.entries.get(&handle)
2119    }
2120
2121    /// Explicit `kv.close`: drop the handle if present.
2122    pub(crate) fn remove(&mut self, handle: u64) {
2123        self.entries.shift_remove(&handle);
2124    }
2125
2126    #[cfg(test)]
2127    pub(crate) fn len(&self) -> usize { self.entries.len() }
2128}
2129
/// Hand out a fresh `Kv` handle id.
///
/// Backed by a process-wide counter that starts at 1. Each call returns
/// the next value, so ids never repeat within a process (barring u64
/// wraparound).
fn next_kv_handle() -> u64 {
    static NEXT_KV_ID: AtomicU64 = AtomicU64::new(1);
    NEXT_KV_ID.fetch_add(1, Ordering::SeqCst)
}
2134
2135/// Process-wide registry of open `Db` handles. Same shape as the kv
2136/// and process registries: per-handle `Arc<Mutex<…>>` so dispatch
2137/// only briefly holds the global lock and ops on different
2138/// connections don't serialize. LRU-bounded at
2139/// [`MAX_SQL_HANDLES`] to avoid leaks from long-running programs
2140/// that open many short-lived databases.
2141fn sql_registry() -> &'static Mutex<SqlRegistry> {
2142    static REGISTRY: OnceLock<Mutex<SqlRegistry>> = OnceLock::new();
2143    REGISTRY.get_or_init(|| Mutex::new(SqlRegistry::with_capacity(MAX_SQL_HANDLES)))
2144}
2145
2146const MAX_SQL_HANDLES: usize = 256;
2147
2148type SharedConn = Arc<Mutex<rusqlite::Connection>>;
2149
2150pub(crate) struct SqlRegistry {
2151    entries: indexmap::IndexMap<u64, SharedConn>,
2152    cap: usize,
2153}
2154
2155impl SqlRegistry {
2156    pub(crate) fn with_capacity(cap: usize) -> Self {
2157        Self { entries: indexmap::IndexMap::new(), cap }
2158    }
2159
2160    pub(crate) fn insert(&mut self, handle: u64, conn: rusqlite::Connection) {
2161        if self.entries.len() >= self.cap {
2162            self.entries.shift_remove_index(0);
2163        }
2164        self.entries.insert(handle, Arc::new(Mutex::new(conn)));
2165    }
2166
2167    /// Look up a handle, marking it MRU on hit. Returns a clone of
2168    /// the shared `Arc` so callers release the global registry
2169    /// lock before locking the per-handle mutex.
2170    pub(crate) fn touch_get(&mut self, handle: u64) -> Option<SharedConn> {
2171        let idx = self.entries.get_index_of(&handle)?;
2172        self.entries.move_index(idx, self.entries.len() - 1);
2173        self.entries.get(&handle).cloned()
2174    }
2175
2176    pub(crate) fn remove(&mut self, handle: u64) {
2177        self.entries.shift_remove(&handle);
2178    }
2179
2180    #[cfg(test)]
2181    pub(crate) fn len(&self) -> usize { self.entries.len() }
2182}
2183
/// Hand out a fresh `Db` handle id.
///
/// Backed by its own process-wide counter that starts at 1. Each call
/// returns the next value, so ids never repeat within a process (barring
/// u64 wraparound). The counter is independent of the kv one, so a kv
/// handle and a sql handle may carry the same number.
fn next_sql_handle() -> u64 {
    static NEXT_SQL_ID: AtomicU64 = AtomicU64::new(1);
    NEXT_SQL_ID.fetch_add(1, Ordering::SeqCst)
}
2188
#[cfg(test)]
mod sql_registry_tests {
    use super::SqlRegistry;
    use std::sync::Arc;

    fn fresh() -> rusqlite::Connection {
        rusqlite::Connection::open_in_memory().expect("open in-memory sqlite")
    }

    #[test]
    fn insert_and_get_round_trip() {
        let mut r = SqlRegistry::with_capacity(4);
        r.insert(1, fresh());
        assert!(r.touch_get(1).is_some());
        assert!(r.touch_get(2).is_none());
    }

    #[test]
    fn cap_evicts_lru_on_overflow() {
        let mut r = SqlRegistry::with_capacity(2);
        r.insert(1, fresh());
        r.insert(2, fresh());
        let _ = r.touch_get(1);
        r.insert(3, fresh());
        assert!(r.touch_get(1).is_some(), "1 was MRU, should survive");
        assert!(r.touch_get(2).is_none(), "2 was LRU, should be evicted");
        assert!(r.touch_get(3).is_some(), "3 just inserted");
        assert_eq!(r.len(), 2);
    }

    #[test]
    fn cap_with_no_touches_evicts_in_insertion_order() {
        // cap=2: insert 10, 20, 30 with no touches → 10 evicted (FIFO).
        let mut r = SqlRegistry::with_capacity(2);
        r.insert(10, fresh());
        r.insert(20, fresh());
        r.insert(30, fresh());
        assert!(r.touch_get(10).is_none());
        assert!(r.touch_get(20).is_some());
        assert!(r.touch_get(30).is_some());
    }

    #[test]
    fn remove_drops_entry() {
        let mut r = SqlRegistry::with_capacity(4);
        r.insert(1, fresh());
        r.remove(1);
        assert!(r.touch_get(1).is_none());
        assert_eq!(r.len(), 0);
    }

    #[test]
    fn remove_unknown_handle_is_noop() {
        let mut r = SqlRegistry::with_capacity(4);
        r.insert(1, fresh());
        r.remove(999);
        assert!(r.touch_get(1).is_some());
        assert_eq!(r.len(), 1);
    }

    #[test]
    fn many_inserts_stay_bounded_at_cap() {
        let cap = 8;
        let mut r = SqlRegistry::with_capacity(cap);
        for i in 0..(cap as u64 * 3) {
            r.insert(i, fresh());
            assert!(r.len() <= cap);
        }
        assert_eq!(r.len(), cap);
    }

    #[test]
    fn touch_get_shares_one_connection_per_handle() {
        // Repeated lookups of one handle must hand out the same shared
        // connection; different handles must not alias.
        let mut r = SqlRegistry::with_capacity(4);
        r.insert(1, fresh());
        r.insert(2, fresh());
        let first = r.touch_get(1).unwrap();
        let again = r.touch_get(1).unwrap();
        let other = r.touch_get(2).unwrap();
        assert!(Arc::ptr_eq(&first, &again));
        assert!(!Arc::ptr_eq(&first, &other));
    }

    #[test]
    fn outstanding_arc_outlives_remove() {
        // Mirrors the process-registry test: an op holding the shared
        // connection keeps working after the entry is removed.
        let mut r = SqlRegistry::with_capacity(4);
        r.insert(1, fresh());
        let conn = r.touch_get(1).expect("entry exists");
        r.remove(1);
        assert!(r.touch_get(1).is_none());
        conn.lock()
            .unwrap()
            .execute_batch("CREATE TABLE t (x INTEGER);")
            .expect("connection still usable after remove");
    }
}
2238
#[cfg(test)]
mod kv_registry_tests {
    use super::KvRegistry;
    use std::sync::atomic::{AtomicU64, Ordering};

    /// Spin up an isolated, self-cleaning `sled::Db`.
    ///
    /// Each call gets a unique directory (pid, tag, timestamp and a
    /// per-process sequence number), so concurrent tests never collide on
    /// sled's lockfile. The db is opened in sled's `temporary` mode, which
    /// deletes its directory once the last handle drops. Without it, every
    /// test run would leave one directory per call behind in the system
    /// temp dir.
    fn fresh_db(tag: &str) -> sled::Db {
        static SEQ: AtomicU64 = AtomicU64::new(0);
        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        let dir = std::env::temp_dir().join(format!(
            "lex-kv-reg-{}-{}-{}-{}",
            std::process::id(),
            tag,
            nanos,
            SEQ.fetch_add(1, Ordering::Relaxed),
        ));
        // `path` must be set before `temporary(true)`; otherwise sled
        // picks its own generated temp path.
        sled::Config::new()
            .path(&dir)
            .temporary(true)
            .open()
            .expect("sled open")
    }

    #[test]
    fn insert_and_get_round_trip() {
        let mut r = KvRegistry::with_capacity(4);
        r.insert(1, fresh_db("a"));
        assert!(r.touch_get(1).is_some());
        assert!(r.touch_get(2).is_none());
    }

    #[test]
    fn cap_evicts_lru_on_overflow() {
        // cap=2: insert 1, 2; touch 1 (now MRU); insert 3 → 2 evicted.
        let mut r = KvRegistry::with_capacity(2);
        r.insert(1, fresh_db("c1"));
        r.insert(2, fresh_db("c2"));
        let _ = r.touch_get(1);
        r.insert(3, fresh_db("c3"));
        assert!(r.touch_get(1).is_some(), "1 was MRU, should survive");
        assert!(r.touch_get(2).is_none(), "2 was LRU, should be evicted");
        assert!(r.touch_get(3).is_some(), "3 just inserted, should survive");
        assert_eq!(r.len(), 2);
    }

    #[test]
    fn cap_with_no_touches_evicts_in_insertion_order() {
        // cap=2: insert 1, 2, 3 with no touches → 1 evicted (FIFO).
        let mut r = KvRegistry::with_capacity(2);
        r.insert(10, fresh_db("f1"));
        r.insert(20, fresh_db("f2"));
        r.insert(30, fresh_db("f3"));
        assert!(r.touch_get(10).is_none());
        assert!(r.touch_get(20).is_some());
        assert!(r.touch_get(30).is_some());
    }

    #[test]
    fn remove_drops_entry() {
        let mut r = KvRegistry::with_capacity(4);
        r.insert(1, fresh_db("r1"));
        r.remove(1);
        assert!(r.touch_get(1).is_none());
        assert_eq!(r.len(), 0);
    }

    #[test]
    fn remove_unknown_handle_is_noop() {
        let mut r = KvRegistry::with_capacity(4);
        r.insert(1, fresh_db("u1"));
        r.remove(999);
        assert!(r.touch_get(1).is_some());
    }

    #[test]
    fn many_inserts_stay_bounded_at_cap() {
        // Exhaust the cap to confirm the registry never grows past it,
        // even under sustained churn.
        let cap = 8;
        let mut r = KvRegistry::with_capacity(cap);
        for i in 0..(cap as u64 * 3) {
            r.insert(i, fresh_db(&format!("b{i}")));
            assert!(r.len() <= cap);
        }
        assert_eq!(r.len(), cap);
    }
}