Skip to main content

lex_runtime/
handler.rs

1//! Native effect handlers, dispatched at runtime through the VM's
2//! `EffectHandler` trait. The handler also re-checks the runtime policy
3//! per spec §7.4 (the static check is necessary but not sufficient: a fn
4//! declared `[fs_read("/data")]` that's allowed at startup still has to
5//! pass the path check at the point of dispatch).
6
7use lex_bytecode::vm::{EffectHandler, Vm};
8use lex_bytecode::{Program, Value};
9use std::path::PathBuf;
10use std::sync::atomic::{AtomicU64, Ordering};
11use std::sync::{Mutex, OnceLock};
12use std::sync::Arc;
13use std::time::{SystemTime, UNIX_EPOCH};
14
15use crate::builtins::try_pure_builtin;
16use crate::policy::Policy;
17
/// Line-oriented output destination behind `io.print`.
///
/// Production runs write to the process's stdout ([`StdoutSink`]); tests
/// swap in an in-memory recorder ([`CapturedSink`]) so printed output can be
/// asserted on. `Send` so the boxed sink can move with its handler.
pub trait IoSink: Send {
    /// Emit `line` as one line of output; implementations decide how (or
    /// whether) a line terminator is added.
    fn print_line(&mut self, line: &str);
}
23
24pub struct StdoutSink;
25impl IoSink for StdoutSink {
26    fn print_line(&mut self, s: &str) {
27        println!("{s}");
28    }
29}
30
31#[derive(Default)]
32pub struct CapturedSink { pub lines: Vec<String> }
33impl IoSink for CapturedSink {
34    fn print_line(&mut self, s: &str) { self.lines.push(s.to_string()); }
35}
36
/// Live `agent.cloud_stream` producers (#305 slice 3).
///
/// Maps an opaque stream-handle id to the boxed iterator that yields that
/// stream's chunks. Boxes are `Send` so the registry can be shared across
/// worker threads behind a mutex.
pub type StreamRegistry = std::collections::HashMap<
    String,
    Box<dyn Iterator<Item = String> + Send>,
>;
41
42pub struct DefaultHandler {
43    policy: Policy,
44    pub sink: Box<dyn IoSink>,
45    /// Optional read root for `io.read` — when set, `io.read("p")` resolves
46    /// to `read_root.join(p)`. Lets tests run without touching the real fs.
47    pub read_root: Option<PathBuf>,
48    /// Per-run budget pool (#225). `Arc<AtomicU64>` so parallel
49    /// branches share one counter without locking. Initialized to
50    /// the policy ceiling at handler construction; each call to a
51    /// function with declared `[budget(N)]` deducts N atomically
52    /// via `note_call_budget`. Cloning the handler is intentional
53    /// for net.serve / chat handlers — they share the same pool.
54    pub budget_remaining: Arc<AtomicU64>,
55    /// The original ceiling that `budget_remaining` started at, kept
56    /// for diagnostics so a `BudgetExceeded` error can report
57    /// `(used, ceiling)` rather than just "exceeded by N".
58    pub budget_ceiling: Option<u64>,
59    /// Shared reference to the program, needed by `net.serve` so the
60    /// handler can spin up fresh VMs to dispatch incoming requests.
61    /// `None` if the handler was constructed without a program.
62    pub program: Option<Arc<Program>>,
63    /// Chat registry; populated by `net.serve_ws`'s per-message
64    /// dispatch so `chat.broadcast` / `chat.send` work from inside
65    /// a handler invocation.
66    pub chat_registry: Option<Arc<crate::ws::ChatRegistry>>,
67    /// LRU cache of `agent.call_mcp` clients keyed by the
68    /// command-line string (#197). Avoids spawn-per-call cost
69    /// when an agent invokes the same MCP server in tight loops.
70    /// Capped — when the cache is full, the least-recently-used
71    /// entry is dropped (its subprocess is reaped on Drop).
72    pub mcp_clients: crate::mcp_client::McpClientCache,
73    /// Stream registry for `agent.cloud_stream` / `stream.next` /
74    /// `stream.collect` (#305 slice 3). Keyed by an opaque handle
75    /// id; values are the producer iterators. Wrapped in
76    /// `Arc<Mutex<…>>` so par_map workers can share the same
77    /// stream pool (when slice-2's per-worker handler split chains
78    /// the registry through).
79    pub streams: Arc<std::sync::Mutex<StreamRegistry>>,
80    /// Monotonic counter for handing out fresh stream handle ids.
81    pub next_stream_id: Arc<std::sync::atomic::AtomicU64>,
82}
83
84impl DefaultHandler {
85    pub fn new(policy: Policy) -> Self {
86        // If the caller supplied a ceiling, the pool starts at that
87        // ceiling and counts down. No ceiling = `u64::MAX` so calls
88        // never refuse on budget grounds (existing behavior).
89        let ceiling = policy.budget;
90        let initial = ceiling.unwrap_or(u64::MAX);
91        Self {
92            policy,
93            sink: Box::new(StdoutSink),
94            read_root: None,
95            budget_remaining: Arc::new(AtomicU64::new(initial)),
96            budget_ceiling: ceiling,
97            program: None,
98            chat_registry: None,
99            mcp_clients: crate::mcp_client::McpClientCache::with_capacity(16),
100            streams: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
101            next_stream_id: Arc::new(std::sync::atomic::AtomicU64::new(0)),
102        }
103    }
104
105    pub fn with_program(mut self, program: Arc<Program>) -> Self {
106        self.program = Some(program); self
107    }
108
109    pub fn with_chat_registry(mut self, registry: Arc<crate::ws::ChatRegistry>) -> Self {
110        self.chat_registry = Some(registry); self
111    }
112
113    pub fn with_sink(mut self, sink: Box<dyn IoSink>) -> Self {
114        self.sink = sink; self
115    }
116
117    pub fn with_read_root(mut self, root: PathBuf) -> Self {
118        self.read_root = Some(root); self
119    }
120
121    fn ensure_kind_allowed(&self, kind: &str) -> Result<(), String> {
122        if self.policy.allow_effects.contains(kind) {
123            Ok(())
124        } else {
125            Err(format!("effect `{kind}` not in --allow-effects"))
126        }
127    }
128
129    fn resolve_read_path(&self, p: &str) -> PathBuf {
130        match &self.read_root {
131            Some(root) => root.join(p.trim_start_matches('/')),
132            None => PathBuf::from(p),
133        }
134    }
135
136    fn dispatch_log(&mut self, op: &str, args: Vec<Value>) -> Result<Value, String> {
137        match op {
138            "debug" | "info" | "warn" | "error" => {
139                let msg = expect_str(args.first())?;
140                let level = match op {
141                    "debug" => LogLevel::Debug,
142                    "info" => LogLevel::Info,
143                    "warn" => LogLevel::Warn,
144                    _ => LogLevel::Error,
145                };
146                emit_log(level, msg);
147                Ok(Value::Unit)
148            }
149            "set_level" => {
150                let s = expect_str(args.first())?;
151                match parse_log_level(s) {
152                    Some(l) => {
153                        log_state().lock().unwrap().level = l;
154                        Ok(ok(Value::Unit))
155                    }
156                    None => Ok(err(Value::Str(format!(
157                        "log.set_level: unknown level `{s}`; expected debug|info|warn|error")))),
158                }
159            }
160            "set_format" => {
161                let s = expect_str(args.first())?;
162                let fmt = match s {
163                    "text" => LogFormat::Text,
164                    "json" => LogFormat::Json,
165                    other => return Ok(err(Value::Str(format!(
166                        "log.set_format: unknown format `{other}`; expected text|json")))),
167                };
168                log_state().lock().unwrap().format = fmt;
169                Ok(ok(Value::Unit))
170            }
171            "set_sink" => {
172                let path = expect_str(args.first())?;
173                if path == "-" {
174                    log_state().lock().unwrap().sink = LogSink::Stderr;
175                    return Ok(ok(Value::Unit));
176                }
177                if let Err(e) = self.ensure_fs_write_path(path) {
178                    return Ok(err(Value::Str(e)));
179                }
180                match std::fs::OpenOptions::new()
181                    .create(true).append(true).open(path)
182                {
183                    Ok(f) => {
184                        log_state().lock().unwrap().sink = LogSink::File(std::sync::Arc::new(Mutex::new(f)));
185                        Ok(ok(Value::Unit))
186                    }
187                    Err(e) => Ok(err(Value::Str(format!("log.set_sink `{path}`: {e}")))),
188                }
189            }
190            other => Err(format!("unsupported log.{other}")),
191        }
192    }
193
194    fn dispatch_process(&mut self, op: &str, args: Vec<Value>) -> Result<Value, String> {
195        match op {
196            "spawn" => {
197                let cmd = expect_str(args.first())?.to_string();
198                let raw_args = match args.get(1) {
199                    Some(Value::List(items)) => items.clone(),
200                    _ => return Err("process.spawn: args must be List[Str]".into()),
201                };
202                let str_args: Result<Vec<String>, String> = raw_args.iter().map(|v| match v {
203                    Value::Str(s) => Ok(s.clone()),
204                    other => Err(format!("process.spawn: arg must be Str, got {other:?}")),
205                }).collect();
206                let str_args = str_args?;
207                let opts = match args.get(2) {
208                    Some(Value::Record(r)) => r.clone(),
209                    _ => return Err("process.spawn: missing or invalid opts record".into()),
210                };
211
212                // Allow-list check, mirroring the existing proc.spawn.
213                if !self.policy.allow_proc.is_empty() {
214                    let basename = std::path::Path::new(&cmd)
215                        .file_name()
216                        .and_then(|s| s.to_str())
217                        .unwrap_or(&cmd);
218                    if !self.policy.allow_proc.iter().any(|a| a == basename) {
219                        return Ok(err(Value::Str(format!(
220                            "process.spawn: `{cmd}` not in --allow-proc {:?}",
221                            self.policy.allow_proc
222                        ))));
223                    }
224                }
225
226                let mut command = std::process::Command::new(&cmd);
227                command.args(&str_args);
228                command.stdin(std::process::Stdio::piped());
229                command.stdout(std::process::Stdio::piped());
230                command.stderr(std::process::Stdio::piped());
231
232                if let Some(Value::Variant { name, args: vargs }) = opts.get("cwd") {
233                    if name == "Some" {
234                        if let Some(Value::Str(s)) = vargs.first() {
235                            command.current_dir(s);
236                        }
237                    }
238                }
239                if let Some(Value::Map(env)) = opts.get("env") {
240                    for (k, v) in env {
241                        if let (lex_bytecode::MapKey::Str(ks), Value::Str(vs)) = (k, v) {
242                            command.env(ks, vs);
243                        }
244                    }
245                }
246
247                let stdin_payload: Option<Vec<u8>> = match opts.get("stdin") {
248                    Some(Value::Variant { name, args: vargs }) if name == "Some" => {
249                        match vargs.first() {
250                            Some(Value::Bytes(b)) => Some(b.clone()),
251                            _ => None,
252                        }
253                    }
254                    _ => None,
255                };
256
257                let mut child = match command.spawn() {
258                    Ok(c) => c,
259                    Err(e) => return Ok(err(Value::Str(format!("process.spawn `{cmd}`: {e}")))),
260                };
261
262                if let Some(payload) = stdin_payload {
263                    if let Some(mut stdin) = child.stdin.take() {
264                        use std::io::Write;
265                        let _ = stdin.write_all(&payload);
266                        // Drop closes stdin; the child sees EOF.
267                    }
268                }
269
270                let stdout = child.stdout.take().map(std::io::BufReader::new);
271                let stderr = child.stderr.take().map(std::io::BufReader::new);
272                let handle = next_process_handle();
273                process_registry().lock().unwrap().insert(handle, ProcessState {
274                    child,
275                    stdout,
276                    stderr,
277                });
278                Ok(ok(Value::Int(handle as i64)))
279            }
280            "read_stdout_line" => Self::read_line_op(args, true),
281            "read_stderr_line" => Self::read_line_op(args, false),
282            "wait" => {
283                let h = expect_process_handle(args.first())?;
284                // Look up the per-handle Arc, then release the global
285                // lock before the (slow) wait so unrelated handles
286                // can dispatch concurrently.
287                let arc = process_registry().lock().unwrap()
288                    .touch_get(h)
289                    .ok_or_else(|| "process.wait: closed or unknown ProcessHandle".to_string())?;
290                let status = {
291                    let mut state = arc.lock().unwrap();
292                    state.child.wait().map_err(|e| format!("process.wait: {e}"))?
293                };
294                // Wait completion makes the handle terminal; drop it
295                // from the registry so the cap doesn't fill up with
296                // exited children.
297                process_registry().lock().unwrap().remove(h);
298                let mut rec = indexmap::IndexMap::new();
299                rec.insert("code".into(), Value::Int(status.code().unwrap_or(-1) as i64));
300                #[cfg(unix)]
301                {
302                    use std::os::unix::process::ExitStatusExt;
303                    rec.insert("signaled".into(), Value::Bool(status.signal().is_some()));
304                }
305                #[cfg(not(unix))]
306                {
307                    rec.insert("signaled".into(), Value::Bool(false));
308                }
309                Ok(Value::Record(rec))
310            }
311            "kill" => {
312                let h = expect_process_handle(args.first())?;
313                let _signal = expect_str(args.get(1))?;
314                let arc = process_registry().lock().unwrap()
315                    .touch_get(h)
316                    .ok_or_else(|| "process.kill: closed or unknown ProcessHandle".to_string())?;
317                let mut state = arc.lock().unwrap();
318                // Cross-platform: only `kill` (SIGKILL-equivalent on
319                // Windows). Signal-name dispatch is a v1.5 follow-up.
320                match state.child.kill() {
321                    Ok(_) => Ok(ok(Value::Unit)),
322                    Err(e) => Ok(err(Value::Str(format!("process.kill: {e}")))),
323                }
324            }
325            "run" => {
326                let cmd = expect_str(args.first())?.to_string();
327                let raw_args = match args.get(1) {
328                    Some(Value::List(items)) => items.clone(),
329                    _ => return Err("process.run: args must be List[Str]".into()),
330                };
331                let str_args: Result<Vec<String>, String> = raw_args.iter().map(|v| match v {
332                    Value::Str(s) => Ok(s.clone()),
333                    other => Err(format!("process.run: arg must be Str, got {other:?}")),
334                }).collect();
335                let str_args = str_args?;
336                if !self.policy.allow_proc.is_empty() {
337                    let basename = std::path::Path::new(&cmd)
338                        .file_name()
339                        .and_then(|s| s.to_str())
340                        .unwrap_or(&cmd);
341                    if !self.policy.allow_proc.iter().any(|a| a == basename) {
342                        return Ok(err(Value::Str(format!(
343                            "process.run: `{cmd}` not in --allow-proc {:?}",
344                            self.policy.allow_proc
345                        ))));
346                    }
347                }
348                match std::process::Command::new(&cmd).args(&str_args).output() {
349                    Ok(o) => {
350                        let mut rec = indexmap::IndexMap::new();
351                        rec.insert("stdout".into(), Value::Str(
352                            String::from_utf8_lossy(&o.stdout).to_string()));
353                        rec.insert("stderr".into(), Value::Str(
354                            String::from_utf8_lossy(&o.stderr).to_string()));
355                        rec.insert("exit_code".into(), Value::Int(
356                            o.status.code().unwrap_or(-1) as i64));
357                        Ok(ok(Value::Record(rec)))
358                    }
359                    Err(e) => Ok(err(Value::Str(format!("process.run `{cmd}`: {e}")))),
360                }
361            }
362            other => Err(format!("unsupported process.{other}")),
363        }
364    }
365
366    /// Read one line from the child's stdout (`is_stdout = true`) or
367    /// stderr. Returns `None` (Lex `Option`) at EOF; subsequent calls
368    /// keep returning `None`. Holds only the per-handle mutex during
369    /// the (potentially blocking) read, so reads on one handle don't
370    /// block reads/waits on a different handle.
371    fn read_line_op(args: Vec<Value>, is_stdout: bool) -> Result<Value, String> {
372        let h = expect_process_handle(args.first())?;
373        let arc = process_registry().lock().unwrap()
374            .touch_get(h)
375            .ok_or_else(|| format!(
376                "process.read_{}_line: closed or unknown ProcessHandle",
377                if is_stdout { "stdout" } else { "stderr" }))?;
378        let mut state = arc.lock().unwrap();
379        let reader_opt = if is_stdout {
380            state.stdout.as_mut().map(|r| -> &mut dyn std::io::BufRead { r })
381        } else {
382            state.stderr.as_mut().map(|r| -> &mut dyn std::io::BufRead { r })
383        };
384        let reader = match reader_opt {
385            Some(r) => r,
386            None => return Ok(none()),
387        };
388        let mut line = String::new();
389        match reader.read_line(&mut line) {
390            Ok(0) => Ok(none()),
391            Ok(_) => {
392                if line.ends_with('\n') { line.pop(); }
393                if line.ends_with('\r') { line.pop(); }
394                Ok(some(Value::Str(line)))
395            }
396            Err(e) => Err(format!("process.read_*_line: {e}")),
397        }
398    }
399
400    fn dispatch_fs(&mut self, op: &str, args: Vec<Value>) -> Result<Value, String> {
401        match op {
402            "exists" => {
403                let path = expect_str(args.first())?.to_string();
404                if let Err(e) = self.ensure_fs_walk_path(&path) {
405                    return Ok(err(Value::Str(e)));
406                }
407                Ok(Value::Bool(std::path::Path::new(&path).exists()))
408            }
409            "is_file" => {
410                let path = expect_str(args.first())?.to_string();
411                if let Err(e) = self.ensure_fs_walk_path(&path) {
412                    return Ok(err(Value::Str(e)));
413                }
414                Ok(Value::Bool(std::path::Path::new(&path).is_file()))
415            }
416            "is_dir" => {
417                let path = expect_str(args.first())?.to_string();
418                if let Err(e) = self.ensure_fs_walk_path(&path) {
419                    return Ok(err(Value::Str(e)));
420                }
421                Ok(Value::Bool(std::path::Path::new(&path).is_dir()))
422            }
423            "stat" => {
424                let path = expect_str(args.first())?.to_string();
425                if let Err(e) = self.ensure_fs_walk_path(&path) {
426                    return Ok(err(Value::Str(e)));
427                }
428                match std::fs::metadata(&path) {
429                    Ok(md) => {
430                        let mtime = md.modified()
431                            .ok()
432                            .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
433                            .map(|d| d.as_secs() as i64)
434                            .unwrap_or(0);
435                        let mut rec = indexmap::IndexMap::new();
436                        rec.insert("size".into(), Value::Int(md.len() as i64));
437                        rec.insert("mtime".into(), Value::Int(mtime));
438                        rec.insert("is_dir".into(), Value::Bool(md.is_dir()));
439                        rec.insert("is_file".into(), Value::Bool(md.is_file()));
440                        Ok(ok(Value::Record(rec)))
441                    }
442                    Err(e) => Ok(err(Value::Str(format!("fs.stat `{path}`: {e}")))),
443                }
444            }
445            "list_dir" => {
446                let path = expect_str(args.first())?.to_string();
447                if let Err(e) = self.ensure_fs_walk_path(&path) {
448                    return Ok(err(Value::Str(e)));
449                }
450                match std::fs::read_dir(&path) {
451                    Ok(rd) => {
452                        let mut entries: Vec<Value> = Vec::new();
453                        for ent in rd {
454                            match ent {
455                                Ok(e) => {
456                                    let p = e.path();
457                                    entries.push(Value::Str(p.to_string_lossy().into_owned()));
458                                }
459                                Err(e) => return Ok(err(Value::Str(format!("fs.list_dir: {e}")))),
460                            }
461                        }
462                        Ok(ok(Value::List(entries)))
463                    }
464                    Err(e) => Ok(err(Value::Str(format!("fs.list_dir `{path}`: {e}")))),
465                }
466            }
467            "walk" => {
468                let path = expect_str(args.first())?.to_string();
469                if let Err(e) = self.ensure_fs_walk_path(&path) {
470                    return Ok(err(Value::Str(e)));
471                }
472                let mut paths: Vec<Value> = Vec::new();
473                for ent in walkdir::WalkDir::new(&path) {
474                    match ent {
475                        Ok(e) => paths.push(Value::Str(
476                            e.path().to_string_lossy().into_owned())),
477                        Err(e) => return Ok(err(Value::Str(format!("fs.walk: {e}")))),
478                    }
479                }
480                Ok(ok(Value::List(paths)))
481            }
482            "glob" => {
483                let pattern = expect_str(args.first())?.to_string();
484                // Glob patterns can't be path-scoped at parse time
485                // (`**/*.rs` doesn't pin a directory); we filter the
486                // per-result paths after expansion against
487                // `--allow-fs-read`.
488                let entries = match glob::glob(&pattern) {
489                    Ok(e) => e,
490                    Err(e) => return Ok(err(Value::Str(format!("fs.glob: {e}")))),
491                };
492                let mut paths: Vec<Value> = Vec::new();
493                for ent in entries {
494                    match ent {
495                        Ok(p) => {
496                            let s = p.to_string_lossy().into_owned();
497                            if self.policy.allow_fs_read.is_empty()
498                                || self.policy.allow_fs_read.iter().any(|root| p.starts_with(root))
499                            {
500                                paths.push(Value::Str(s));
501                            }
502                        }
503                        Err(e) => return Ok(err(Value::Str(format!("fs.glob: {e}")))),
504                    }
505                }
506                Ok(ok(Value::List(paths)))
507            }
508            "mkdir_p" => {
509                let path = expect_str(args.first())?.to_string();
510                if let Err(e) = self.ensure_fs_write_path(&path) {
511                    return Ok(err(Value::Str(e)));
512                }
513                match std::fs::create_dir_all(&path) {
514                    Ok(_) => Ok(ok(Value::Unit)),
515                    Err(e) => Ok(err(Value::Str(format!("fs.mkdir_p `{path}`: {e}")))),
516                }
517            }
518            "remove" => {
519                let path = expect_str(args.first())?.to_string();
520                if let Err(e) = self.ensure_fs_write_path(&path) {
521                    return Ok(err(Value::Str(e)));
522                }
523                let p = std::path::Path::new(&path);
524                let result = if p.is_dir() {
525                    std::fs::remove_dir_all(p)
526                } else {
527                    std::fs::remove_file(p)
528                };
529                match result {
530                    Ok(_) => Ok(ok(Value::Unit)),
531                    Err(e) => Ok(err(Value::Str(format!("fs.remove `{path}`: {e}")))),
532                }
533            }
534            "copy" => {
535                let src = expect_str(args.first())?.to_string();
536                let dst = expect_str(args.get(1))?.to_string();
537                if let Err(e) = self.ensure_fs_walk_path(&src) {
538                    return Ok(err(Value::Str(e)));
539                }
540                if let Err(e) = self.ensure_fs_write_path(&dst) {
541                    return Ok(err(Value::Str(e)));
542                }
543                match std::fs::copy(&src, &dst) {
544                    Ok(_) => Ok(ok(Value::Unit)),
545                    Err(e) => Ok(err(Value::Str(format!("fs.copy {src} -> {dst}: {e}")))),
546                }
547            }
548            other => Err(format!("unsupported fs.{other}")),
549        }
550    }
551
552    /// Path scope for walk-style operations. `[fs_walk]` reuses the
553    /// `--allow-fs-read` allowlist — listing a directory is an
554    /// information disclosure on the same path tree as reading file
555    /// content, so the same scope applies. Empty allowlist = any path.
556    fn ensure_fs_walk_path(&self, path: &str) -> Result<(), String> {
557        if self.policy.allow_fs_read.is_empty() {
558            return Ok(());
559        }
560        let p = std::path::Path::new(path);
561        if self.policy.allow_fs_read.iter().any(|a| p.starts_with(a)) {
562            Ok(())
563        } else {
564            Err(format!("fs path `{path}` outside --allow-fs-read"))
565        }
566    }
567
568    /// Path scope for mutating operations. `[fs_write]` uses the
569    /// existing `--allow-fs-write` allowlist.
570    fn ensure_fs_write_path(&self, path: &str) -> Result<(), String> {
571        if self.policy.allow_fs_write.is_empty() {
572            return Ok(());
573        }
574        let p = std::path::Path::new(path);
575        if self.policy.allow_fs_write.iter().any(|a| p.starts_with(a)) {
576            Ok(())
577        } else {
578            Err(format!("fs path `{path}` outside --allow-fs-write"))
579        }
580    }
581
582    /// Enforce `--allow-net-host` against an outgoing URL. Empty
583    /// allowlist = any host. Non-empty = the URL's host must match
584    /// (substring; port-agnostic) at least one entry.
585    fn ensure_host_allowed(&self, url: &str) -> Result<(), String> {
586        if self.policy.allow_net_host.is_empty() { return Ok(()); }
587        let host = extract_host(url).unwrap_or("");
588        if self.policy.allow_net_host.iter().any(|h| host == h) {
589            Ok(())
590        } else {
591            Err(format!(
592                "net call to host `{host}` not in --allow-net-host {:?}",
593                self.policy.allow_net_host,
594            ))
595        }
596    }
597}
598
/// Extract the host of an `http://` or `https://` URL for `--allow-net-host`
/// checks.
///
/// Returns `None` for any other scheme; the scheme is matched
/// case-insensitively. Steps:
/// - The authority ends at the first `/`, `?`, `#` or `\`. WHATWG URL
///   parsers treat `\` like `/` for http(s).
/// - Any `userinfo@` prefix is stripped, so `http://allowed.com:80@evil.com/`
///   yields `evil.com`, the host a client actually connects to.
/// - A trailing `:port` is dropped.
/// - Bracketed IPv6 literals keep their brackets: `http://[::1]:8080/`
///   yields `[::1]`.
fn extract_host(url: &str) -> Option<&str> {
    let rest = ["http://", "https://"].iter().find_map(|scheme| {
        url.get(..scheme.len())
            .filter(|head| head.eq_ignore_ascii_case(scheme))
            .map(|_| &url[scheme.len()..])
    })?;

    // Authority = everything before the first path/query/fragment delimiter.
    let authority_end = rest
        .find(|c: char| matches!(c, '/' | '?' | '#' | '\\'))
        .unwrap_or(rest.len());
    let authority = &rest[..authority_end];

    // The host follows the last `@`; anything before it is userinfo.
    let host_port = authority.rsplit_once('@').map_or(authority, |(_, h)| h);

    // IPv6 literal: its colons are not a port separator.
    if host_port.starts_with('[') {
        return Some(match host_port.find(']') {
            Some(i) => &host_port[..=i],
            None => host_port,
        });
    }

    Some(host_port.rsplit_once(':').map_or(host_port, |(h, _)| h))
}
610
611impl EffectHandler for DefaultHandler {
612    /// Per-call budget enforcement (#225). VM calls this before
613    /// invoking any function whose signature declares `[budget(N)]`.
614    /// The cost N is deducted atomically from the shared pool;
615    /// returning `Err` aborts the call before any frame is pushed.
616    fn note_call_budget(&mut self, cost: u64) -> Result<(), String> {
617        // Skip the work entirely when no ceiling is configured —
618        // the pool is `u64::MAX` and would never trip.
619        let Some(ceiling) = self.budget_ceiling else { return Ok(()); };
620        // Compare-and-swap: speculatively subtract; if we'd
621        // underflow, return BudgetExceeded without mutating.
622        // Use SeqCst because parallel branches may race here and
623        // the relative ordering of "used so far" vs. "this call's
624        // cost" needs to be deterministic across threads.
625        loop {
626            let cur = self.budget_remaining.load(Ordering::SeqCst);
627            if cost > cur {
628                let used = ceiling.saturating_sub(cur);
629                return Err(format!(
630                    "budget exceeded: requested {cost}, used so far {used}, ceiling {ceiling}"));
631            }
632            let next = cur - cost;
633            // Conservative accounting: if the CAS races and loses,
634            // re-read and try again. No refund-on-failure path.
635            if self.budget_remaining.compare_exchange(cur, next,
636                Ordering::SeqCst, Ordering::SeqCst).is_ok() {
637                return Ok(());
638            }
639        }
640    }
641
642    fn dispatch(&mut self, kind: &str, op: &str, args: Vec<Value>) -> Result<Value, String> {
643        // Pure stdlib builtins (str, list, json, ...) bypass the policy
644        // gate — they have no observable side effects and aren't tracked
645        // by the type system as effects.
646        if let Some(r) = try_pure_builtin(kind, op, &args) {
647            return r;
648        }
        // `std.process` ops gate on the `proc` effect kind: the module is
        // named `process`, but the declared effect kind is `proc`.
        if kind == "process" {
            self.ensure_kind_allowed("proc")?;
            return self.dispatch_process(op, args);
        }
        if kind == "log" {
            // Emit ops are [log]; config ops are [io] (set_sink also
            // [fs_write]). The dispatch picks the right kind per op.
            let effect_kind = match op {
                "debug" | "info" | "warn" | "error" => "log",
                "set_level" | "set_format" => "io",
                "set_sink" => {
                    // Needs two grants, so both are checked here and the
                    // op is dispatched directly.
                    self.ensure_kind_allowed("io")?;
                    self.ensure_kind_allowed("fs_write")?;
                    return self.dispatch_log(op, args);
                }
                other => return Err(format!("unsupported log.{other}")),
            };
            self.ensure_kind_allowed(effect_kind)?;
            return self.dispatch_log(op, args);
        }
        // `std.fs` ops use the fine-grained `[fs_walk]` and `[fs_write]`
        // effect kinds (distinct from the module name `fs`); the
        // policy check uses the per-op kind, not the module's.
        if kind == "fs" {
            let effect_kind = match op {
                // Inspection / traversal ops: gated on [fs_walk].
                "exists" | "is_file" | "is_dir" | "stat"
                | "list_dir" | "walk" | "glob" => "fs_walk",
                // Mutating ops: gated on [fs_write].
                "mkdir_p" | "remove" => "fs_write",
                "copy" => {
                    // `copy` requires both grants.
                    self.ensure_kind_allowed("fs_walk")?;
                    self.ensure_kind_allowed("fs_write")?;
                    return self.dispatch_fs(op, args);
                }
                other => return Err(format!("unsupported fs.{other}")),
            };
            self.ensure_kind_allowed(effect_kind)?;
            return self.dispatch_fs(op, args);
        }
        // datetime.now is the only effectful op in std.datetime;
        // declared kind is `time`, matching the existing `time.now`.
        // Returns nanoseconds since the Unix epoch as an Int.
        if kind == "datetime" && op == "now" {
            self.ensure_kind_allowed("time")?;
            // LEX_TEST_NOW (Unix seconds) pins the clock for deterministic tests (#350).
            // An unparsable value is ignored and the real clock is used.
            if let Ok(s) = std::env::var("LEX_TEST_NOW") {
                if let Ok(secs) = s.trim().parse::<i64>() {
                    return Ok(Value::Int(secs.saturating_mul(1_000_000_000)));
                }
            }
            let now = chrono::Utc::now();
            // `timestamp_nanos_opt` is None when the instant does not fit
            // in i64 nanoseconds; clamp to i64::MAX in that case.
            let nanos = now.timestamp_nanos_opt().unwrap_or(i64::MAX);
            return Ok(Value::Int(nanos));
        }
        // `crypto.random` is the lone effectful op in `std.crypto`. Its
        // declared effect kind is `random` (fine-grained on purpose so
        // `lex audit --effect random` flags every token-generating
        // call), distinct from the `crypto` module name. Returns `n`
        // bytes (0..=1 MiB) drawn from the OS RNG.
        if kind == "crypto" && op == "random" {
            self.ensure_kind_allowed("random")?;
            let n = expect_int(args.first())?;
            if !(0..=1_048_576).contains(&n) {
                return Err("crypto.random: n must be in 0..=1048576".into());
            }
            use rand::{rngs::SysRng, TryRng};
            let mut buf = vec![0u8; n as usize];
            SysRng.try_fill_bytes(&mut buf)
                .map_err(|e| format!("crypto.random: OS RNG: {e}"))?;
            return Ok(Value::Bytes(buf));
        }
        // `std.agent` (#184): runtime effects for agent-style programs.
        // Each op maps to its own declared effect kind (`llm_local`,
        // `llm_cloud`, `a2a`, `mcp`), and that kind — not the module
        // name `agent` — is what the policy gate checks before the op
        // runs.
        if kind == "agent" {
            let effect_kind = match op {
                "local_complete" => "llm_local",
                "cloud_complete" => "llm_cloud",
                "cloud_stream"   => "llm_cloud",
                "send_a2a"       => "a2a",
                "call_mcp"       => "mcp",
                other => return Err(format!("unsupported agent.{other}")),
            };
            self.ensure_kind_allowed(effect_kind)?;
            // `call_mcp` runs through the LRU client cache
            // (#197). `local_complete` / `cloud_complete` hit
            // Ollama / OpenAI via env-var-driven configuration
            // (#196); custom backends override at the
            // EffectHandler layer rather than via a config file.
            // `send_a2a` keeps its stub — that wire format
            // lives in downstream `soft-a2a`. The stub returns a
            // sentinel `Ok` so traces still record the call.
            return match op {
                "call_mcp"       => Ok(self.dispatch_call_mcp(args)),
                "local_complete" => Ok(dispatch_llm_local(args)),
                "cloud_complete" => Ok(dispatch_llm_cloud(args)),
                "cloud_stream"   => Ok(self.dispatch_cloud_stream(args)),
                // Only `send_a2a` reaches this arm; unknown ops were
                // already rejected by the effect-kind match above.
                _ => Ok(ok(Value::Str(format!("<{effect_kind} stub>")))),
            };
        }
        if kind == "stream" {
            // #305 slice 3: consumer-side stream operations. Each
            // op resolves the opaque handle in the parent handler's
            // stream registry and pulls one or all items. The
            // `stream` effect must be granted by policy; default
            // policies for agent runs grant it alongside the
            // producer effect (e.g. `llm_cloud`).
            self.ensure_kind_allowed("stream")?;
            return match op {
                "next"    => Ok(self.dispatch_stream_next(args)),
                "collect" => Ok(self.dispatch_stream_collect(args)),
                other => Err(format!("unsupported stream.{other}")),
            };
        }
        // `std.http` wire ops (send/get/post) gate on the `net`
        // effect kind, not the module name. This matches the
        // declared signature (`http.get :: Str -> [net] ...`) and
        // keeps `--allow-effects net` doing the obvious thing for
        // both `net.*` and `http.*` callers. Any other `http.*` op
        // falls through to the generic module-kind gate below.
        if kind == "http" && matches!(op, "send" | "get" | "post") {
            self.ensure_kind_allowed("net")?;
            return match op {
                "send" => {
                    // NOTE(review): unlike get/post, no ensure_host_allowed
                    // call is visible here; presumably `http_send_record`
                    // performs the host check via `self` — confirm.
                    let req = expect_record(args.first())?;
                    Ok(http_send_record(self, req))
                }
                "get" => {
                    let url = expect_str(args.first())?.to_string();
                    self.ensure_host_allowed(&url)?;
                    Ok(http_send_simple("GET", &url, None, "", None))
                }
                "post" => {
                    let url = expect_str(args.first())?.to_string();
                    let body = expect_bytes(args.get(1))?.clone();
                    let content_type = expect_str(args.get(2))?.to_string();
                    self.ensure_host_allowed(&url)?;
                    Ok(http_send_simple("POST", &url, Some(body), &content_type, None))
                }
                // Excluded by the `matches!` guard on the enclosing `if`.
                _ => unreachable!(),
            };
        }
790        self.ensure_kind_allowed(kind)?;
791        match (kind, op) {
792            ("io", "print") => {
793                let line = expect_str(args.first())?;
794                self.sink.print_line(line);
795                Ok(Value::Unit)
796            }
797            ("io", "read") => {
798                let path = expect_str(args.first())?.to_string();
799                let resolved = self.resolve_read_path(&path);
800                // Honor read-allowlist if any. Symmetric with io.write.
801                // The path argument is checked as-given (resolved-against-
802                // read_root for tests); a tool granted [io] cannot escape
803                // the configured prefix even though the effect itself is
804                // permitted. This is the per-path scope the bench's case
805                // #6 ("[io] granted, body reads /etc/passwd") needed.
806                if !self.policy.allow_fs_read.is_empty()
807                    && !self.policy.allow_fs_read.iter().any(|a| resolved.starts_with(a))
808                {
809                    return Err(format!("read of `{path}` outside --allow-fs-read"));
810                }
811                match std::fs::read_to_string(&resolved) {
812                    Ok(s) => Ok(ok(Value::Str(s))),
813                    Err(e) => Ok(err(Value::Str(format!("{e}")))),
814                }
815            }
816            ("io", "write") => {
817                let path = expect_str(args.first())?.to_string();
818                let contents = expect_str(args.get(1))?.to_string();
819                // Honor write-allowlist if any.
820                if !self.policy.allow_fs_write.is_empty() {
821                    let p = std::path::Path::new(&path);
822                    if !self.policy.allow_fs_write.iter().any(|a| p.starts_with(a)) {
823                        return Err(format!("write to `{path}` outside --allow-fs-write"));
824                    }
825                }
826                match std::fs::write(&path, contents) {
827                    Ok(_) => Ok(ok(Value::Unit)),
828                    Err(e) => Ok(err(Value::Str(format!("{e}")))),
829                }
830            }
831            ("time", "now") => {
832                let secs = SystemTime::now().duration_since(UNIX_EPOCH)
833                    .map_err(|e| format!("time: {e}"))?.as_secs();
834                Ok(Value::Int(secs as i64))
835            }
836            ("time", "sleep_ms") => {
837                // Block the current thread for `n` ms (#226). Used
838                // by `flow.retry_with_backoff`'s exponential delay.
839                // Negative or zero is a no-op. Bounded at 60s in the
840                // runtime to avoid pathological agent-emitted loops
841                // wedging the host — anything legitimate beyond
842                // that should use process-level scheduling, not a
843                // blocking sleep.
844                let n = expect_int(args.first())?;
845                if n > 0 {
846                    let ms = (n as u64).min(60_000);
847                    std::thread::sleep(std::time::Duration::from_millis(ms));
848                }
849                Ok(Value::Unit)
850            }
851            ("rand", "int_in") => {
852                // Deterministic stub: midpoint of [lo, hi].
853                let lo = expect_int(args.first())?;
854                let hi = expect_int(args.get(1))?;
855                Ok(Value::Int((lo + hi) / 2))
856            }
857            // `env.get` returns `Option[Str]` — `None` for unset vars.
858            // Per-var scoping (`[env(NAME)]`) arrives with #207's
859            // per-capability effect parameterization; today the flat
860            // `[env]` grants access to the entire process environment.
861            ("env", "get") => {
862                let name = expect_str(args.first())?;
863                Ok(match std::env::var(name) {
864                    Ok(v) => Value::Variant {
865                        name: "Some".into(),
866                        args: vec![Value::Str(v)],
867                    },
868                    Err(_) => Value::Variant { name: "None".into(), args: Vec::new() },
869                })
870            }
            ("budget", _) => {
                // No-op at dispatch time. Declared budgets are checked
                // statically in `policy::check_program`; per-call deductions
                // against the handler's shared run budget pool (#225) are
                // made by the handler's budget-accounting path, not here.
                Ok(Value::Unit)
            }
876            ("net", "get") => {
877                let url = expect_str(args.first())?.to_string();
878                self.ensure_host_allowed(&url)?;
879                Ok(http_request("GET", &url, None))
880            }
881            ("net", "post") => {
882                let url = expect_str(args.first())?.to_string();
883                let body = expect_str(args.get(1))?.to_string();
884                self.ensure_host_allowed(&url)?;
885                Ok(http_request("POST", &url, Some(&body)))
886            }
887            ("net", "serve") => {
888                let port = match args.first() {
889                    Some(Value::Int(n)) if (0..=65535).contains(n) => *n as u16,
890                    _ => return Err("net.serve(port, handler): port must be Int 0..=65535".into()),
891                };
892                let handler_name = expect_str(args.get(1))?.to_string();
893                let program = self.program.clone()
894                    .ok_or_else(|| "net.serve requires a Program reference; use DefaultHandler::with_program".to_string())?;
895                let policy = self.policy.clone();
896                serve_http(port, handler_name, program, policy, None)
897            }
898            ("net", "serve_fn") => {
899                let port = match args.first() {
900                    Some(Value::Int(n)) if (0..=65535).contains(n) => *n as u16,
901                    _ => return Err("net.serve_fn(port, handler): port must be Int 0..=65535".into()),
902                };
903                let closure = match args.into_iter().nth(1) {
904                    Some(c @ Value::Closure { .. }) => c,
905                    _ => return Err("net.serve_fn(port, handler): handler must be a closure".into()),
906                };
907                let program = self.program.clone()
908                    .ok_or_else(|| "net.serve_fn requires a Program reference; use DefaultHandler::with_program".to_string())?;
909                let policy = self.policy.clone();
910                serve_http_fn(port, closure, program, policy)
911            }
912            ("net", "serve_tls") => {
913                let port = match args.first() {
914                    Some(Value::Int(n)) if (0..=65535).contains(n) => *n as u16,
915                    _ => return Err("net.serve_tls(port, cert, key, handler): port must be Int 0..=65535".into()),
916                };
917                let cert_path = expect_str(args.get(1))?.to_string();
918                let key_path = expect_str(args.get(2))?.to_string();
919                let handler_name = expect_str(args.get(3))?.to_string();
920                let program = self.program.clone()
921                    .ok_or_else(|| "net.serve_tls requires a Program reference".to_string())?;
922                let policy = self.policy.clone();
923                let cert = std::fs::read(&cert_path)
924                    .map_err(|e| format!("net.serve_tls: read cert {cert_path}: {e}"))?;
925                let key = std::fs::read(&key_path)
926                    .map_err(|e| format!("net.serve_tls: read key {key_path}: {e}"))?;
927                serve_http(port, handler_name, program, policy, Some(TlsConfig { cert, key }))
928            }
929            ("net", "serve_ws") => {
930                let port = match args.first() {
931                    Some(Value::Int(n)) if (0..=65535).contains(n) => *n as u16,
932                    _ => return Err("net.serve_ws(port, on_message): port must be Int 0..=65535".into()),
933                };
934                let handler_name = expect_str(args.get(1))?.to_string();
935                let program = self.program.clone()
936                    .ok_or_else(|| "net.serve_ws requires a Program reference".to_string())?;
937                let policy = self.policy.clone();
938                let registry = Arc::new(crate::ws::ChatRegistry::default());
939                crate::ws::serve_ws(port, handler_name, program, policy, registry)
940            }
941            ("net", "serve_ws_fn") => {
942                let port = match args.first() {
943                    Some(Value::Int(n)) if (0..=65535).contains(n) => *n as u16,
944                    _ => return Err("net.serve_ws_fn(port, subprotocol, handler): port must be Int 0..=65535".into()),
945                };
946                let subprotocol = expect_str(args.get(1))?.to_string();
947                let closure = match args.into_iter().nth(2) {
948                    Some(c @ Value::Closure { .. }) => c,
949                    _ => return Err("net.serve_ws_fn(port, subprotocol, handler): handler must be a closure".into()),
950                };
951                let program = self.program.clone()
952                    .ok_or_else(|| "net.serve_ws_fn requires a Program reference; use DefaultHandler::with_program".to_string())?;
953                let policy = self.policy.clone();
954                let registry = Arc::new(crate::ws::ChatRegistry::default());
955                crate::ws::serve_ws_fn(port, subprotocol, closure, program, policy, registry)
956            }
957            ("chat", "broadcast") => {
958                let registry = self.chat_registry.as_ref()
959                    .ok_or_else(|| "chat.broadcast called outside a net.serve_ws handler".to_string())?;
960                let room = expect_str(args.first())?;
961                let body = expect_str(args.get(1))?;
962                crate::ws::chat_broadcast(registry, room, body);
963                Ok(Value::Unit)
964            }
965            ("chat", "send") => {
966                let registry = self.chat_registry.as_ref()
967                    .ok_or_else(|| "chat.send called outside a net.serve_ws handler".to_string())?;
968                let conn_id = match args.first() {
969                    Some(Value::Int(n)) if *n >= 0 => *n as u64,
970                    _ => return Err("chat.send: conn_id must be non-negative Int".into()),
971                };
972                let body = expect_str(args.get(1))?;
973                Ok(Value::Bool(crate::ws::chat_send(registry, conn_id, body)))
974            }
975            ("kv", "open") => {
976                let path = expect_str(args.first())?.to_string();
977                // Honor write-allowlist: opening a Kv writes its
978                // backing files at `path`, so the same scoping that
979                // applies to `io.write` applies here.
980                if !self.policy.allow_fs_write.is_empty() {
981                    let p = std::path::Path::new(&path);
982                    if !self.policy.allow_fs_write.iter().any(|a| p.starts_with(a)) {
983                        return Ok(err(Value::Str(format!(
984                            "kv.open: `{path}` outside --allow-fs-write"))));
985                    }
986                }
987                match sled::open(&path) {
988                    Ok(db) => {
989                        let handle = next_kv_handle();
990                        kv_registry().lock().unwrap().insert(handle, db);
991                        Ok(ok(Value::Int(handle as i64)))
992                    }
993                    Err(e) => Ok(err(Value::Str(format!("kv.open: {e}")))),
994                }
995            }
996            ("kv", "close") => {
997                let h = expect_kv_handle(args.first())?;
998                kv_registry().lock().unwrap().remove(h);
999                Ok(Value::Unit)
1000            }
1001            ("kv", "get") => {
1002                let h = expect_kv_handle(args.first())?;
1003                let key = expect_str(args.get(1))?;
1004                let mut reg = kv_registry().lock().unwrap();
1005                let db = reg.touch_get(h).ok_or_else(|| "kv.get: closed or unknown Kv handle".to_string())?;
1006                match db.get(key.as_bytes()) {
1007                    Ok(Some(ivec)) => Ok(some(Value::Bytes(ivec.to_vec()))),
1008                    Ok(None) => Ok(none()),
1009                    Err(e) => Err(format!("kv.get: {e}")),
1010                }
1011            }
1012            ("kv", "put") => {
1013                let h = expect_kv_handle(args.first())?;
1014                let key = expect_str(args.get(1))?.to_string();
1015                let val = expect_bytes(args.get(2))?.clone();
1016                let mut reg = kv_registry().lock().unwrap();
1017                let db = reg.touch_get(h).ok_or_else(|| "kv.put: closed or unknown Kv handle".to_string())?;
1018                match db.insert(key.as_bytes(), val) {
1019                    Ok(_) => Ok(ok(Value::Unit)),
1020                    Err(e) => Ok(err(Value::Str(format!("kv.put: {e}")))),
1021                }
1022            }
1023            ("kv", "delete") => {
1024                let h = expect_kv_handle(args.first())?;
1025                let key = expect_str(args.get(1))?;
1026                let mut reg = kv_registry().lock().unwrap();
1027                let db = reg.touch_get(h).ok_or_else(|| "kv.delete: closed or unknown Kv handle".to_string())?;
1028                match db.remove(key.as_bytes()) {
1029                    Ok(_) => Ok(ok(Value::Unit)),
1030                    Err(e) => Ok(err(Value::Str(format!("kv.delete: {e}")))),
1031                }
1032            }
1033            ("kv", "contains") => {
1034                let h = expect_kv_handle(args.first())?;
1035                let key = expect_str(args.get(1))?;
1036                let mut reg = kv_registry().lock().unwrap();
1037                let db = reg.touch_get(h).ok_or_else(|| "kv.contains: closed or unknown Kv handle".to_string())?;
1038                match db.contains_key(key.as_bytes()) {
1039                    Ok(present) => Ok(Value::Bool(present)),
1040                    Err(e) => Err(format!("kv.contains: {e}")),
1041                }
1042            }
1043            ("kv", "list_prefix") => {
1044                let h = expect_kv_handle(args.first())?;
1045                let prefix = expect_str(args.get(1))?;
1046                let mut reg = kv_registry().lock().unwrap();
1047                let db = reg.touch_get(h).ok_or_else(|| "kv.list_prefix: closed or unknown Kv handle".to_string())?;
1048                let mut keys: Vec<Value> = Vec::new();
1049                for kv in db.scan_prefix(prefix.as_bytes()) {
1050                    let (k, _) = kv.map_err(|e| format!("kv.list_prefix: {e}"))?;
1051                    let s = String::from_utf8_lossy(&k).to_string();
1052                    keys.push(Value::Str(s));
1053                }
1054                Ok(Value::List(keys))
1055            }
1056            ("sql", "open") => {
1057                let path = expect_str(args.first())?.to_string();
1058                if path.starts_with("postgres://") || path.starts_with("postgresql://") {
1059                    // Postgres: connect via sync driver; no fs-write policy applies.
1060                    match postgres::Client::connect(&path, postgres::NoTls) {
1061                        Ok(client) => {
1062                            let handle = next_sql_handle();
1063                            sql_registry().lock().unwrap().insert(handle, SqlConn::Postgres(client));
1064                            Ok(ok(Value::Int(handle as i64)))
1065                        }
1066                        Err(e) => Ok(err(Value::Str(format!("sql.open: {e}")))),
1067                    }
1068                } else {
1069                    // SQLite: same shape as `kv.open`; fs-write allowlist applies
1070                    // (in-memory paths are exempt).
1071                    if path != ":memory:" && !self.policy.allow_fs_write.is_empty() {
1072                        let p = std::path::Path::new(&path);
1073                        if !self.policy.allow_fs_write.iter().any(|a| p.starts_with(a)) {
1074                            return Ok(err(Value::Str(format!(
1075                                "sql.open: `{path}` outside --allow-fs-write"))));
1076                        }
1077                    }
1078                    match rusqlite::Connection::open(&path) {
1079                        Ok(conn) => {
1080                            let handle = next_sql_handle();
1081                            sql_registry().lock().unwrap().insert(handle, SqlConn::Sqlite(conn));
1082                            Ok(ok(Value::Int(handle as i64)))
1083                        }
1084                        Err(e) => Ok(err(Value::Str(format!("sql.open: {e}")))),
1085                    }
1086                }
1087            }
1088            ("sql", "close") => {
1089                let h = expect_sql_handle(args.first())?;
1090                sql_registry().lock().unwrap().remove(h);
1091                Ok(Value::Unit)
1092            }
1093            ("sql", "exec") => {
1094                let h = expect_sql_handle(args.first())?;
1095                let stmt = expect_str(args.get(1))?.to_string();
1096                let params = expect_sql_params(args.get(2))?;
1097                let arc = sql_registry().lock().unwrap()
1098                    .touch_get(h)
1099                    .ok_or_else(|| "sql.exec: closed or unknown Db handle".to_string())?;
1100                let mut conn = arc.lock().unwrap();
1101                match &mut *conn {
1102                    SqlConn::Sqlite(c) => {
1103                        let bound = sqlite_params(&params);
1104                        let bind: Vec<&dyn rusqlite::ToSql> =
1105                            bound.iter().map(|p| p as &dyn rusqlite::ToSql).collect();
1106                        match c.execute(&stmt, rusqlite::params_from_iter(bind.iter())) {
1107                            Ok(n)  => Ok(ok(Value::Int(n as i64))),
1108                            Err(e) => Ok(err(Value::Str(format!("sql.exec: {e}")))),
1109                        }
1110                    }
1111                    SqlConn::Postgres(c) => {
1112                        let pg = pg_param_refs(&params);
1113                        let refs: Vec<&(dyn postgres::types::ToSql + Sync)> =
1114                            pg.iter().map(|b| b.as_ref()).collect();
1115                        match c.execute(stmt.as_str(), &refs) {
1116                            Ok(n)  => Ok(ok(Value::Int(n as i64))),
1117                            Err(e) => Ok(err(Value::Str(format!("sql.exec: {e}")))),
1118                        }
1119                    }
1120                }
1121            }
1122            ("sql", "query") => {
1123                let h = expect_sql_handle(args.first())?;
1124                let stmt_str = expect_str(args.get(1))?.to_string();
1125                let params = expect_sql_params(args.get(2))?;
1126                let arc = sql_registry().lock().unwrap()
1127                    .touch_get(h)
1128                    .ok_or_else(|| "sql.query: closed or unknown Db handle".to_string())?;
1129                let mut conn = arc.lock().unwrap();
1130                Ok(match &mut *conn {
1131                    SqlConn::Sqlite(c)   => sql_run_query_sqlite(c, &stmt_str, &params),
1132                    SqlConn::Postgres(c) => sql_run_query_pg(c, &stmt_str, &params),
1133                })
1134            }
1135            // Transactions: begin issues BEGIN SQL on the connection;
1136            // commit/rollback issue COMMIT/ROLLBACK. SqlTx reuses the
1137            // same Int handle as Db — the type system enforces correct
1138            // usage; the runtime treats both as the same registry key.
1139            ("sql", "begin") => {
1140                let h = expect_sql_handle(args.first())?;
1141                let arc = sql_registry().lock().unwrap()
1142                    .touch_get(h)
1143                    .ok_or_else(|| "sql.begin: closed or unknown Db handle".to_string())?;
1144                let mut conn = arc.lock().unwrap();
1145                let res = match &mut *conn {
1146                    SqlConn::Sqlite(c)   => c.execute_batch("BEGIN").map_err(|e| e.to_string()),
1147                    SqlConn::Postgres(c) => c.batch_execute("BEGIN").map_err(|e| e.to_string()),
1148                };
1149                match res {
1150                    Ok(()) => Ok(ok(Value::Int(h as i64))),
1151                    Err(e) => Ok(err(Value::Str(format!("sql.begin: {e}")))),
1152                }
1153            }
1154            ("sql", "commit") => {
1155                let h = expect_sql_handle(args.first())?;
1156                let arc = sql_registry().lock().unwrap()
1157                    .touch_get(h)
1158                    .ok_or_else(|| "sql.commit: closed or unknown SqlTx handle".to_string())?;
1159                let mut conn = arc.lock().unwrap();
1160                let res = match &mut *conn {
1161                    SqlConn::Sqlite(c)   => c.execute_batch("COMMIT").map_err(|e| e.to_string()),
1162                    SqlConn::Postgres(c) => c.batch_execute("COMMIT").map_err(|e| e.to_string()),
1163                };
1164                match res {
1165                    Ok(()) => Ok(ok(Value::Unit)),
1166                    Err(e) => Ok(err(Value::Str(format!("sql.commit: {e}")))),
1167                }
1168            }
1169            ("sql", "rollback") => {
1170                let h = expect_sql_handle(args.first())?;
1171                let arc = sql_registry().lock().unwrap()
1172                    .touch_get(h)
1173                    .ok_or_else(|| "sql.rollback: closed or unknown SqlTx handle".to_string())?;
1174                let mut conn = arc.lock().unwrap();
1175                let res = match &mut *conn {
1176                    SqlConn::Sqlite(c)   => c.execute_batch("ROLLBACK").map_err(|e| e.to_string()),
1177                    SqlConn::Postgres(c) => c.batch_execute("ROLLBACK").map_err(|e| e.to_string()),
1178                };
1179                match res {
1180                    Ok(()) => Ok(ok(Value::Unit)),
1181                    Err(e) => Ok(err(Value::Str(format!("sql.rollback: {e}")))),
1182                }
1183            }
1184            ("sql", "exec_tx") => {
1185                let h = expect_sql_handle(args.first())?;
1186                let stmt = expect_str(args.get(1))?.to_string();
1187                let params = expect_sql_params(args.get(2))?;
1188                let arc = sql_registry().lock().unwrap()
1189                    .touch_get(h)
1190                    .ok_or_else(|| "sql.exec_tx: closed or unknown SqlTx handle".to_string())?;
1191                let mut conn = arc.lock().unwrap();
1192                match &mut *conn {
1193                    SqlConn::Sqlite(c) => {
1194                        let bound = sqlite_params(&params);
1195                        let bind: Vec<&dyn rusqlite::ToSql> =
1196                            bound.iter().map(|p| p as &dyn rusqlite::ToSql).collect();
1197                        match c.execute(&stmt, rusqlite::params_from_iter(bind.iter())) {
1198                            Ok(n)  => Ok(ok(Value::Int(n as i64))),
1199                            Err(e) => Ok(err(Value::Str(format!("sql.exec_tx: {e}")))),
1200                        }
1201                    }
1202                    SqlConn::Postgres(c) => {
1203                        let pg = pg_param_refs(&params);
1204                        let refs: Vec<&(dyn postgres::types::ToSql + Sync)> =
1205                            pg.iter().map(|b| b.as_ref()).collect();
1206                        match c.execute(stmt.as_str(), &refs) {
1207                            Ok(n)  => Ok(ok(Value::Int(n as i64))),
1208                            Err(e) => Ok(err(Value::Str(format!("sql.exec_tx: {e}")))),
1209                        }
1210                    }
1211                }
1212            }
1213            ("sql", "query_tx") => {
1214                let h = expect_sql_handle(args.first())?;
1215                let stmt_str = expect_str(args.get(1))?.to_string();
1216                let params = expect_sql_params(args.get(2))?;
1217                let arc = sql_registry().lock().unwrap()
1218                    .touch_get(h)
1219                    .ok_or_else(|| "sql.query_tx: closed or unknown SqlTx handle".to_string())?;
1220                let mut conn = arc.lock().unwrap();
1221                Ok(match &mut *conn {
1222                    SqlConn::Sqlite(c)   => sql_run_query_sqlite(c, &stmt_str, &params),
1223                    SqlConn::Postgres(c) => sql_run_query_pg(c, &stmt_str, &params),
1224                })
1225            }
1226            ("sql", "get_str") => Ok(sql_get_col(&args, |v| match v {
1227                Value::Str(s) => Some(Value::Str(s.clone())),
1228                Value::Int(n) => Some(Value::Str(n.to_string())),
1229                _ => None,
1230            })?),
1231            ("sql", "get_int") => Ok(sql_get_col(&args, |v| match v {
1232                Value::Int(n) => Some(Value::Int(*n)),
1233                Value::Float(f) => Some(Value::Int(*f as i64)),
1234                _ => None,
1235            })?),
1236            ("sql", "get_float") => Ok(sql_get_col(&args, |v| match v {
1237                Value::Float(f) => Some(Value::Float(*f)),
1238                Value::Int(n)   => Some(Value::Float(*n as f64)),
1239                _ => None,
1240            })?),
1241            ("sql", "get_bool") => Ok(sql_get_col(&args, |v| match v {
1242                Value::Bool(b)  => Some(Value::Bool(*b)),
1243                Value::Int(n)   => Some(Value::Bool(*n != 0)),
1244                _ => None,
1245            })?),
1246            ("proc", "spawn") => {
1247                // The escape hatch effect. Spawns a child process,
1248                // collects its stdout/stderr, returns a structured
1249                // record. Allow-list is the binary basename: anything
1250                // outside `--allow-proc` is rejected pre-spawn.
1251                //
1252                // What this does NOT validate (per SECURITY.md):
1253                // - per-arg content (a script-like CLI invoked via
1254                //   --eval=... can run anything)
1255                // - environment variables (inherited from the parent)
1256                // - working directory (the parent's)
1257                //
1258                // For untrusted input, layer with OS-level
1259                // sandboxing — gVisor / nsjail / a container.
1260                let cmd = expect_str(args.first())?.to_string();
1261                let raw_args = match args.get(1) {
1262                    Some(Value::List(items)) => items,
1263                    Some(other) => return Err(format!(
1264                        "proc.spawn: args must be List[Str], got {other:?}")),
1265                    None => return Err("proc.spawn: missing args list".into()),
1266                };
1267                let str_args: Vec<String> = raw_args.iter().map(|v| match v {
1268                    Value::Str(s) => Ok(s.clone()),
1269                    other => Err(format!("proc.spawn: arg must be Str, got {other:?}")),
1270                }).collect::<Result<Vec<_>, _>>()?;
1271
1272                // Allow-list check: empty list = any binary (escape
1273                // hatch); non-empty = basename of cmd must match an
1274                // entry exactly.
1275                if !self.policy.allow_proc.is_empty() {
1276                    let basename = std::path::Path::new(&cmd)
1277                        .file_name()
1278                        .and_then(|s| s.to_str())
1279                        .unwrap_or(&cmd);
1280                    if !self.policy.allow_proc.iter().any(|a| a == basename) {
1281                        return Ok(err(Value::Str(format!(
1282                            "proc.spawn: `{cmd}` not in --allow-proc {:?}",
1283                            self.policy.allow_proc
1284                        ))));
1285                    }
1286                }
1287
1288                // Hard caps: the spec doesn't pin numbers, but
1289                // unbounded argv is a DoS vector.
1290                if str_args.len() > 1024 {
1291                    return Ok(err(Value::Str(
1292                        "proc.spawn: arg-count exceeds 1024".into())));
1293                }
1294                if str_args.iter().any(|a| a.len() > 65_536) {
1295                    return Ok(err(Value::Str(
1296                        "proc.spawn: per-arg length exceeds 64 KiB".into())));
1297                }
1298
1299                let output = std::process::Command::new(&cmd)
1300                    .args(&str_args)
1301                    .output();
1302                match output {
1303                    Ok(o) => {
1304                        let mut rec = indexmap::IndexMap::new();
1305                        rec.insert("stdout".into(), Value::Str(
1306                            String::from_utf8_lossy(&o.stdout).to_string()));
1307                        rec.insert("stderr".into(), Value::Str(
1308                            String::from_utf8_lossy(&o.stderr).to_string()));
1309                        rec.insert("exit_code".into(), Value::Int(
1310                            o.status.code().unwrap_or(-1) as i64));
1311                        Ok(ok(Value::Record(rec)))
1312                    }
1313                    Err(e) => Ok(err(Value::Str(format!("spawn `{cmd}`: {e}")))),
1314                }
1315            }
1316            other => Err(format!("unsupported effect {}.{}", other.0, other.1)),
1317        }
1318    }
1319
1320    /// `list.par_map` worker-handler factory (#305 slice 2).
1321    ///
1322    /// Builds a fresh `DefaultHandler` per worker that shares the
1323    /// budget pool with the parent (`Arc<AtomicU64>`) so a parallel
1324    /// batch can't escape the run-wide budget ceiling. Other state
1325    /// is intentionally split per-worker:
1326    ///
1327    /// - `sink`: a `StdoutSink` per worker. Tests that capture
1328    ///   output via a `SharedSink` wrapped in `Arc<Mutex<…>>` see
1329    ///   each worker as a fresh handler. Print interleaving on
1330    ///   stdout is acceptable; tests that need ordered capture run
1331    ///   workloads serially anyway.
1332    /// - `mcp_clients`: a fresh per-worker LRU cache. The parent's
1333    ///   subprocess handles can't be shared across threads without
1334    ///   mutex-serialising every MCP call, which would defeat the
1335    ///   parallelism. Cache hit rate is sub-optimal across the
1336    ///   first call per worker; warmed caches still amortise within
1337    ///   a worker.
1338    /// - `chat_registry`: cloned `Arc<ChatRegistry>` so all workers
1339    ///   route into the same chat dispatch layer.
1340    /// - `program`: cloned `Arc<Program>` so `net.serve` (if a
1341    ///   worker invokes it) sees the same compiled program.
1342    fn spawn_for_worker(&self) -> Option<Box<dyn lex_bytecode::vm::EffectHandler + Send>> {
1343        let mut fresh = DefaultHandler::new(self.policy.clone());
1344        // Share the budget pool atomically — slice 2's correctness
1345        // contract: parallel work counts against the same ceiling.
1346        fresh.budget_remaining = std::sync::Arc::clone(&self.budget_remaining);
1347        fresh.budget_ceiling = self.budget_ceiling;
1348        fresh.read_root = self.read_root.clone();
1349        fresh.program = self.program.clone();
1350        fresh.chat_registry = self.chat_registry.clone();
1351        // #305 slice 3: share the stream registry across workers so
1352        // a stream produced on one thread (or the parent) is
1353        // consumable on any other. The registry is already
1354        // `Arc<Mutex<…>>` so concurrent access is safe.
1355        fresh.streams = std::sync::Arc::clone(&self.streams);
1356        fresh.next_stream_id = std::sync::Arc::clone(&self.next_stream_id);
1357        Some(Box::new(fresh))
1358    }
1359}
1360
1361/// Blocks the calling thread, accepts incoming HTTP requests on
1362/// `127.0.0.1:port`, and dispatches each through the named Lex
1363/// stage. Each request gets a fresh `Vm`; the program and policy
1364/// are shared.
1365///
1366/// Handler signature in Lex (by convention):
1367///   fn <name>(req :: Record { method :: Str, path :: Str, body :: Str })
1368///        -> Record { status :: Int, body :: Str }
1369/// PEM-encoded certificate + private key, both as raw bytes.
1370pub struct TlsConfig {
1371    pub cert: Vec<u8>,
1372    pub key: Vec<u8>,
1373}
1374
1375fn serve_http(
1376    port: u16,
1377    handler_name: String,
1378    program: Arc<Program>,
1379    policy: Policy,
1380    tls: Option<TlsConfig>,
1381) -> Result<Value, String> {
1382    let (server, scheme) = match tls {
1383        None => (
1384            tiny_http::Server::http(("127.0.0.1", port))
1385                .map_err(|e| format!("net.serve bind {port}: {e}"))?,
1386            "http",
1387        ),
1388        Some(cfg) => {
1389            let ssl = tiny_http::SslConfig {
1390                certificate: cfg.cert,
1391                private_key: cfg.key,
1392            };
1393            (
1394                tiny_http::Server::https(("127.0.0.1", port), ssl)
1395                    .map_err(|e| format!("net.serve_tls bind {port}: {e}"))?,
1396                "https",
1397            )
1398        }
1399    };
1400    eprintln!("net.serve: listening on {scheme}://127.0.0.1:{port}");
1401    // Thread-per-request: the main loop accepts; each request runs on
1402    // its own worker thread with its own fresh Vm. The Program is
1403    // shared via Arc; Policy and handler_name are cloned per request.
1404    // Lex's immutability means there's no shared mutable state at the
1405    // language level — workers don't race.
1406    for req in server.incoming_requests() {
1407        let program = Arc::clone(&program);
1408        let policy = policy.clone();
1409        let handler_name = handler_name.clone();
1410        std::thread::spawn(move || handle_request(req, program, policy, handler_name));
1411    }
1412    Ok(Value::Unit)
1413}
1414
1415fn handle_request(
1416    mut req: tiny_http::Request,
1417    program: Arc<Program>,
1418    policy: Policy,
1419    handler_name: String,
1420) {
1421    let lex_req = build_request_value(&mut req);
1422    let handler = DefaultHandler::new(policy).with_program(Arc::clone(&program));
1423    let mut vm = Vm::with_handler(&program, Box::new(handler));
1424    match vm.call(&handler_name, vec![lex_req]) {
1425        Ok(resp) => {
1426            let (status, body, headers) = unpack_response(&resp);
1427            let mut response = tiny_http::Response::from_string(body).with_status_code(status);
1428            for h in headers {
1429                response.add_header(h);
1430            }
1431            let _ = req.respond(response);
1432        }
1433        Err(e) => {
1434            let response = tiny_http::Response::from_string(format!("internal error: {e}"))
1435                .with_status_code(500);
1436            let _ = req.respond(response);
1437        }
1438    }
1439}
1440
1441fn serve_http_fn(
1442    port: u16,
1443    closure: Value,
1444    program: Arc<Program>,
1445    policy: Policy,
1446) -> Result<Value, String> {
1447    let server = tiny_http::Server::http(("127.0.0.1", port))
1448        .map_err(|e| format!("net.serve_fn bind {port}: {e}"))?;
1449    eprintln!("net.serve_fn: listening on http://127.0.0.1:{port}");
1450    for req in server.incoming_requests() {
1451        let program = Arc::clone(&program);
1452        let policy = policy.clone();
1453        let closure = closure.clone();
1454        std::thread::spawn(move || handle_request_fn(req, program, policy, closure));
1455    }
1456    Ok(Value::Unit)
1457}
1458
1459fn handle_request_fn(
1460    mut req: tiny_http::Request,
1461    program: Arc<Program>,
1462    policy: Policy,
1463    closure: Value,
1464) {
1465    let lex_req = build_request_value(&mut req);
1466    let handler = DefaultHandler::new(policy).with_program(Arc::clone(&program));
1467    let mut vm = Vm::with_handler(&program, Box::new(handler));
1468    match vm.invoke_closure_value(closure, vec![lex_req]) {
1469        Ok(resp) => {
1470            let (status, body, headers) = unpack_response(&resp);
1471            let mut response = tiny_http::Response::from_string(body).with_status_code(status);
1472            for h in headers {
1473                response.add_header(h);
1474            }
1475            let _ = req.respond(response);
1476        }
1477        Err(e) => {
1478            let response = tiny_http::Response::from_string(format!("internal error: {e}"))
1479                .with_status_code(500);
1480            let _ = req.respond(response);
1481        }
1482    }
1483}
1484
1485fn build_request_value(req: &mut tiny_http::Request) -> Value {
1486    let method = format!("{:?}", req.method()).to_uppercase();
1487    let url = req.url().to_string();
1488    let (path, query) = match url.split_once('?') {
1489        Some((p, q)) => (p.to_string(), q.to_string()),
1490        None => (url, String::new()),
1491    };
1492    let mut headers_map = std::collections::BTreeMap::new();
1493    for h in req.headers() {
1494        headers_map.insert(
1495            lex_bytecode::MapKey::Str(h.field.as_str().as_str().to_ascii_lowercase()),
1496            Value::Str(h.value.as_str().to_string()),
1497        );
1498    }
1499    let mut body = String::new();
1500    let _ = req.as_reader().read_to_string(&mut body);
1501    let mut rec = indexmap::IndexMap::new();
1502    rec.insert("method".into(), Value::Str(method));
1503    rec.insert("path".into(), Value::Str(path));
1504    rec.insert("query".into(), Value::Str(query));
1505    rec.insert("body".into(), Value::Str(body));
1506    rec.insert("headers".into(), Value::Map(headers_map));
1507    Value::Record(rec)
1508}
1509
1510fn unpack_response(v: &Value) -> (u16, String, Vec<tiny_http::Header>) {
1511    if let Value::Record(rec) = v {
1512        let status = rec.get("status").and_then(|s| match s {
1513            Value::Int(n) => Some(*n as u16),
1514            _ => None,
1515        }).unwrap_or(200);
1516        let body = rec.get("body").and_then(|b| match b {
1517            Value::Str(s) => Some(s.clone()),
1518            _ => None,
1519        }).unwrap_or_default();
1520        let headers = if let Some(Value::Map(hmap)) = rec.get("headers") {
1521            hmap.iter().filter_map(|(k, v)| {
1522                if let (lex_bytecode::MapKey::Str(name), Value::Str(val)) = (k, v) {
1523                    format!("{name}: {val}").parse::<tiny_http::Header>().ok()
1524                } else {
1525                    None
1526                }
1527            }).collect()
1528        } else {
1529            vec![]
1530        };
1531        return (status, body, headers);
1532    }
1533    (500, format!("handler returned non-record: {v:?}"), vec![])
1534}
1535
1536/// HTTP/1.1 client backed by `ureq` + `rustls`. Accepts both
1537/// `http://` and `https://` URLs. Returns `Result[Str, Str]` as a
1538/// Lex `Value::Variant`. The earlier hand-rolled HTTP/1.0 client
1539/// was plain-TCP only — most public APIs are HTTPS, so the demo
1540/// could fetch `example.com` but not `wttr.in` or `api.github.com`.
1541fn http_request(method: &str, url: &str, body: Option<&str>) -> Value {
1542    use std::time::Duration;
1543    // ureq 3 puts 4xx/5xx behind `Error::StatusCode(code)` and consumes
1544    // the response, so the body would be lost. Disabling
1545    // `http_status_as_error` lets us check the status manually and
1546    // surface `Err("status 404: <body>")` like the old code did.
1547    let agent: ureq::Agent = ureq::Agent::config_builder()
1548        .timeout_connect(Some(Duration::from_secs(10)))
1549        .timeout_recv_body(Some(Duration::from_secs(30)))
1550        .timeout_send_body(Some(Duration::from_secs(10)))
1551        .http_status_as_error(false)
1552        .build()
1553        .into();
1554    let resp = match (method, body) {
1555        ("GET", _) => agent.get(url).call(),
1556        ("POST", Some(b)) => agent.post(url).send(b),
1557        ("POST", None) => agent.post(url).send(""),
1558        (m, _) => return err_value(format!("unsupported method: {m}")),
1559    };
1560    match resp {
1561        Ok(mut r) => {
1562            let status = r.status().as_u16();
1563            let body = r.body_mut().read_to_string().unwrap_or_default();
1564            if (200..300).contains(&status) {
1565                Value::Variant { name: "Ok".into(), args: vec![Value::Str(body)] }
1566            } else {
1567                err_value(format!("status {status}: {body}"))
1568            }
1569        }
1570        Err(e) => err_value(format!("transport: {e}")),
1571    }
1572}
1573
1574/// Build a ureq agent for `std.http.{send,get,post}` with the given
1575/// timeout (None → use the same defaults as the legacy `net.{get,post}`
1576/// path). Separate from `http_request` so the rich `http.send` flow
1577/// can supply per-request overrides.
1578fn http_agent(timeout_ms: Option<u64>) -> ureq::Agent {
1579    use std::time::Duration;
1580    let mut b = ureq::Agent::config_builder()
1581        .timeout_connect(Some(Duration::from_secs(10)))
1582        .timeout_recv_body(Some(Duration::from_secs(30)))
1583        .timeout_send_body(Some(Duration::from_secs(10)))
1584        .http_status_as_error(false);
1585    if let Some(ms) = timeout_ms {
1586        let d = Duration::from_millis(ms);
1587        b = b.timeout_global(Some(d));
1588    }
1589    b.build().into()
1590}
1591
1592/// Map ureq's transport error to the structured `HttpError` variant
1593/// std.http exposes to user code. Anything not specifically a
1594/// timeout / TLS error funnels into `NetworkError`.
1595fn http_error_value(e: ureq::Error) -> Value {
1596    let (ctor, payload): (&str, Option<String>) = match &e {
1597        ureq::Error::Timeout(_) => ("TimeoutError", None),
1598        ureq::Error::Tls(s) => ("TlsError", Some((*s).into())),
1599        ureq::Error::Pem(p) => ("TlsError", Some(format!("{p}"))),
1600        ureq::Error::Rustls(r) => ("TlsError", Some(format!("{r}"))),
1601        _ => ("NetworkError", Some(format!("{e}"))),
1602    };
1603    let args = match payload { Some(s) => vec![Value::Str(s)], None => vec![] };
1604    let inner = Value::Variant { name: ctor.into(), args };
1605    Value::Variant { name: "Err".into(), args: vec![inner] }
1606}
1607
1608fn http_decode_err(msg: String) -> Value {
1609    let inner = Value::Variant {
1610        name: "DecodeError".into(),
1611        args: vec![Value::Str(msg)],
1612    };
1613    Value::Variant { name: "Err".into(), args: vec![inner] }
1614}
1615
1616/// Run a request and pack the ureq response into the
1617/// `{ status, headers, body }` Lex record (or the structured
1618/// `HttpError` on failure). `headers_extra` pairs are appended to the
1619/// outgoing request after `content_type` is applied.
1620fn http_send_simple(
1621    method: &str,
1622    url: &str,
1623    body: Option<Vec<u8>>,
1624    content_type: &str,
1625    timeout_ms: Option<u64>,
1626) -> Value {
1627    http_send_full(method, url, body, content_type, &[], timeout_ms)
1628}
1629
1630fn http_send_full(
1631    method: &str,
1632    url: &str,
1633    body: Option<Vec<u8>>,
1634    content_type: &str,
1635    headers: &[(String, String)],
1636    timeout_ms: Option<u64>,
1637) -> Value {
1638    let agent = http_agent(timeout_ms);
1639    let resp = match method {
1640        "GET" => {
1641            let mut req = agent.get(url);
1642            if !content_type.is_empty() { req = req.header("content-type", content_type); }
1643            for (k, v) in headers { req = req.header(k.as_str(), v.as_str()); }
1644            req.call()
1645        }
1646        "POST" => {
1647            let body = body.unwrap_or_default();
1648            let mut req = agent.post(url);
1649            if !content_type.is_empty() { req = req.header("content-type", content_type); }
1650            for (k, v) in headers { req = req.header(k.as_str(), v.as_str()); }
1651            req.send(&body[..])
1652        }
1653        m => {
1654            // Other methods (PUT, DELETE, PATCH, ...) fall through
1655            // here in v1.5; for now surface a structured DecodeError
1656            // so the caller can match it.
1657            return http_decode_err(format!("unsupported method: {m}"));
1658        }
1659    };
1660    match resp {
1661        Ok(mut r) => {
1662            let status = r.status().as_u16() as i64;
1663            let headers_map = collect_response_headers(r.headers());
1664            let body_bytes = match r.body_mut().with_config().limit(10 * 1024 * 1024).read_to_vec() {
1665                Ok(b) => b,
1666                Err(e) => return http_decode_err(format!("body read: {e}")),
1667            };
1668            let mut rec = indexmap::IndexMap::new();
1669            rec.insert("status".into(), Value::Int(status));
1670            rec.insert("headers".into(), Value::Map(headers_map));
1671            rec.insert("body".into(), Value::Bytes(body_bytes));
1672            Value::Variant { name: "Ok".into(), args: vec![Value::Record(rec)] }
1673        }
1674        Err(e) => http_error_value(e),
1675    }
1676}
1677
1678fn collect_response_headers(
1679    headers: &ureq::http::HeaderMap,
1680) -> std::collections::BTreeMap<lex_bytecode::MapKey, Value> {
1681    let mut out = std::collections::BTreeMap::new();
1682    for (name, value) in headers.iter() {
1683        let v = value.to_str().unwrap_or("").to_string();
1684        out.insert(lex_bytecode::MapKey::Str(name.as_str().to_string()), Value::Str(v));
1685    }
1686    out
1687}
1688
1689/// Pull the standard `HttpRequest` shape out of a `Value::Record`
1690/// and dispatch through `http_send_full`. The handler verifies
1691/// `--allow-net-host` for the URL before sending.
1692fn http_send_record(handler: &DefaultHandler, req: &indexmap::IndexMap<String, Value>) -> Value {
1693    let method = match req.get("method") {
1694        Some(Value::Str(s)) => s.clone(),
1695        _ => return http_decode_err("HttpRequest.method must be Str".into()),
1696    };
1697    let url = match req.get("url") {
1698        Some(Value::Str(s)) => s.clone(),
1699        _ => return http_decode_err("HttpRequest.url must be Str".into()),
1700    };
1701    if let Err(e) = handler.ensure_host_allowed(&url) {
1702        return http_decode_err(e);
1703    }
1704    let body = match req.get("body") {
1705        Some(Value::Variant { name, args }) if name == "None" => None,
1706        Some(Value::Variant { name, args }) if name == "Some" => match args.as_slice() {
1707            [Value::Bytes(b)] => Some(b.clone()),
1708            _ => return http_decode_err("HttpRequest.body Some payload must be Bytes".into()),
1709        },
1710        _ => return http_decode_err("HttpRequest.body must be Option[Bytes]".into()),
1711    };
1712    let timeout_ms = match req.get("timeout_ms") {
1713        Some(Value::Variant { name, .. }) if name == "None" => None,
1714        Some(Value::Variant { name, args }) if name == "Some" => match args.as_slice() {
1715            [Value::Int(n)] if *n >= 0 => Some(*n as u64),
1716            _ => return http_decode_err(
1717                "HttpRequest.timeout_ms Some payload must be a non-negative Int".into()),
1718        },
1719        _ => return http_decode_err("HttpRequest.timeout_ms must be Option[Int]".into()),
1720    };
1721    let headers: Vec<(String, String)> = match req.get("headers") {
1722        Some(Value::Map(m)) => m.iter().filter_map(|(k, v)| {
1723            let kk = match k { lex_bytecode::MapKey::Str(s) => s.clone(), _ => return None };
1724            let vv = match v { Value::Str(s) => s.clone(), _ => return None };
1725            Some((kk, vv))
1726        }).collect(),
1727        _ => return http_decode_err("HttpRequest.headers must be Map[Str, Str]".into()),
1728    };
1729    http_send_full(&method, &url, body, "", &headers, timeout_ms)
1730}
1731
1732fn expect_record(v: Option<&Value>) -> Result<&indexmap::IndexMap<String, Value>, String> {
1733    match v {
1734        Some(Value::Record(r)) => Ok(r),
1735        Some(other) => Err(format!("expected Record, got {other:?}")),
1736        None => Err("missing Record argument".into()),
1737    }
1738}
1739
1740fn err_value(msg: String) -> Value {
1741    Value::Variant { name: "Err".into(), args: vec![Value::Str(msg)] }
1742}
1743
1744fn expect_str(v: Option<&Value>) -> Result<&str, String> {
1745    match v {
1746        Some(Value::Str(s)) => Ok(s),
1747        Some(other) => Err(format!("expected Str arg, got {other:?}")),
1748        None => Err("missing argument".into()),
1749    }
1750}
1751
1752fn expect_int(v: Option<&Value>) -> Result<i64, String> {
1753    match v {
1754        Some(Value::Int(n)) => Ok(*n),
1755        Some(other) => Err(format!("expected Int arg, got {other:?}")),
1756        None => Err("missing argument".into()),
1757    }
1758}
1759
1760fn ok(v: Value) -> Value {
1761    Value::Variant { name: "Ok".into(), args: vec![v] }
1762}
1763fn err(v: Value) -> Value {
1764    Value::Variant { name: "Err".into(), args: vec![v] }
1765}
1766
1767impl DefaultHandler {
1768    /// Implementation of `agent.call_mcp(server, tool, args_json)`.
1769    /// Goes through the LRU client cache (#197): the named server
1770    /// is spawned on first use and reused on subsequent calls.
1771    /// On failure the offending client is dropped so the next
1772    /// call respawns rather than silently failing forever.
1773    fn dispatch_call_mcp(&mut self, args: Vec<Value>) -> Value {
1774        let server = match args.first() {
1775            Some(Value::Str(s)) => s.clone(),
1776            _ => return err(Value::Str(
1777                "agent.call_mcp(server, tool, args_json): server must be Str".into())),
1778        };
1779        let tool = match args.get(1) {
1780            Some(Value::Str(s)) => s.clone(),
1781            _ => return err(Value::Str(
1782                "agent.call_mcp(server, tool, args_json): tool must be Str".into())),
1783        };
1784        let args_json = match args.get(2) {
1785            Some(Value::Str(s)) => s.clone(),
1786            _ => return err(Value::Str(
1787                "agent.call_mcp(server, tool, args_json): args_json must be Str".into())),
1788        };
1789        let parsed: serde_json::Value = match serde_json::from_str(&args_json) {
1790            Ok(v) => v,
1791            Err(e) => return err(Value::Str(format!(
1792                "agent.call_mcp: args_json is not valid JSON: {e}"))),
1793        };
1794        match self.mcp_clients.call(&server, &tool, parsed) {
1795            Ok(result) => ok(Value::Str(
1796                serde_json::to_string(&result).unwrap_or_else(|_| "null".into()))),
1797            Err(e) => err(Value::Str(e)),
1798        }
1799    }
1800
1801    /// Implementation of `agent.cloud_stream(prompt) -> Result[Stream[Str], Str]`
1802    /// (#305 slice 3). The fixture path (`LEX_LLM_STREAM_FIXTURE`)
1803    /// splits the env-var value on `|` and yields each segment as
1804    /// one chunk; it's the load-bearing test hook. Live HTTP
1805    /// chunked-response support is deferred to a follow-up slice.
1806    fn dispatch_cloud_stream(&mut self, args: Vec<Value>) -> Value {
1807        let _prompt = match args.first() {
1808            Some(Value::Str(s)) => s.clone(),
1809            _ => return err(Value::Str(
1810                "agent.cloud_stream(prompt): prompt must be Str".into())),
1811        };
1812        let chunks: Vec<String> = match std::env::var("LEX_LLM_STREAM_FIXTURE") {
1813            Ok(v) => v.split('|').map(|s| s.to_string()).collect(),
1814            Err(_) => return err(Value::Str(
1815                "agent.cloud_stream: live streaming not yet implemented; \
1816                 set LEX_LLM_STREAM_FIXTURE='chunk1|chunk2|…' for tests".into())),
1817        };
1818        let handle = self.register_stream(chunks.into_iter());
1819        ok(stream_handle_value(handle))
1820    }
1821
1822    /// Implementation of `stream.next(s) -> Option[T]` (#305 slice 3).
1823    /// Returns `Some(chunk)` for each producer yield and `None` once
1824    /// the producer is exhausted. Unknown handle ids return `None`
1825    /// rather than erroring so streams can be safely consumed past
1826    /// the end (matches the semantics of `Iterator::next`).
1827    fn dispatch_stream_next(&mut self, args: Vec<Value>) -> Value {
1828        let handle = match args.first().and_then(stream_handle_id) {
1829            Some(h) => h,
1830            None => return Value::Variant { name: "None".into(), args: vec![] },
1831        };
1832        let mut streams = match self.streams.lock() {
1833            Ok(g) => g,
1834            Err(_) => return Value::Variant { name: "None".into(), args: vec![] },
1835        };
1836        match streams.get_mut(&handle).and_then(|it| it.next()) {
1837            Some(chunk) => some(Value::Str(chunk)),
1838            None => {
1839                streams.remove(&handle);
1840                Value::Variant { name: "None".into(), args: vec![] }
1841            }
1842        }
1843    }
1844
1845    /// Implementation of `stream.collect(s) -> List[T]` (#305 slice 3).
1846    /// Drains the producer eagerly. Unknown handles drain to an
1847    /// empty list so the contract is `collect ∘ collect = []`
1848    /// (idempotent on a closed stream).
1849    fn dispatch_stream_collect(&mut self, args: Vec<Value>) -> Value {
1850        let handle = match args.first().and_then(stream_handle_id) {
1851            Some(h) => h,
1852            None => return Value::List(Vec::new()),
1853        };
1854        let mut iter = {
1855            let mut streams = match self.streams.lock() {
1856                Ok(g) => g,
1857                Err(_) => return Value::List(Vec::new()),
1858            };
1859            match streams.remove(&handle) {
1860                Some(it) => it,
1861                None => return Value::List(Vec::new()),
1862            }
1863        };
1864        let mut out: Vec<Value> = Vec::new();
1865        for chunk in iter.by_ref() {
1866            out.push(Value::Str(chunk));
1867        }
1868        Value::List(out)
1869    }
1870
1871    /// Register a producer iterator and return its handle id. The
1872    /// handle is monotonic-counter-based so two streams created in
1873    /// quick succession get distinct ids.
1874    fn register_stream<I>(&self, iter: I) -> String
1875    where
1876        I: Iterator<Item = String> + Send + 'static,
1877    {
1878        let id = self
1879            .next_stream_id
1880            .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
1881        let handle = format!("stream_{id}");
1882        if let Ok(mut streams) = self.streams.lock() {
1883            streams.insert(handle.clone(), Box::new(iter));
1884        }
1885        handle
1886    }
1887}
1888
1889/// Build the runtime representation of a `Stream[T]` value:
1890/// `Variant("__StreamHandle", [Str(handle_id)])`. The opaque tag is
1891/// prefixed with `__` so it can't collide with a user-declared
1892/// variant.
1893fn stream_handle_value(handle: String) -> Value {
1894    Value::Variant {
1895        name: "__StreamHandle".into(),
1896        args: vec![Value::Str(handle)],
1897    }
1898}
1899
1900/// Inverse of [`stream_handle_value`] — extract the handle id from
1901/// a Stream value, or `None` if the input doesn't have the
1902/// expected shape.
1903fn stream_handle_id(v: &Value) -> Option<String> {
1904    match v {
1905        Value::Variant { name, args } if name == "__StreamHandle" => match args.first() {
1906            Some(Value::Str(h)) => Some(h.clone()),
1907            _ => None,
1908        },
1909        _ => None,
1910    }
1911}
1912
1913/// Implementation of `agent.local_complete(prompt)` (#196).
1914/// Hits Ollama (or any compatible HTTP service via `OLLAMA_HOST`)
1915/// and returns the completion text. Override at the
1916/// `EffectHandler` layer if you need a different transport.
1917fn dispatch_llm_local(args: Vec<Value>) -> Value {
1918    let prompt = match args.first() {
1919        Some(Value::Str(s)) => s.clone(),
1920        _ => return err(Value::Str(
1921            "agent.local_complete(prompt): prompt must be Str".into())),
1922    };
1923    match crate::llm::local_complete(&prompt) {
1924        Ok(text) => ok(Value::Str(text)),
1925        Err(e) => err(Value::Str(e)),
1926    }
1927}
1928
1929/// Implementation of `agent.cloud_complete(prompt)` (#196).
1930/// Hits OpenAI's chat-completions API (or any compatible
1931/// service via `OPENAI_BASE_URL`) and returns the assistant
1932/// message. Requires `OPENAI_API_KEY`. Override at the
1933/// `EffectHandler` layer for custom auth, batching, or other
1934/// providers.
1935fn dispatch_llm_cloud(args: Vec<Value>) -> Value {
1936    let prompt = match args.first() {
1937        Some(Value::Str(s)) => s.clone(),
1938        _ => return err(Value::Str(
1939            "agent.cloud_complete(prompt): prompt must be Str".into())),
1940    };
1941    match crate::llm::cloud_complete(&prompt) {
1942        Ok(text) => ok(Value::Str(text)),
1943        Err(e) => err(Value::Str(e)),
1944    }
1945}
1946
1947fn some(v: Value) -> Value {
1948    Value::Variant { name: "Some".into(), args: vec![v] }
1949}
1950fn none() -> Value {
1951    Value::Variant { name: "None".into(), args: vec![] }
1952}
1953
1954fn expect_bytes(v: Option<&Value>) -> Result<&Vec<u8>, String> {
1955    match v {
1956        Some(Value::Bytes(b)) => Ok(b),
1957        Some(other) => Err(format!("expected Bytes arg, got {other:?}")),
1958        None => Err("missing argument".into()),
1959    }
1960}
1961
1962fn expect_kv_handle(v: Option<&Value>) -> Result<u64, String> {
1963    match v {
1964        Some(Value::Int(n)) if *n >= 0 => Ok(*n as u64),
1965        Some(other) => Err(format!("expected Kv handle (Int), got {other:?}")),
1966        None => Err("missing Kv argument".into()),
1967    }
1968}
1969
1970fn expect_sql_handle(v: Option<&Value>) -> Result<u64, String> {
1971    match v {
1972        Some(Value::Int(n)) if *n >= 0 => Ok(*n as u64),
1973        Some(other) => Err(format!("expected Db handle (Int), got {other:?}")),
1974        None => Err("missing Db argument".into()),
1975    }
1976}
1977
1978#[allow(dead_code)]
1979fn expect_str_list(v: Option<&Value>) -> Result<Vec<String>, String> {
1980    match v {
1981        Some(Value::List(items)) => items.iter().map(|x| match x {
1982            Value::Str(s) => Ok(s.clone()),
1983            other => Err(format!("expected List[Str] element, got {other:?}")),
1984        }).collect(),
1985        Some(other) => Err(format!("expected List[Str], got {other:?}")),
1986        None => Err("missing List[Str] argument".into()),
1987    }
1988}
1989
1990/// Convert a `List[SqlParam]` value to driver-neutral `SqlParamValue`s.
1991/// SqlParam = PStr(Str) | PInt(Int) | PFloat(Float) | PBool(Bool) | PNull
1992fn expect_sql_params(v: Option<&Value>) -> Result<Vec<SqlParamValue>, String> {
1993    let items = match v {
1994        Some(Value::List(xs)) => xs,
1995        Some(other) => return Err(format!("expected List[SqlParam], got {other:?}")),
1996        None => return Err("missing params argument".into()),
1997    };
1998    items.iter().map(|item| {
1999        match item {
2000            Value::Variant { name, args } => match name.as_str() {
2001                "PStr"   => match args.first() {
2002                    Some(Value::Str(s)) => Ok(SqlParamValue::Text(s.clone())),
2003                    _ => Err("PStr requires a Str argument".into()),
2004                },
2005                "PInt"   => match args.first() {
2006                    Some(Value::Int(n)) => Ok(SqlParamValue::Integer(*n)),
2007                    _ => Err("PInt requires an Int argument".into()),
2008                },
2009                "PFloat" => match args.first() {
2010                    Some(Value::Float(f)) => Ok(SqlParamValue::Real(*f)),
2011                    _ => Err("PFloat requires a Float argument".into()),
2012                },
2013                "PBool"  => match args.first() {
2014                    Some(Value::Bool(b)) => Ok(SqlParamValue::Bool(*b)),
2015                    _ => Err("PBool requires a Bool argument".into()),
2016                },
2017                "PNull"  => Ok(SqlParamValue::Null),
2018                other    => Err(format!("unknown SqlParam constructor `{other}`")),
2019            },
2020            // Backward-compat: bare strings are accepted as PStr.
2021            Value::Str(s) => Ok(SqlParamValue::Text(s.clone())),
2022            other => Err(format!("expected SqlParam variant, got {other:?}")),
2023        }
2024    }).collect()
2025}
2026
2027/// Convert `SqlParamValue`s to rusqlite-typed values for SQLite binding.
2028fn sqlite_params(params: &[SqlParamValue]) -> Vec<rusqlite::types::Value> {
2029    params.iter().map(|p| match p {
2030        SqlParamValue::Text(s)    => rusqlite::types::Value::Text(s.clone()),
2031        SqlParamValue::Integer(n) => rusqlite::types::Value::Integer(*n),
2032        SqlParamValue::Real(f)    => rusqlite::types::Value::Real(*f),
2033        SqlParamValue::Bool(b)    => rusqlite::types::Value::Integer(*b as i64),
2034        SqlParamValue::Null       => rusqlite::types::Value::Null,
2035    }).collect()
2036}
2037
2038/// Box `SqlParamValue`s as `dyn ToSql + Sync` for Postgres binding.
2039fn pg_param_refs(params: &[SqlParamValue]) -> Vec<Box<dyn postgres::types::ToSql + Sync>> {
2040    params.iter().map(|p| -> Box<dyn postgres::types::ToSql + Sync> {
2041        match p {
2042            SqlParamValue::Text(s)    => Box::new(s.clone()),
2043            SqlParamValue::Integer(n) => Box::new(*n),
2044            SqlParamValue::Real(f)    => Box::new(*f),
2045            SqlParamValue::Bool(b)    => Box::new(*b),
2046            SqlParamValue::Null       => Box::new(Option::<String>::None),
2047        }
2048    }).collect()
2049}
2050
2051/// Run a statement on SQLite and pack rows into `Value::List(Value::Record(...))`.
2052fn sql_run_query_sqlite(
2053    conn: &rusqlite::Connection,
2054    stmt_str: &str,
2055    params: &[SqlParamValue],
2056) -> Value {
2057    let mut stmt = match conn.prepare(stmt_str) {
2058        Ok(s)  => s,
2059        Err(e) => return err(Value::Str(format!("sql.query: {e}"))),
2060    };
2061    let column_count = stmt.column_count();
2062    let column_names: Vec<String> = (0..column_count)
2063        .map(|i| stmt.column_name(i).unwrap_or("").to_string())
2064        .collect();
2065    let bound = sqlite_params(params);
2066    let bind: Vec<&dyn rusqlite::ToSql> = bound.iter()
2067        .map(|p| p as &dyn rusqlite::ToSql)
2068        .collect();
2069    let mut rows = match stmt.query(rusqlite::params_from_iter(bind.iter())) {
2070        Ok(r)  => r,
2071        Err(e) => return err(Value::Str(format!("sql.query: {e}"))),
2072    };
2073    let mut out: Vec<Value> = Vec::new();
2074    loop {
2075        let row = match rows.next() {
2076            Ok(Some(r)) => r,
2077            Ok(None)    => break,
2078            Err(e)      => return err(Value::Str(format!("sql.query: {e}"))),
2079        };
2080        let mut rec = indexmap::IndexMap::new();
2081        for (i, name) in column_names.iter().enumerate() {
2082            let cell = match row.get_ref(i) {
2083                Ok(c)  => sql_value_ref_to_lex(c),
2084                Err(e) => return err(Value::Str(format!("sql.query: column {i}: {e}"))),
2085            };
2086            rec.insert(name.clone(), cell);
2087        }
2088        out.push(Value::Record(rec));
2089    }
2090    ok(Value::List(out))
2091}
2092
2093/// Run a statement on Postgres and pack rows into `Value::List(Value::Record(...))`.
2094fn sql_run_query_pg(
2095    client: &mut postgres::Client,
2096    stmt_str: &str,
2097    params: &[SqlParamValue],
2098) -> Value {
2099    let pg = pg_param_refs(params);
2100    let refs: Vec<&(dyn postgres::types::ToSql + Sync)> =
2101        pg.iter().map(|b| b.as_ref()).collect();
2102    let rows = match client.query(stmt_str, &refs) {
2103        Ok(r)  => r,
2104        Err(e) => return err(Value::Str(format!("sql.query: {e}"))),
2105    };
2106    let out: Vec<Value> = rows.iter().map(|row| {
2107        Value::Record(pg_row_to_lex_record(row))
2108    }).collect();
2109    ok(Value::List(out))
2110}
2111
2112/// Convert a Postgres row to a Lex record, mapping column types to Lex values.
2113fn pg_row_to_lex_record(row: &postgres::Row) -> indexmap::IndexMap<String, Value> {
2114    use postgres::types::Type;
2115    let mut rec = indexmap::IndexMap::new();
2116    for (i, col) in row.columns().iter().enumerate() {
2117        let ty = col.type_();
2118        let val = if *ty == Type::INT2 || *ty == Type::INT4 || *ty == Type::INT8 {
2119            row.get::<_, Option<i64>>(i).map(Value::Int).unwrap_or(Value::Unit)
2120        } else if *ty == Type::FLOAT4 || *ty == Type::FLOAT8 {
2121            row.get::<_, Option<f64>>(i).map(Value::Float).unwrap_or(Value::Unit)
2122        } else if *ty == Type::BOOL {
2123            row.get::<_, Option<bool>>(i).map(Value::Bool).unwrap_or(Value::Unit)
2124        } else if *ty == Type::BYTEA {
2125            row.get::<_, Option<Vec<u8>>>(i).map(Value::Bytes).unwrap_or(Value::Unit)
2126        } else {
2127            row.get::<_, Option<String>>(i).map(Value::Str).unwrap_or(Value::Unit)
2128        };
2129        rec.insert(col.name().to_string(), val);
2130    }
2131    rec
2132}
2133
2134/// Extract a column value from a row record by name, returning `Option[X]`.
2135fn sql_get_col<F>(args: &[Value], convert: F) -> Result<Value, String>
2136where
2137    F: Fn(&Value) -> Option<Value>,
2138{
2139    let row = args.first().ok_or("sql.get_*: missing row argument")?;
2140    let col = match args.get(1) {
2141        Some(Value::Str(s)) => s.as_str(),
2142        Some(other) => return Err(format!("sql.get_*: column name must be Str, got {other:?}")),
2143        None => return Err("sql.get_*: missing column name argument".into()),
2144    };
2145    let cell = match row {
2146        Value::Record(rec) => rec.get(col).cloned(),
2147        other => return Err(format!("sql.get_*: row must be a Record, got {other:?}")),
2148    };
2149    Ok(match cell.and_then(|v| convert(&v)) {
2150        Some(v) => Value::Variant { name: "Some".into(), args: vec![v] },
2151        None    => Value::Variant { name: "None".into(), args: vec![] },
2152    })
2153}
2154
2155fn sql_value_ref_to_lex(v: rusqlite::types::ValueRef<'_>) -> Value {
2156    use rusqlite::types::ValueRef;
2157    match v {
2158        ValueRef::Null       => Value::Unit,
2159        ValueRef::Integer(n) => Value::Int(n),
2160        ValueRef::Real(f)    => Value::Float(f),
2161        ValueRef::Text(s)    => Value::Str(String::from_utf8_lossy(s).into_owned()),
2162        ValueRef::Blob(b)    => Value::Bytes(b.to_vec()),
2163    }
2164}
2165
2166// -- log state (process-wide; configurable via log.set_*) --
2167
/// Severity of a log record. Variant order is significant: the derived
/// `PartialOrd` ranks `Debug < Info < Warn < Error`, and `emit_log` discards
/// records whose level compares below the configured minimum.
#[derive(Clone, Copy, PartialEq, PartialOrd)]
enum LogLevel { Debug, Info, Warn, Error }

/// Line format for emitted records: plain text, or one JSON object per line.
#[derive(Clone, Copy, PartialEq)]
enum LogFormat { Text, Json }

/// Destination for emitted records. The file handle sits behind
/// `Arc<Mutex<..>>` so `emit_log` can clone the sink out of the global state
/// and write after releasing the state lock.
#[derive(Clone)]
enum LogSink {
    Stderr,
    File(std::sync::Arc<Mutex<std::fs::File>>),
}

/// Process-wide logging configuration, stored behind `log_state()`.
struct LogState {
    /// Minimum level that is emitted; lower levels are dropped.
    level: LogLevel,
    /// Text or JSON line format.
    format: LogFormat,
    /// Where formatted lines are written.
    sink: LogSink,
}
2185
2186fn log_state() -> &'static Mutex<LogState> {
2187    static STATE: OnceLock<Mutex<LogState>> = OnceLock::new();
2188    STATE.get_or_init(|| Mutex::new(LogState {
2189        level: LogLevel::Info,
2190        format: LogFormat::Text,
2191        sink: LogSink::Stderr,
2192    }))
2193}
2194
2195fn parse_log_level(s: &str) -> Option<LogLevel> {
2196    match s {
2197        "debug" => Some(LogLevel::Debug),
2198        "info" => Some(LogLevel::Info),
2199        "warn" => Some(LogLevel::Warn),
2200        "error" => Some(LogLevel::Error),
2201        _ => None,
2202    }
2203}
2204
2205fn level_label(l: LogLevel) -> &'static str {
2206    match l {
2207        LogLevel::Debug => "debug",
2208        LogLevel::Info => "info",
2209        LogLevel::Warn => "warn",
2210        LogLevel::Error => "error",
2211    }
2212}
2213
2214fn emit_log(level: LogLevel, msg: &str) {
2215    let state = log_state().lock().unwrap();
2216    if level < state.level {
2217        return;
2218    }
2219    let ts = chrono::Utc::now().to_rfc3339();
2220    let line = match state.format {
2221        LogFormat::Text => format!("[{}] {}: {}\n", ts, level_label(level), msg),
2222        LogFormat::Json => {
2223            // Hand-rolled JSON to avoid pulling serde_json into the
2224            // hot path; msg gets minimal escaping (the four common
2225            // cases that break a JSON line).
2226            let escaped = msg
2227                .replace('\\', "\\\\")
2228                .replace('"',  "\\\"")
2229                .replace('\n', "\\n")
2230                .replace('\r', "\\r");
2231            format!(
2232                "{{\"ts\":\"{ts}\",\"level\":\"{}\",\"msg\":\"{escaped}\"}}\n",
2233                level_label(level),
2234            )
2235        }
2236    };
2237    let sink = state.sink.clone();
2238    drop(state);
2239    match sink {
2240        LogSink::Stderr => {
2241            use std::io::Write;
2242            let _ = std::io::stderr().write_all(line.as_bytes());
2243        }
2244        LogSink::File(f) => {
2245            use std::io::Write;
2246            if let Ok(mut g) = f.lock() {
2247                let _ = g.write_all(line.as_bytes());
2248            }
2249        }
2250    }
2251}
2252
/// Registry payload for one spawned child process.
pub(crate) struct ProcessState {
    /// The spawned child.
    child: std::process::Child,
    /// Buffered reader over the child's stdout; `None` when no reader is
    /// attached. NOTE(review): presumably `None` when stdout was not piped;
    /// confirm against the spawn handler.
    stdout: Option<std::io::BufReader<std::process::ChildStdout>>,
    /// Same as `stdout`, for the child's stderr.
    stderr: Option<std::io::BufReader<std::process::ChildStderr>>,
}
2258
2259/// Process-wide registry of live `process.spawn` handles. Capped at
2260/// [`MAX_PROCESS_HANDLES`] to bound long-running programs that spawn
2261/// many short-lived children: on each `spawn` past the cap, the
2262/// least-recently-used entry is dropped (which `Drop`s its
2263/// `ProcessState`, leaving the child orphaned but the registry
2264/// bounded). `process.wait` also drops the entry on completion since
2265/// the handle becomes terminal once the child exits.
2266///
2267/// Each entry is wrapped in `Arc<Mutex<ProcessState>>` so the global
2268/// lookup mutex is held only briefly during dispatch — once we have
2269/// the per-handle `Arc`, the global lock is released and the slow
2270/// op (`wait`, `read_*_line`) only contends on its own handle's
2271/// mutex. Reads on different handles no longer block each other.
2272fn process_registry() -> &'static Mutex<ProcessRegistry> {
2273    static REGISTRY: OnceLock<Mutex<ProcessRegistry>> = OnceLock::new();
2274    REGISTRY.get_or_init(|| Mutex::new(ProcessRegistry::with_capacity(MAX_PROCESS_HANDLES)))
2275}
2276
/// Upper bound on simultaneously tracked `process.spawn` handles. Past it,
/// each new spawn evicts the least-recently-used handle.
const MAX_PROCESS_HANDLES: usize = 256;

/// Per-handle state, cloned out of the registry so the global lock can be
/// released before the per-handle mutex is taken.
type SharedProcessState = Arc<Mutex<ProcessState>>;

/// LRU-ordered map of process handles. Map order is recency order: index 0
/// is the least-recently-used entry and the last index the most recent.
pub(crate) struct ProcessRegistry {
    entries: indexmap::IndexMap<u64, SharedProcessState>,
    /// Maximum number of entries before `insert` evicts the front one.
    cap: usize,
}
2285
2286impl ProcessRegistry {
2287    pub(crate) fn with_capacity(cap: usize) -> Self {
2288        Self { entries: indexmap::IndexMap::new(), cap }
2289    }
2290
2291    /// Insert a freshly-spawned child. If at cap, evict the LRU entry
2292    /// first; the dropped `ProcessState`'s child stays alive (orphaned)
2293    /// but its file descriptors are released.
2294    pub(crate) fn insert(&mut self, handle: u64, state: ProcessState) {
2295        if self.entries.len() >= self.cap {
2296            self.entries.shift_remove_index(0);
2297        }
2298        self.entries.insert(handle, Arc::new(Mutex::new(state)));
2299    }
2300
2301    /// Look up a handle, marking it most-recently-used on hit. Returns
2302    /// a clone of the shared `Arc` — callers should release the global
2303    /// registry lock before locking the per-handle mutex.
2304    pub(crate) fn touch_get(&mut self, handle: u64) -> Option<SharedProcessState> {
2305        let idx = self.entries.get_index_of(&handle)?;
2306        self.entries.move_index(idx, self.entries.len() - 1);
2307        self.entries.get(&handle).cloned()
2308    }
2309
2310    /// Drop the registry entry. The underlying `Arc` may outlive the
2311    /// removal if another op still holds it; that's intentional — the
2312    /// in-flight op finishes against the existing `ProcessState`, and
2313    /// only fresh lookups start failing.
2314    pub(crate) fn remove(&mut self, handle: u64) {
2315        self.entries.shift_remove(&handle);
2316    }
2317
2318    #[cfg(test)]
2319    pub(crate) fn len(&self) -> usize { self.entries.len() }
2320}
2321
/// Allocate a fresh process handle. Ids start at 1 and each call returns
/// the next integer.
fn next_process_handle() -> u64 {
    static NEXT_ID: AtomicU64 = AtomicU64::new(1);
    NEXT_ID.fetch_add(1, Ordering::SeqCst)
}
2326
// Unit tests for `ProcessRegistry`'s LRU bookkeeping. Unix-only because the
// payload is built by spawning the `true` binary.
#[cfg(all(test, unix))]
mod process_registry_tests {
    use super::{ProcessRegistry, ProcessState};

    /// Build a registry payload around a trivial child.
    /// `true` really is spawned, but it exits immediately with its output
    /// discarded. The tests only need a valid `std::process::Child` and
    /// never wait on it.
    fn fresh_state() -> ProcessState {
        let child = std::process::Command::new("true")
            .stdout(std::process::Stdio::null())
            .stderr(std::process::Stdio::null())
            .spawn()
            .expect("spawn `true`");
        ProcessState { child, stdout: None, stderr: None }
    }

    // A present handle is found; an unknown one is not.
    #[test]
    fn insert_and_get_round_trip() {
        let mut r = ProcessRegistry::with_capacity(4);
        r.insert(1, fresh_state());
        assert!(r.touch_get(1).is_some());
        assert!(r.touch_get(2).is_none());
    }

    #[test]
    fn touch_get_returns_distinct_arcs_for_distinct_handles() {
        let mut r = ProcessRegistry::with_capacity(4);
        r.insert(1, fresh_state());
        r.insert(2, fresh_state());
        let a = r.touch_get(1).unwrap();
        let b = r.touch_get(2).unwrap();
        // Different Arcs — pointer-equality check.
        assert!(!std::sync::Arc::ptr_eq(&a, &b));
    }

    // cap=2: insert 1, 2; touch 1 (now MRU); insert 3 → 2 evicted.
    #[test]
    fn cap_evicts_lru_on_overflow() {
        let mut r = ProcessRegistry::with_capacity(2);
        r.insert(1, fresh_state());
        r.insert(2, fresh_state());
        let _ = r.touch_get(1);
        r.insert(3, fresh_state());
        assert!(r.touch_get(1).is_some(), "1 was MRU, should survive");
        assert!(r.touch_get(2).is_none(), "2 was LRU, should be evicted");
        assert!(r.touch_get(3).is_some(), "3 just inserted, should survive");
        assert_eq!(r.len(), 2);
    }

    // cap=2 with no touches: eviction follows insertion order (FIFO).
    #[test]
    fn cap_with_no_touches_evicts_in_insertion_order() {
        let mut r = ProcessRegistry::with_capacity(2);
        r.insert(10, fresh_state());
        r.insert(20, fresh_state());
        r.insert(30, fresh_state());
        assert!(r.touch_get(10).is_none());
        assert!(r.touch_get(20).is_some());
        assert!(r.touch_get(30).is_some());
    }

    #[test]
    fn remove_drops_entry() {
        let mut r = ProcessRegistry::with_capacity(4);
        r.insert(1, fresh_state());
        r.remove(1);
        assert!(r.touch_get(1).is_none());
        assert_eq!(r.len(), 0);
    }

    // Sustained churn never grows the registry past its cap.
    #[test]
    fn many_inserts_stay_bounded_at_cap() {
        let cap = 8;
        let mut r = ProcessRegistry::with_capacity(cap);
        for i in 0..(cap as u64 * 3) {
            r.insert(i, fresh_state());
            assert!(r.len() <= cap);
        }
        assert_eq!(r.len(), cap);
    }

    #[test]
    fn outstanding_arc_outlives_remove() {
        // Holding the per-handle Arc while another op removes the
        // entry must not invalidate the in-flight op. Mirrors the
        // wait-completes-then-removes pattern.
        let mut r = ProcessRegistry::with_capacity(4);
        r.insert(1, fresh_state());
        let arc = r.touch_get(1).expect("entry exists");
        r.remove(1);
        // Registry forgot about it, but the Arc still works.
        assert!(r.touch_get(1).is_none());
        let _state = arc.lock().unwrap();
    }
}
2420
2421fn expect_process_handle(v: Option<&Value>) -> Result<u64, String> {
2422    match v {
2423        Some(Value::Int(n)) if *n >= 0 => Ok(*n as u64),
2424        Some(other) => Err(format!("expected ProcessHandle (Int), got {other:?}")),
2425        None => Err("missing ProcessHandle argument".into()),
2426    }
2427}
2428
2429/// Process-wide registry of open `Kv` handles. Each `kv.open` allocates
2430/// a new u64 handle via [`next_kv_handle`] and stores the `sled::Db`
2431/// here; subsequent ops fetch by handle. `kv.close` removes the entry.
2432///
2433/// Capped at [`MAX_KV_HANDLES`] to prevent leaks from long-running
2434/// programs that open many short-lived stores without calling
2435/// `kv.close`. On insert at cap, the least-recently-used entry is
2436/// dropped (closing its `sled::Db`); subsequent ops on the evicted
2437/// handle return the standard "closed or unknown Kv handle" error.
2438/// Any access (`get`, `put`, `delete`, `contains`, `list_prefix`)
2439/// touches the LRU order.
2440fn kv_registry() -> &'static Mutex<KvRegistry> {
2441    static REGISTRY: OnceLock<Mutex<KvRegistry>> = OnceLock::new();
2442    REGISTRY.get_or_init(|| Mutex::new(KvRegistry::with_capacity(MAX_KV_HANDLES)))
2443}
2444
/// Maximum number of `kv.open` handles kept alive at once. Past this
/// cap, the least-recently-used handle is evicted on each new open.
/// Sized so that pathological "open and forget" programs are bounded
/// without breaking real-world programs that intentionally keep one or
/// two long-lived stores open.
const MAX_KV_HANDLES: usize = 256;

/// LRU-bounded set of open `sled::Db` instances keyed by `u64` handle.
///
/// Backed by an `IndexMap` whose order is recency order: touching an entry
/// moves it to the back, and evictions take the front. Lookup by key is
/// O(1), but `move_index` and `shift_remove*` shift the entries in between
/// and are O(n). With the cap at a few hundred handles that cost is
/// negligible.
pub(crate) struct KvRegistry {
    entries: indexmap::IndexMap<u64, sled::Db>,
    /// Maximum number of open stores before `insert` evicts the LRU one.
    cap: usize,
}
2460
2461impl KvRegistry {
2462    pub(crate) fn with_capacity(cap: usize) -> Self {
2463        Self { entries: indexmap::IndexMap::new(), cap }
2464    }
2465
2466    /// Insert a freshly-opened db. If we're already at cap, evict the
2467    /// LRU entry first; the dropped `sled::Db` closes its files.
2468    pub(crate) fn insert(&mut self, handle: u64, db: sled::Db) {
2469        if self.entries.len() >= self.cap {
2470            self.entries.shift_remove_index(0);
2471        }
2472        self.entries.insert(handle, db);
2473    }
2474
2475    /// Look up a handle, marking it most-recently-used on hit.
2476    pub(crate) fn touch_get(&mut self, handle: u64) -> Option<&sled::Db> {
2477        let idx = self.entries.get_index_of(&handle)?;
2478        self.entries.move_index(idx, self.entries.len() - 1);
2479        self.entries.get(&handle)
2480    }
2481
2482    /// Explicit `kv.close`: drop the handle if present.
2483    pub(crate) fn remove(&mut self, handle: u64) {
2484        self.entries.shift_remove(&handle);
2485    }
2486
2487    #[cfg(test)]
2488    pub(crate) fn len(&self) -> usize { self.entries.len() }
2489}
2490
/// Allocate a fresh `Kv` handle. Ids start at 1 and each call returns the
/// next integer.
fn next_kv_handle() -> u64 {
    static NEXT_ID: AtomicU64 = AtomicU64::new(1);
    NEXT_ID.fetch_add(1, Ordering::SeqCst)
}
2495
2496/// Process-wide registry of open `Db` handles. Same shape as the kv
2497/// and process registries: per-handle `Arc<Mutex<…>>` so dispatch
2498/// only briefly holds the global lock and ops on different
2499/// connections don't serialize. LRU-bounded at
2500/// [`MAX_SQL_HANDLES`] to avoid leaks from long-running programs
2501/// that open many short-lived databases.
2502fn sql_registry() -> &'static Mutex<SqlRegistry> {
2503    static REGISTRY: OnceLock<Mutex<SqlRegistry>> = OnceLock::new();
2504    REGISTRY.get_or_init(|| Mutex::new(SqlRegistry::with_capacity(MAX_SQL_HANDLES)))
2505}
2506
/// Upper bound on simultaneously open `Db` handles. Past it, each new open
/// evicts the least-recently-used connection.
const MAX_SQL_HANDLES: usize = 256;

/// Driver-neutral SQL parameter value shared between SQLite and Postgres paths.
#[derive(Debug, Clone)]
enum SqlParamValue {
    /// From `PStr`, or a bare `Str` accepted for backward compatibility.
    Text(String),
    Integer(i64),
    Real(f64),
    /// Bound as integer 0/1 for SQLite and as `bool` for Postgres.
    Bool(bool),
    /// SQL `NULL`.
    Null,
}

/// Abstraction over a SQLite connection or a Postgres client.
pub(crate) enum SqlConn {
    Sqlite(rusqlite::Connection),
    Postgres(postgres::Client),
}

/// Per-handle connection, cloned out of the registry so the global lock can
/// be released before the connection mutex is taken.
type SharedConn = Arc<Mutex<SqlConn>>;

/// LRU-ordered map of `Db` handles. Index 0 is the least-recently-used
/// entry, which `insert` evicts when the registry is at `cap`.
pub(crate) struct SqlRegistry {
    entries: indexmap::IndexMap<u64, SharedConn>,
    cap: usize,
}
2531
2532impl SqlRegistry {
2533    pub(crate) fn with_capacity(cap: usize) -> Self {
2534        Self { entries: indexmap::IndexMap::new(), cap }
2535    }
2536
2537    pub(crate) fn insert(&mut self, handle: u64, conn: SqlConn) {
2538        if self.entries.len() >= self.cap {
2539            self.entries.shift_remove_index(0);
2540        }
2541        self.entries.insert(handle, Arc::new(Mutex::new(conn)));
2542    }
2543
2544    /// Look up a handle, marking it MRU on hit. Returns a clone of
2545    /// the shared `Arc` so callers release the global registry
2546    /// lock before locking the per-handle mutex.
2547    pub(crate) fn touch_get(&mut self, handle: u64) -> Option<SharedConn> {
2548        let idx = self.entries.get_index_of(&handle)?;
2549        self.entries.move_index(idx, self.entries.len() - 1);
2550        self.entries.get(&handle).cloned()
2551    }
2552
2553    pub(crate) fn remove(&mut self, handle: u64) {
2554        self.entries.shift_remove(&handle);
2555    }
2556
2557    #[cfg(test)]
2558    pub(crate) fn len(&self) -> usize { self.entries.len() }
2559}
2560
/// Allocate a fresh `Db` handle. Ids start at 1 and each call returns the
/// next integer.
fn next_sql_handle() -> u64 {
    static NEXT_ID: AtomicU64 = AtomicU64::new(1);
    NEXT_ID.fetch_add(1, Ordering::SeqCst)
}
2565
// Unit tests for `SqlRegistry`'s LRU bookkeeping, using in-memory SQLite
// connections as cheap payloads.
#[cfg(test)]
mod sql_registry_tests {
    use super::{SqlConn, SqlRegistry};

    /// A throwaway in-memory SQLite connection; nothing touches disk.
    fn fresh() -> SqlConn {
        SqlConn::Sqlite(rusqlite::Connection::open_in_memory().expect("open in-memory sqlite"))
    }

    // A present handle is found; an unknown one is not.
    #[test]
    fn insert_and_get_round_trip() {
        let mut r = SqlRegistry::with_capacity(4);
        r.insert(1, fresh());
        assert!(r.touch_get(1).is_some());
        assert!(r.touch_get(2).is_none());
    }

    // cap=2: insert 1, 2; touch 1 (now MRU); insert 3 → 2 evicted.
    #[test]
    fn cap_evicts_lru_on_overflow() {
        let mut r = SqlRegistry::with_capacity(2);
        r.insert(1, fresh());
        r.insert(2, fresh());
        let _ = r.touch_get(1);
        r.insert(3, fresh());
        assert!(r.touch_get(1).is_some(), "1 was MRU, should survive");
        assert!(r.touch_get(2).is_none(), "2 was LRU, should be evicted");
        assert!(r.touch_get(3).is_some(), "3 just inserted");
        assert_eq!(r.len(), 2);
    }

    #[test]
    fn remove_drops_entry() {
        let mut r = SqlRegistry::with_capacity(4);
        r.insert(1, fresh());
        r.remove(1);
        assert!(r.touch_get(1).is_none());
        assert_eq!(r.len(), 0);
    }

    // Sustained churn never grows the registry past its cap.
    #[test]
    fn many_inserts_stay_bounded_at_cap() {
        let cap = 8;
        let mut r = SqlRegistry::with_capacity(cap);
        for i in 0..(cap as u64 * 3) {
            r.insert(i, fresh());
            assert!(r.len() <= cap);
        }
        assert_eq!(r.len(), cap);
    }
}
2615
// Unit tests for `KvRegistry`'s LRU bookkeeping, using real but throwaway
// sled stores as payloads.
#[cfg(test)]
mod kv_registry_tests {
    use super::KvRegistry;

    /// Spin up an isolated, throwaway `sled::Db` in a temp dir.
    ///
    /// Each call gets a unique path (pid + tag + wall-clock nanos) so
    /// concurrent tests don't collide on sled's lockfile. The store is opened
    /// in sled's temporary mode, so sled removes the directory once the last
    /// handle to it is dropped and test runs do not leave directories behind
    /// in the system temp dir.
    fn fresh_db(tag: &str) -> sled::Db {
        let dir = std::env::temp_dir().join(format!(
            "lex-kv-reg-{}-{}-{}",
            std::process::id(),
            tag,
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_nanos()
        ));
        sled::Config::new()
            .path(&dir)
            .temporary(true)
            .open()
            .expect("sled open")
    }

    #[test]
    fn insert_and_get_round_trip() {
        let mut r = KvRegistry::with_capacity(4);
        r.insert(1, fresh_db("a"));
        assert!(r.touch_get(1).is_some());
        assert!(r.touch_get(2).is_none());
    }

    #[test]
    fn cap_evicts_lru_on_overflow() {
        // cap=2: insert 1, 2; touch 1 (now MRU); insert 3 → 2 evicted.
        let mut r = KvRegistry::with_capacity(2);
        r.insert(1, fresh_db("c1"));
        r.insert(2, fresh_db("c2"));
        let _ = r.touch_get(1);
        r.insert(3, fresh_db("c3"));
        assert!(r.touch_get(1).is_some(), "1 was MRU, should survive");
        assert!(r.touch_get(2).is_none(), "2 was LRU, should be evicted");
        assert!(r.touch_get(3).is_some(), "3 just inserted, should survive");
        assert_eq!(r.len(), 2);
    }

    #[test]
    fn cap_with_no_touches_evicts_in_insertion_order() {
        // cap=2: insert 1, 2, 3 with no touches → 1 evicted (FIFO).
        let mut r = KvRegistry::with_capacity(2);
        r.insert(10, fresh_db("f1"));
        r.insert(20, fresh_db("f2"));
        r.insert(30, fresh_db("f3"));
        assert!(r.touch_get(10).is_none());
        assert!(r.touch_get(20).is_some());
        assert!(r.touch_get(30).is_some());
    }

    #[test]
    fn remove_drops_entry() {
        let mut r = KvRegistry::with_capacity(4);
        r.insert(1, fresh_db("r1"));
        r.remove(1);
        assert!(r.touch_get(1).is_none());
        assert_eq!(r.len(), 0);
    }

    #[test]
    fn remove_unknown_handle_is_noop() {
        let mut r = KvRegistry::with_capacity(4);
        r.insert(1, fresh_db("u1"));
        r.remove(999);
        assert!(r.touch_get(1).is_some());
    }

    #[test]
    fn many_inserts_stay_bounded_at_cap() {
        // Exhaust the cap to confirm the registry never grows past it,
        // even under sustained churn.
        let cap = 8;
        let mut r = KvRegistry::with_capacity(cap);
        for i in 0..(cap as u64 * 3) {
            r.insert(i, fresh_db(&format!("b{i}")));
            assert!(r.len() <= cap);
        }
        assert_eq!(r.len(), cap);
    }
}