Skip to main content

lex_runtime/
handler.rs

1//! Native effect handlers, dispatched at runtime through the VM's
2//! `EffectHandler` trait. The handler also re-checks the runtime policy
3//! per spec §7.4 (the static check is necessary but not sufficient: a fn
4//! declared `[fs_read("/data")]` that's allowed at startup still has to
5//! pass the path check at the point of dispatch).
6
7use lex_bytecode::vm::{EffectHandler, Vm};
8use lex_bytecode::{Program, Value};
9use std::path::PathBuf;
10use std::sync::atomic::{AtomicU64, Ordering};
11use std::sync::{Mutex, OnceLock};
12use std::sync::Arc;
13use std::time::{SystemTime, UNIX_EPOCH};
14
15use crate::builtins::try_pure_builtin;
16use crate::policy::Policy;
17
/// Destination for lines produced by `io.print`.
///
/// Production uses [`StdoutSink`]; [`CapturedSink`] keeps lines in memory
/// instead, which is how tests inject a buffer. `Send` is required so a
/// boxed sink can move across threads with its handler.
pub trait IoSink: Send {
    /// Emit one line. Implementations decide the terminator:
    /// [`StdoutSink`] appends a newline, [`CapturedSink`] stores `s` verbatim.
    fn print_line(&mut self, s: &str);
}

/// Sink that writes each line to the process's stdout via `println!`.
pub struct StdoutSink;

impl IoSink for StdoutSink {
    fn print_line(&mut self, s: &str) {
        println!("{}", s);
    }
}

/// Sink that records every printed line, in call order, in `lines`.
#[derive(Default)]
pub struct CapturedSink {
    /// Lines received so far, oldest first.
    pub lines: Vec<String>,
}

impl IoSink for CapturedSink {
    fn print_line(&mut self, s: &str) {
        let owned = String::from(s);
        self.lines.push(owned);
    }
}
36
/// Producer iterators behind `agent.cloud_stream`, indexed by their opaque
/// handle id (#305 slice 3). Each value yields the stream's `String` chunks.
pub type StreamRegistry = std::collections::HashMap<
    String,
    Box<dyn Send + Iterator<Item = String>>,
>;
41
42pub struct DefaultHandler {
43    policy: Policy,
44    pub sink: Box<dyn IoSink>,
45    /// Optional read root for `io.read` — when set, `io.read("p")` resolves
46    /// to `read_root.join(p)`. Lets tests run without touching the real fs.
47    pub read_root: Option<PathBuf>,
48    /// Per-run budget pool (#225). `Arc<AtomicU64>` so parallel
49    /// branches share one counter without locking. Initialized to
50    /// the policy ceiling at handler construction; each call to a
51    /// function with declared `[budget(N)]` deducts N atomically
52    /// via `note_call_budget`. Cloning the handler is intentional
53    /// for net.serve / chat handlers — they share the same pool.
54    pub budget_remaining: Arc<AtomicU64>,
55    /// The original ceiling that `budget_remaining` started at, kept
56    /// for diagnostics so a `BudgetExceeded` error can report
57    /// `(used, ceiling)` rather than just "exceeded by N".
58    pub budget_ceiling: Option<u64>,
59    /// Shared reference to the program, needed by `net.serve` so the
60    /// handler can spin up fresh VMs to dispatch incoming requests.
61    /// `None` if the handler was constructed without a program.
62    pub program: Option<Arc<Program>>,
63    /// Chat registry; populated by `net.serve_ws`'s per-message
64    /// dispatch so `chat.broadcast` / `chat.send` work from inside
65    /// a handler invocation.
66    pub chat_registry: Option<Arc<crate::ws::ChatRegistry>>,
67    /// LRU cache of `agent.call_mcp` clients keyed by the
68    /// command-line string (#197). Avoids spawn-per-call cost
69    /// when an agent invokes the same MCP server in tight loops.
70    /// Capped — when the cache is full, the least-recently-used
71    /// entry is dropped (its subprocess is reaped on Drop).
72    pub mcp_clients: crate::mcp_client::McpClientCache,
73    /// Stream registry for `agent.cloud_stream` / `stream.next` /
74    /// `stream.collect` (#305 slice 3). Keyed by an opaque handle
75    /// id; values are the producer iterators. Wrapped in
76    /// `Arc<Mutex<…>>` so par_map workers can share the same
77    /// stream pool (when slice-2's per-worker handler split chains
78    /// the registry through).
79    pub streams: Arc<std::sync::Mutex<StreamRegistry>>,
80    /// Monotonic counter for handing out fresh stream handle ids.
81    pub next_stream_id: Arc<std::sync::atomic::AtomicU64>,
82}
83
84impl DefaultHandler {
85    pub fn new(policy: Policy) -> Self {
86        // If the caller supplied a ceiling, the pool starts at that
87        // ceiling and counts down. No ceiling = `u64::MAX` so calls
88        // never refuse on budget grounds (existing behavior).
89        let ceiling = policy.budget;
90        let initial = ceiling.unwrap_or(u64::MAX);
91        Self {
92            policy,
93            sink: Box::new(StdoutSink),
94            read_root: None,
95            budget_remaining: Arc::new(AtomicU64::new(initial)),
96            budget_ceiling: ceiling,
97            program: None,
98            chat_registry: None,
99            mcp_clients: crate::mcp_client::McpClientCache::with_capacity(16),
100            streams: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
101            next_stream_id: Arc::new(std::sync::atomic::AtomicU64::new(0)),
102        }
103    }
104
105    pub fn with_program(mut self, program: Arc<Program>) -> Self {
106        self.program = Some(program); self
107    }
108
109    pub fn with_chat_registry(mut self, registry: Arc<crate::ws::ChatRegistry>) -> Self {
110        self.chat_registry = Some(registry); self
111    }
112
113    pub fn with_sink(mut self, sink: Box<dyn IoSink>) -> Self {
114        self.sink = sink; self
115    }
116
117    pub fn with_read_root(mut self, root: PathBuf) -> Self {
118        self.read_root = Some(root); self
119    }
120
121    fn ensure_kind_allowed(&self, kind: &str) -> Result<(), String> {
122        if self.policy.allow_effects.contains(kind) {
123            Ok(())
124        } else {
125            Err(format!("effect `{kind}` not in --allow-effects"))
126        }
127    }
128
129    fn resolve_read_path(&self, p: &str) -> PathBuf {
130        match &self.read_root {
131            Some(root) => root.join(p.trim_start_matches('/')),
132            None => PathBuf::from(p),
133        }
134    }
135
136    fn dispatch_log(&mut self, op: &str, args: Vec<Value>) -> Result<Value, String> {
137        match op {
138            "debug" | "info" | "warn" | "error" => {
139                let msg = expect_str(args.first())?;
140                let level = match op {
141                    "debug" => LogLevel::Debug,
142                    "info" => LogLevel::Info,
143                    "warn" => LogLevel::Warn,
144                    _ => LogLevel::Error,
145                };
146                emit_log(level, msg);
147                Ok(Value::Unit)
148            }
149            "set_level" => {
150                let s = expect_str(args.first())?;
151                match parse_log_level(s) {
152                    Some(l) => {
153                        log_state().lock().unwrap().level = l;
154                        Ok(ok(Value::Unit))
155                    }
156                    None => Ok(err(Value::Str(format!(
157                        "log.set_level: unknown level `{s}`; expected debug|info|warn|error")))),
158                }
159            }
160            "set_format" => {
161                let s = expect_str(args.first())?;
162                let fmt = match s {
163                    "text" => LogFormat::Text,
164                    "json" => LogFormat::Json,
165                    other => return Ok(err(Value::Str(format!(
166                        "log.set_format: unknown format `{other}`; expected text|json")))),
167                };
168                log_state().lock().unwrap().format = fmt;
169                Ok(ok(Value::Unit))
170            }
171            "set_sink" => {
172                let path = expect_str(args.first())?;
173                if path == "-" {
174                    log_state().lock().unwrap().sink = LogSink::Stderr;
175                    return Ok(ok(Value::Unit));
176                }
177                if let Err(e) = self.ensure_fs_write_path(path) {
178                    return Ok(err(Value::Str(e)));
179                }
180                match std::fs::OpenOptions::new()
181                    .create(true).append(true).open(path)
182                {
183                    Ok(f) => {
184                        log_state().lock().unwrap().sink = LogSink::File(std::sync::Arc::new(Mutex::new(f)));
185                        Ok(ok(Value::Unit))
186                    }
187                    Err(e) => Ok(err(Value::Str(format!("log.set_sink `{path}`: {e}")))),
188                }
189            }
190            other => Err(format!("unsupported log.{other}")),
191        }
192    }
193
194    fn dispatch_process(&mut self, op: &str, args: Vec<Value>) -> Result<Value, String> {
195        match op {
196            "spawn" => {
197                let cmd = expect_str(args.first())?.to_string();
198                let raw_args = match args.get(1) {
199                    Some(Value::List(items)) => items.clone(),
200                    _ => return Err("process.spawn: args must be List[Str]".into()),
201                };
202                let str_args: Result<Vec<String>, String> = raw_args.iter().map(|v| match v {
203                    Value::Str(s) => Ok(s.clone()),
204                    other => Err(format!("process.spawn: arg must be Str, got {other:?}")),
205                }).collect();
206                let str_args = str_args?;
207                let opts = match args.get(2) {
208                    Some(Value::Record(r)) => r.clone(),
209                    _ => return Err("process.spawn: missing or invalid opts record".into()),
210                };
211
212                // Allow-list check, mirroring the existing proc.spawn.
213                if !self.policy.allow_proc.is_empty() {
214                    let basename = std::path::Path::new(&cmd)
215                        .file_name()
216                        .and_then(|s| s.to_str())
217                        .unwrap_or(&cmd);
218                    if !self.policy.allow_proc.iter().any(|a| a == basename) {
219                        return Ok(err(Value::Str(format!(
220                            "process.spawn: `{cmd}` not in --allow-proc {:?}",
221                            self.policy.allow_proc
222                        ))));
223                    }
224                }
225
226                let mut command = std::process::Command::new(&cmd);
227                command.args(&str_args);
228                command.stdin(std::process::Stdio::piped());
229                command.stdout(std::process::Stdio::piped());
230                command.stderr(std::process::Stdio::piped());
231
232                if let Some(Value::Variant { name, args: vargs }) = opts.get("cwd") {
233                    if name == "Some" {
234                        if let Some(Value::Str(s)) = vargs.first() {
235                            command.current_dir(s);
236                        }
237                    }
238                }
239                if let Some(Value::Map(env)) = opts.get("env") {
240                    for (k, v) in env {
241                        if let (lex_bytecode::MapKey::Str(ks), Value::Str(vs)) = (k, v) {
242                            command.env(ks, vs);
243                        }
244                    }
245                }
246
247                let stdin_payload: Option<Vec<u8>> = match opts.get("stdin") {
248                    Some(Value::Variant { name, args: vargs }) if name == "Some" => {
249                        match vargs.first() {
250                            Some(Value::Bytes(b)) => Some(b.clone()),
251                            _ => None,
252                        }
253                    }
254                    _ => None,
255                };
256
257                let mut child = match command.spawn() {
258                    Ok(c) => c,
259                    Err(e) => return Ok(err(Value::Str(format!("process.spawn `{cmd}`: {e}")))),
260                };
261
262                if let Some(payload) = stdin_payload {
263                    if let Some(mut stdin) = child.stdin.take() {
264                        use std::io::Write;
265                        let _ = stdin.write_all(&payload);
266                        // Drop closes stdin; the child sees EOF.
267                    }
268                }
269
270                let stdout = child.stdout.take().map(std::io::BufReader::new);
271                let stderr = child.stderr.take().map(std::io::BufReader::new);
272                let handle = next_process_handle();
273                process_registry().lock().unwrap().insert(handle, ProcessState {
274                    child,
275                    stdout,
276                    stderr,
277                });
278                Ok(ok(Value::Int(handle as i64)))
279            }
280            "read_stdout_line" => Self::read_line_op(args, true),
281            "read_stderr_line" => Self::read_line_op(args, false),
282            "wait" => {
283                let h = expect_process_handle(args.first())?;
284                // Look up the per-handle Arc, then release the global
285                // lock before the (slow) wait so unrelated handles
286                // can dispatch concurrently.
287                let arc = process_registry().lock().unwrap()
288                    .touch_get(h)
289                    .ok_or_else(|| "process.wait: closed or unknown ProcessHandle".to_string())?;
290                let status = {
291                    let mut state = arc.lock().unwrap();
292                    state.child.wait().map_err(|e| format!("process.wait: {e}"))?
293                };
294                // Wait completion makes the handle terminal; drop it
295                // from the registry so the cap doesn't fill up with
296                // exited children.
297                process_registry().lock().unwrap().remove(h);
298                let mut rec = indexmap::IndexMap::new();
299                rec.insert("code".into(), Value::Int(status.code().unwrap_or(-1) as i64));
300                #[cfg(unix)]
301                {
302                    use std::os::unix::process::ExitStatusExt;
303                    rec.insert("signaled".into(), Value::Bool(status.signal().is_some()));
304                }
305                #[cfg(not(unix))]
306                {
307                    rec.insert("signaled".into(), Value::Bool(false));
308                }
309                Ok(Value::Record(rec))
310            }
311            "kill" => {
312                let h = expect_process_handle(args.first())?;
313                let _signal = expect_str(args.get(1))?;
314                let arc = process_registry().lock().unwrap()
315                    .touch_get(h)
316                    .ok_or_else(|| "process.kill: closed or unknown ProcessHandle".to_string())?;
317                let mut state = arc.lock().unwrap();
318                // Cross-platform: only `kill` (SIGKILL-equivalent on
319                // Windows). Signal-name dispatch is a v1.5 follow-up.
320                match state.child.kill() {
321                    Ok(_) => Ok(ok(Value::Unit)),
322                    Err(e) => Ok(err(Value::Str(format!("process.kill: {e}")))),
323                }
324            }
325            "run" => {
326                let cmd = expect_str(args.first())?.to_string();
327                let raw_args = match args.get(1) {
328                    Some(Value::List(items)) => items.clone(),
329                    _ => return Err("process.run: args must be List[Str]".into()),
330                };
331                let str_args: Result<Vec<String>, String> = raw_args.iter().map(|v| match v {
332                    Value::Str(s) => Ok(s.clone()),
333                    other => Err(format!("process.run: arg must be Str, got {other:?}")),
334                }).collect();
335                let str_args = str_args?;
336                if !self.policy.allow_proc.is_empty() {
337                    let basename = std::path::Path::new(&cmd)
338                        .file_name()
339                        .and_then(|s| s.to_str())
340                        .unwrap_or(&cmd);
341                    if !self.policy.allow_proc.iter().any(|a| a == basename) {
342                        return Ok(err(Value::Str(format!(
343                            "process.run: `{cmd}` not in --allow-proc {:?}",
344                            self.policy.allow_proc
345                        ))));
346                    }
347                }
348                match std::process::Command::new(&cmd).args(&str_args).output() {
349                    Ok(o) => {
350                        let mut rec = indexmap::IndexMap::new();
351                        rec.insert("stdout".into(), Value::Str(
352                            String::from_utf8_lossy(&o.stdout).to_string()));
353                        rec.insert("stderr".into(), Value::Str(
354                            String::from_utf8_lossy(&o.stderr).to_string()));
355                        rec.insert("exit_code".into(), Value::Int(
356                            o.status.code().unwrap_or(-1) as i64));
357                        Ok(ok(Value::Record(rec)))
358                    }
359                    Err(e) => Ok(err(Value::Str(format!("process.run `{cmd}`: {e}")))),
360                }
361            }
362            other => Err(format!("unsupported process.{other}")),
363        }
364    }
365
366    /// Read one line from the child's stdout (`is_stdout = true`) or
367    /// stderr. Returns `None` (Lex `Option`) at EOF; subsequent calls
368    /// keep returning `None`. Holds only the per-handle mutex during
369    /// the (potentially blocking) read, so reads on one handle don't
370    /// block reads/waits on a different handle.
371    fn read_line_op(args: Vec<Value>, is_stdout: bool) -> Result<Value, String> {
372        let h = expect_process_handle(args.first())?;
373        let arc = process_registry().lock().unwrap()
374            .touch_get(h)
375            .ok_or_else(|| format!(
376                "process.read_{}_line: closed or unknown ProcessHandle",
377                if is_stdout { "stdout" } else { "stderr" }))?;
378        let mut state = arc.lock().unwrap();
379        let reader_opt = if is_stdout {
380            state.stdout.as_mut().map(|r| -> &mut dyn std::io::BufRead { r })
381        } else {
382            state.stderr.as_mut().map(|r| -> &mut dyn std::io::BufRead { r })
383        };
384        let reader = match reader_opt {
385            Some(r) => r,
386            None => return Ok(none()),
387        };
388        let mut line = String::new();
389        match reader.read_line(&mut line) {
390            Ok(0) => Ok(none()),
391            Ok(_) => {
392                if line.ends_with('\n') { line.pop(); }
393                if line.ends_with('\r') { line.pop(); }
394                Ok(some(Value::Str(line)))
395            }
396            Err(e) => Err(format!("process.read_*_line: {e}")),
397        }
398    }
399
400    fn dispatch_fs(&mut self, op: &str, args: Vec<Value>) -> Result<Value, String> {
401        match op {
402            "exists" => {
403                let path = expect_str(args.first())?.to_string();
404                if let Err(e) = self.ensure_fs_walk_path(&path) {
405                    return Ok(err(Value::Str(e)));
406                }
407                Ok(Value::Bool(std::path::Path::new(&path).exists()))
408            }
409            "is_file" => {
410                let path = expect_str(args.first())?.to_string();
411                if let Err(e) = self.ensure_fs_walk_path(&path) {
412                    return Ok(err(Value::Str(e)));
413                }
414                Ok(Value::Bool(std::path::Path::new(&path).is_file()))
415            }
416            "is_dir" => {
417                let path = expect_str(args.first())?.to_string();
418                if let Err(e) = self.ensure_fs_walk_path(&path) {
419                    return Ok(err(Value::Str(e)));
420                }
421                Ok(Value::Bool(std::path::Path::new(&path).is_dir()))
422            }
423            "stat" => {
424                let path = expect_str(args.first())?.to_string();
425                if let Err(e) = self.ensure_fs_walk_path(&path) {
426                    return Ok(err(Value::Str(e)));
427                }
428                match std::fs::metadata(&path) {
429                    Ok(md) => {
430                        let mtime = md.modified()
431                            .ok()
432                            .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
433                            .map(|d| d.as_secs() as i64)
434                            .unwrap_or(0);
435                        let mut rec = indexmap::IndexMap::new();
436                        rec.insert("size".into(), Value::Int(md.len() as i64));
437                        rec.insert("mtime".into(), Value::Int(mtime));
438                        rec.insert("is_dir".into(), Value::Bool(md.is_dir()));
439                        rec.insert("is_file".into(), Value::Bool(md.is_file()));
440                        Ok(ok(Value::Record(rec)))
441                    }
442                    Err(e) => Ok(err(Value::Str(format!("fs.stat `{path}`: {e}")))),
443                }
444            }
445            "list_dir" => {
446                let path = expect_str(args.first())?.to_string();
447                if let Err(e) = self.ensure_fs_walk_path(&path) {
448                    return Ok(err(Value::Str(e)));
449                }
450                match std::fs::read_dir(&path) {
451                    Ok(rd) => {
452                        let mut entries: Vec<Value> = Vec::new();
453                        for ent in rd {
454                            match ent {
455                                Ok(e) => {
456                                    let p = e.path();
457                                    entries.push(Value::Str(p.to_string_lossy().into_owned()));
458                                }
459                                Err(e) => return Ok(err(Value::Str(format!("fs.list_dir: {e}")))),
460                            }
461                        }
462                        Ok(ok(Value::List(entries)))
463                    }
464                    Err(e) => Ok(err(Value::Str(format!("fs.list_dir `{path}`: {e}")))),
465                }
466            }
467            "walk" => {
468                let path = expect_str(args.first())?.to_string();
469                if let Err(e) = self.ensure_fs_walk_path(&path) {
470                    return Ok(err(Value::Str(e)));
471                }
472                let mut paths: Vec<Value> = Vec::new();
473                for ent in walkdir::WalkDir::new(&path) {
474                    match ent {
475                        Ok(e) => paths.push(Value::Str(
476                            e.path().to_string_lossy().into_owned())),
477                        Err(e) => return Ok(err(Value::Str(format!("fs.walk: {e}")))),
478                    }
479                }
480                Ok(ok(Value::List(paths)))
481            }
482            "glob" => {
483                let pattern = expect_str(args.first())?.to_string();
484                // Glob patterns can't be path-scoped at parse time
485                // (`**/*.rs` doesn't pin a directory); we filter the
486                // per-result paths after expansion against
487                // `--allow-fs-read`.
488                let entries = match glob::glob(&pattern) {
489                    Ok(e) => e,
490                    Err(e) => return Ok(err(Value::Str(format!("fs.glob: {e}")))),
491                };
492                let mut paths: Vec<Value> = Vec::new();
493                for ent in entries {
494                    match ent {
495                        Ok(p) => {
496                            let s = p.to_string_lossy().into_owned();
497                            if self.policy.allow_fs_read.is_empty()
498                                || self.policy.allow_fs_read.iter().any(|root| p.starts_with(root))
499                            {
500                                paths.push(Value::Str(s));
501                            }
502                        }
503                        Err(e) => return Ok(err(Value::Str(format!("fs.glob: {e}")))),
504                    }
505                }
506                Ok(ok(Value::List(paths)))
507            }
508            "mkdir_p" => {
509                let path = expect_str(args.first())?.to_string();
510                if let Err(e) = self.ensure_fs_write_path(&path) {
511                    return Ok(err(Value::Str(e)));
512                }
513                match std::fs::create_dir_all(&path) {
514                    Ok(_) => Ok(ok(Value::Unit)),
515                    Err(e) => Ok(err(Value::Str(format!("fs.mkdir_p `{path}`: {e}")))),
516                }
517            }
518            "remove" => {
519                let path = expect_str(args.first())?.to_string();
520                if let Err(e) = self.ensure_fs_write_path(&path) {
521                    return Ok(err(Value::Str(e)));
522                }
523                let p = std::path::Path::new(&path);
524                let result = if p.is_dir() {
525                    std::fs::remove_dir_all(p)
526                } else {
527                    std::fs::remove_file(p)
528                };
529                match result {
530                    Ok(_) => Ok(ok(Value::Unit)),
531                    Err(e) => Ok(err(Value::Str(format!("fs.remove `{path}`: {e}")))),
532                }
533            }
534            "copy" => {
535                let src = expect_str(args.first())?.to_string();
536                let dst = expect_str(args.get(1))?.to_string();
537                if let Err(e) = self.ensure_fs_walk_path(&src) {
538                    return Ok(err(Value::Str(e)));
539                }
540                if let Err(e) = self.ensure_fs_write_path(&dst) {
541                    return Ok(err(Value::Str(e)));
542                }
543                match std::fs::copy(&src, &dst) {
544                    Ok(_) => Ok(ok(Value::Unit)),
545                    Err(e) => Ok(err(Value::Str(format!("fs.copy {src} -> {dst}: {e}")))),
546                }
547            }
548            other => Err(format!("unsupported fs.{other}")),
549        }
550    }
551
552    /// Path scope for walk-style operations. `[fs_walk]` reuses the
553    /// `--allow-fs-read` allowlist — listing a directory is an
554    /// information disclosure on the same path tree as reading file
555    /// content, so the same scope applies. Empty allowlist = any path.
556    fn ensure_fs_walk_path(&self, path: &str) -> Result<(), String> {
557        if self.policy.allow_fs_read.is_empty() {
558            return Ok(());
559        }
560        let p = std::path::Path::new(path);
561        if self.policy.allow_fs_read.iter().any(|a| p.starts_with(a)) {
562            Ok(())
563        } else {
564            Err(format!("fs path `{path}` outside --allow-fs-read"))
565        }
566    }
567
568    /// Path scope for mutating operations. `[fs_write]` uses the
569    /// existing `--allow-fs-write` allowlist.
570    fn ensure_fs_write_path(&self, path: &str) -> Result<(), String> {
571        if self.policy.allow_fs_write.is_empty() {
572            return Ok(());
573        }
574        let p = std::path::Path::new(path);
575        if self.policy.allow_fs_write.iter().any(|a| p.starts_with(a)) {
576            Ok(())
577        } else {
578            Err(format!("fs path `{path}` outside --allow-fs-write"))
579        }
580    }
581
582    /// Enforce `--allow-net-host` against an outgoing URL. Empty
583    /// allowlist = any host. Non-empty = the URL's host must match
584    /// (substring; port-agnostic) at least one entry.
585    fn ensure_host_allowed(&self, url: &str) -> Result<(), String> {
586        if self.policy.allow_net_host.is_empty() { return Ok(()); }
587        let host = extract_host(url).unwrap_or("");
588        if self.policy.allow_net_host.iter().any(|h| host == h) {
589            Ok(())
590        } else {
591            Err(format!(
592                "net call to host `{host}` not in --allow-net-host {:?}",
593                self.policy.allow_net_host,
594            ))
595        }
596    }
597}
598
/// Extract the host of an `http://` / `https://` URL for `--allow-net-host`
/// checks. Returns `None` for any other scheme.
///
/// The URL is processed in three steps:
/// 1. The authority is cut at the first `/`, `?`, `#` or `\`.
/// 2. Any `userinfo@` prefix (up to the last `@`) is dropped.
/// 3. A trailing `:port` is removed. Bracketed IPv6 literals keep their
///    brackets (`[::1]`).
///
/// Stripping userinfo matters for security. Without it,
/// `http://allowed.com:80@evil.com/` would be reported as `allowed.com`
/// while the request actually goes to `evil.com`.
fn extract_host(url: &str) -> Option<&str> {
    let rest = url
        .strip_prefix("http://")
        .or_else(|| url.strip_prefix("https://"))?;
    // The authority ends at the first path, query, or fragment delimiter.
    // `\` is included because WHATWG-style URL parsers treat it like `/` for
    // http(s), so `http://evil.com\@ok.com` must not read as host `ok.com`.
    let authority = rest
        .split(|c: char| matches!(c, '/' | '?' | '#' | '\\'))
        .next()
        .unwrap_or(rest);
    // Everything up to the last `@` is userinfo, not host.
    let host_port = authority
        .rsplit_once('@')
        .map_or(authority, |(_userinfo, host_port)| host_port);
    // Bracketed IPv6 literal: its colons are not a port separator.
    if host_port.starts_with('[') {
        return Some(match host_port.find(']') {
            Some(end) => &host_port[..=end],
            None => host_port,
        });
    }
    Some(host_port.rsplit_once(':').map_or(host_port, |(host, _port)| host))
}
610
611impl EffectHandler for DefaultHandler {
612    /// Per-call budget enforcement (#225). VM calls this before
613    /// invoking any function whose signature declares `[budget(N)]`.
614    /// The cost N is deducted atomically from the shared pool;
615    /// returning `Err` aborts the call before any frame is pushed.
616    fn note_call_budget(&mut self, cost: u64) -> Result<(), String> {
617        // Skip the work entirely when no ceiling is configured —
618        // the pool is `u64::MAX` and would never trip.
619        let Some(ceiling) = self.budget_ceiling else { return Ok(()); };
620        // Compare-and-swap: speculatively subtract; if we'd
621        // underflow, return BudgetExceeded without mutating.
622        // Use SeqCst because parallel branches may race here and
623        // the relative ordering of "used so far" vs. "this call's
624        // cost" needs to be deterministic across threads.
625        loop {
626            let cur = self.budget_remaining.load(Ordering::SeqCst);
627            if cost > cur {
628                let used = ceiling.saturating_sub(cur);
629                return Err(format!(
630                    "budget exceeded: requested {cost}, used so far {used}, ceiling {ceiling}"));
631            }
632            let next = cur - cost;
633            // Conservative accounting: if the CAS races and loses,
634            // re-read and try again. No refund-on-failure path.
635            if self.budget_remaining.compare_exchange(cur, next,
636                Ordering::SeqCst, Ordering::SeqCst).is_ok() {
637                return Ok(());
638            }
639        }
640    }
641
642    fn dispatch(&mut self, kind: &str, op: &str, args: Vec<Value>) -> Result<Value, String> {
643        // Pure stdlib builtins (str, list, json, ...) bypass the policy
644        // gate — they have no observable side effects and aren't tracked
645        // by the type system as effects.
646        if let Some(r) = try_pure_builtin(kind, op, &args) {
647            return r;
648        }
649        // `std.fs` ops use the fine-grained `[fs_walk]` and `[fs_write]`
650        // effect kinds (distinct from the module name `fs`); the
651        // policy check uses the per-op kind, not the module's.
652        if kind == "process" {
653            self.ensure_kind_allowed("proc")?;
654            return self.dispatch_process(op, args);
655        }
656        if kind == "log" {
657            // Emit ops are [log]; config ops are [io] (set_sink also
658            // [fs_write]). The dispatch picks the right kind per op.
659            let effect_kind = match op {
660                "debug" | "info" | "warn" | "error" => "log",
661                "set_level" | "set_format" => "io",
662                "set_sink" => {
663                    self.ensure_kind_allowed("io")?;
664                    self.ensure_kind_allowed("fs_write")?;
665                    return self.dispatch_log(op, args);
666                }
667                other => return Err(format!("unsupported log.{other}")),
668            };
669            self.ensure_kind_allowed(effect_kind)?;
670            return self.dispatch_log(op, args);
671        }
672        if kind == "fs" {
673            let effect_kind = match op {
674                "exists" | "is_file" | "is_dir" | "stat"
675                | "list_dir" | "walk" | "glob" => "fs_walk",
676                "mkdir_p" | "remove" => "fs_write",
677                "copy" => {
678                    self.ensure_kind_allowed("fs_walk")?;
679                    self.ensure_kind_allowed("fs_write")?;
680                    return self.dispatch_fs(op, args);
681                }
682                other => return Err(format!("unsupported fs.{other}")),
683            };
684            self.ensure_kind_allowed(effect_kind)?;
685            return self.dispatch_fs(op, args);
686        }
687        // `crypto.random` is the lone effectful op in `std.crypto`. Its
688        // declared effect kind is `random` (fine-grained on purpose so
689        // `lex audit --effect random` flags every token-generating
690        // call), distinct from the `crypto` module name.
691        // datetime.now is the only effectful op in std.datetime;
692        // declared kind is `time`, matching the existing `time.now`.
693        if kind == "datetime" && op == "now" {
694            self.ensure_kind_allowed("time")?;
695            // LEX_TEST_NOW (Unix seconds) pins the clock for deterministic tests (#350).
696            if let Ok(s) = std::env::var("LEX_TEST_NOW") {
697                if let Ok(secs) = s.trim().parse::<i64>() {
698                    return Ok(Value::Int(secs.saturating_mul(1_000_000_000)));
699                }
700            }
701            let now = chrono::Utc::now();
702            let nanos = now.timestamp_nanos_opt().unwrap_or(i64::MAX);
703            return Ok(Value::Int(nanos));
704        }
705        if kind == "crypto" && op == "random" {
706            self.ensure_kind_allowed("random")?;
707            let n = expect_int(args.first())?;
708            if !(0..=1_048_576).contains(&n) {
709                return Err("crypto.random: n must be in 0..=1048576".into());
710            }
711            use rand::{rngs::SysRng, TryRng};
712            let mut buf = vec![0u8; n as usize];
713            SysRng.try_fill_bytes(&mut buf)
714                .map_err(|e| format!("crypto.random: OS RNG: {e}"))?;
715            return Ok(Value::Bytes(buf));
716        }
717        // crypto.random_str_hex(n) — N random bytes rendered as 2N
718        // lowercase hex chars (#382). The most common token-mint
719        // pattern (session ids, OAuth `state`, CSRF, request ids).
720        // Same `[random]` gate as `crypto.random`.
721        if kind == "crypto" && op == "random_str_hex" {
722            self.ensure_kind_allowed("random")?;
723            let n = expect_int(args.first())?;
724            if !(0..=1_048_576).contains(&n) {
725                return Err("crypto.random_str_hex: n must be in 0..=1048576".into());
726            }
727            use rand::{rngs::SysRng, TryRng};
728            let mut buf = vec![0u8; n as usize];
729            SysRng.try_fill_bytes(&mut buf)
730                .map_err(|e| format!("crypto.random_str_hex: OS RNG: {e}"))?;
731            return Ok(Value::Str(hex::encode(&buf)));
732        }
733        // `std.http` wire ops (send/get/post) gate on the `net`
734        // effect kind, not the module name. This matches the
735        // declared signature (`http.get :: Str -> [net] ...`) and
736        // keeps `--allow-effects net` doing the obvious thing for
737        // both `net.*` and `http.*` callers.
738        // `std.agent` (#184): the four runtime effects added for
739        // agent-style programs (`llm_local`, `llm_cloud`, `a2a`,
740        // `mcp`). The handlers are stubs — they enforce the
741        // declared-effect gate, return a sentinel `Ok` so traces
742        // record the call, and defer the real wire formats to
743        // downstream crates (`soft-agent` for `llm_*` and `a2a`)
744        // and #185 (MCP client wrapper).
745        if kind == "agent" {
746            let effect_kind = match op {
747                "local_complete" => "llm_local",
748                "cloud_complete" => "llm_cloud",
749                "cloud_stream"   => "llm_cloud",
750                "send_a2a"       => "a2a",
751                "call_mcp"       => "mcp",
752                other => return Err(format!("unsupported agent.{other}")),
753            };
754            self.ensure_kind_allowed(effect_kind)?;
755            // `call_mcp` runs through the LRU client cache
756            // (#197). `local_complete` / `cloud_complete` hit
757            // Ollama / OpenAI via env-var-driven configuration
758            // (#196); custom backends override at the
759            // EffectHandler layer rather than via a config file.
760            // `send_a2a` keeps its stub — that wire format
761            // lives in downstream `soft-a2a`.
762            return match op {
763                "call_mcp"       => Ok(self.dispatch_call_mcp(args)),
764                "local_complete" => Ok(dispatch_llm_local(args)),
765                "cloud_complete" => Ok(dispatch_llm_cloud(args)),
766                "cloud_stream"   => Ok(self.dispatch_cloud_stream(args)),
767                _ => Ok(ok(Value::Str(format!("<{effect_kind} stub>")))),
768            };
769        }
770        if kind == "stream" {
771            // #305 slice 3: consumer-side stream operations. Each
772            // op resolves the opaque handle in the parent handler's
773            // stream registry and pulls one or all items. The
774            // `stream` effect must be granted by policy; default
775            // policies for agent runs grant it alongside the
776            // producer effect (e.g. `llm_cloud`).
777            self.ensure_kind_allowed("stream")?;
778            return match op {
779                "next"    => Ok(self.dispatch_stream_next(args)),
780                "collect" => Ok(self.dispatch_stream_collect(args)),
781                other => Err(format!("unsupported stream.{other}")),
782            };
783        }
784        if kind == "http" && matches!(op, "send" | "get" | "post") {
785            self.ensure_kind_allowed("net")?;
786            return match op {
787                "send" => {
788                    let req = expect_record(args.first())?;
789                    Ok(http_send_record(self, req))
790                }
791                "get" => {
792                    let url = expect_str(args.first())?.to_string();
793                    self.ensure_host_allowed(&url)?;
794                    Ok(http_send_simple("GET", &url, None, "", None))
795                }
796                "post" => {
797                    let url = expect_str(args.first())?.to_string();
798                    let body = expect_bytes(args.get(1))?.clone();
799                    let content_type = expect_str(args.get(2))?.to_string();
800                    self.ensure_host_allowed(&url)?;
801                    Ok(http_send_simple("POST", &url, Some(body), &content_type, None))
802                }
803                _ => unreachable!(),
804            };
805        }
806        self.ensure_kind_allowed(kind)?;
807        match (kind, op) {
808            ("io", "print") => {
809                let line = expect_str(args.first())?;
810                self.sink.print_line(line);
811                Ok(Value::Unit)
812            }
813            ("io", "read") => {
814                let path = expect_str(args.first())?.to_string();
815                let resolved = self.resolve_read_path(&path);
816                // Honor read-allowlist if any. Symmetric with io.write.
817                // The path argument is checked as-given (resolved-against-
818                // read_root for tests); a tool granted [io] cannot escape
819                // the configured prefix even though the effect itself is
820                // permitted. This is the per-path scope the bench's case
821                // #6 ("[io] granted, body reads /etc/passwd") needed.
822                if !self.policy.allow_fs_read.is_empty()
823                    && !self.policy.allow_fs_read.iter().any(|a| resolved.starts_with(a))
824                {
825                    return Err(format!("read of `{path}` outside --allow-fs-read"));
826                }
827                match std::fs::read_to_string(&resolved) {
828                    Ok(s) => Ok(ok(Value::Str(s))),
829                    Err(e) => Ok(err(Value::Str(format!("{e}")))),
830                }
831            }
832            ("io", "write") => {
833                let path = expect_str(args.first())?.to_string();
834                let contents = expect_str(args.get(1))?.to_string();
835                // Honor write-allowlist if any.
836                if !self.policy.allow_fs_write.is_empty() {
837                    let p = std::path::Path::new(&path);
838                    if !self.policy.allow_fs_write.iter().any(|a| p.starts_with(a)) {
839                        return Err(format!("write to `{path}` outside --allow-fs-write"));
840                    }
841                }
842                match std::fs::write(&path, contents) {
843                    Ok(_) => Ok(ok(Value::Unit)),
844                    Err(e) => Ok(err(Value::Str(format!("{e}")))),
845                }
846            }
847            ("time", "now") => {
848                // LEX_TEST_NOW (Unix seconds) pins for deterministic tests.
849                if let Ok(s) = std::env::var("LEX_TEST_NOW") {
850                    if let Ok(secs) = s.trim().parse::<i64>() {
851                        return Ok(Value::Int(secs));
852                    }
853                }
854                let secs = SystemTime::now().duration_since(UNIX_EPOCH)
855                    .map_err(|e| format!("time: {e}"))?.as_secs();
856                Ok(Value::Int(secs as i64))
857            }
858            ("time", "now_ms") => {
859                // Unix epoch in milliseconds (#378). `LEX_TEST_NOW` is
860                // documented in seconds, so we lift it to ms by *1000
861                // to keep the pinning story uniform across `time.now`
862                // and `time.now_ms`.
863                if let Ok(s) = std::env::var("LEX_TEST_NOW") {
864                    if let Ok(secs) = s.trim().parse::<i64>() {
865                        return Ok(Value::Int(secs.saturating_mul(1000)));
866                    }
867                }
868                let ms = SystemTime::now().duration_since(UNIX_EPOCH)
869                    .map_err(|e| format!("time: {e}"))?.as_millis();
870                Ok(Value::Int(ms as i64))
871            }
872            ("time", "now_str") => {
873                // ISO-8601 / RFC 3339 in UTC (#378). Format mirrors
874                // `chrono::Utc::now().to_rfc3339()` already used
875                // elsewhere in the handler.
876                if let Ok(s) = std::env::var("LEX_TEST_NOW") {
877                    if let Ok(secs) = s.trim().parse::<i64>() {
878                        let dt = chrono::DateTime::<chrono::Utc>::from_timestamp(secs, 0)
879                            .unwrap_or_else(chrono::Utc::now);
880                        return Ok(Value::Str(dt.to_rfc3339()));
881                    }
882                }
883                Ok(Value::Str(chrono::Utc::now().to_rfc3339()))
884            }
885            ("time", "mono_ns") => {
886                // Monotonic clock relative to process start. Cached
887                // `Instant::now()` anchor so successive `mono_ns`
888                // calls return strictly non-decreasing values without
889                // depending on the wall clock. Not affected by
890                // `LEX_TEST_NOW` — pinning a monotonic clock would
891                // defeat its purpose; tests needing a fake monotonic
892                // clock should swap in their own `EffectHandler`.
893                static MONO_START: OnceLock<std::time::Instant> = OnceLock::new();
894                let start = MONO_START.get_or_init(std::time::Instant::now);
895                let dur = std::time::Instant::now().duration_since(*start);
896                Ok(Value::Int(dur.as_nanos() as i64))
897            }
898            ("time", "sleep_ms") => {
899                // Block the current thread for `n` ms (#226). Used
900                // by `flow.retry_with_backoff`'s exponential delay.
901                // Negative or zero is a no-op. Bounded at 60s in the
902                // runtime to avoid pathological agent-emitted loops
903                // wedging the host — anything legitimate beyond
904                // that should use process-level scheduling, not a
905                // blocking sleep.
906                let n = expect_int(args.first())?;
907                if n > 0 {
908                    let ms = (n as u64).min(60_000);
909                    std::thread::sleep(std::time::Duration::from_millis(ms));
910                }
911                Ok(Value::Unit)
912            }
913            ("rand", "int_in") => {
914                // Deterministic stub: midpoint of [lo, hi].
915                let lo = expect_int(args.first())?;
916                let hi = expect_int(args.get(1))?;
917                Ok(Value::Int((lo + hi) / 2))
918            }
919            // `env.get` returns `Option[Str]` — `None` for unset vars.
920            // Per-var scoping (`[env(NAME)]`) arrives with #207's
921            // per-capability effect parameterization; today the flat
922            // `[env]` grants access to the entire process environment.
923            ("env", "get") => {
924                let name = expect_str(args.first())?;
925                Ok(match std::env::var(name) {
926                    Ok(v) => Value::Variant {
927                        name: "Some".into(),
928                        args: vec![Value::Str(v)],
929                    },
930                    Err(_) => Value::Variant { name: "None".into(), args: Vec::new() },
931                })
932            }
933            ("budget", _) => {
934                // Budget calls are nominally tracked here; budget itself is
935                // enforced statically in `policy::check_program`.
936                Ok(Value::Unit)
937            }
938            ("net", "get") => {
939                let url = expect_str(args.first())?.to_string();
940                self.ensure_host_allowed(&url)?;
941                Ok(http_request("GET", &url, None))
942            }
943            ("net", "post") => {
944                let url = expect_str(args.first())?.to_string();
945                let body = expect_str(args.get(1))?.to_string();
946                self.ensure_host_allowed(&url)?;
947                Ok(http_request("POST", &url, Some(&body)))
948            }
949            ("net", "serve") => {
950                let port = match args.first() {
951                    Some(Value::Int(n)) if (0..=65535).contains(n) => *n as u16,
952                    _ => return Err("net.serve(port, handler): port must be Int 0..=65535".into()),
953                };
954                let handler_name = expect_str(args.get(1))?.to_string();
955                let program = self.program.clone()
956                    .ok_or_else(|| "net.serve requires a Program reference; use DefaultHandler::with_program".to_string())?;
957                let policy = self.policy.clone();
958                serve_http(port, handler_name, program, policy, None)
959            }
960            ("net", "serve_fn") => {
961                let port = match args.first() {
962                    Some(Value::Int(n)) if (0..=65535).contains(n) => *n as u16,
963                    _ => return Err("net.serve_fn(port, handler): port must be Int 0..=65535".into()),
964                };
965                let closure = match args.into_iter().nth(1) {
966                    Some(c @ Value::Closure { .. }) => c,
967                    _ => return Err("net.serve_fn(port, handler): handler must be a closure".into()),
968                };
969                let program = self.program.clone()
970                    .ok_or_else(|| "net.serve_fn requires a Program reference; use DefaultHandler::with_program".to_string())?;
971                let policy = self.policy.clone();
972                serve_http_fn(port, closure, program, policy)
973            }
974            ("net", "serve_tls") => {
975                let port = match args.first() {
976                    Some(Value::Int(n)) if (0..=65535).contains(n) => *n as u16,
977                    _ => return Err("net.serve_tls(port, cert, key, handler): port must be Int 0..=65535".into()),
978                };
979                let cert_path = expect_str(args.get(1))?.to_string();
980                let key_path = expect_str(args.get(2))?.to_string();
981                let handler_name = expect_str(args.get(3))?.to_string();
982                let program = self.program.clone()
983                    .ok_or_else(|| "net.serve_tls requires a Program reference".to_string())?;
984                let policy = self.policy.clone();
985                let cert = std::fs::read(&cert_path)
986                    .map_err(|e| format!("net.serve_tls: read cert {cert_path}: {e}"))?;
987                let key = std::fs::read(&key_path)
988                    .map_err(|e| format!("net.serve_tls: read key {key_path}: {e}"))?;
989                serve_http(port, handler_name, program, policy, Some(TlsConfig { cert, key }))
990            }
991            ("net", "serve_ws") => {
992                let port = match args.first() {
993                    Some(Value::Int(n)) if (0..=65535).contains(n) => *n as u16,
994                    _ => return Err("net.serve_ws(port, on_message): port must be Int 0..=65535".into()),
995                };
996                let handler_name = expect_str(args.get(1))?.to_string();
997                let program = self.program.clone()
998                    .ok_or_else(|| "net.serve_ws requires a Program reference".to_string())?;
999                let policy = self.policy.clone();
1000                let registry = Arc::new(crate::ws::ChatRegistry::default());
1001                crate::ws::serve_ws(port, handler_name, program, policy, registry)
1002            }
1003            ("net", "serve_ws_fn") => {
1004                let port = match args.first() {
1005                    Some(Value::Int(n)) if (0..=65535).contains(n) => *n as u16,
1006                    _ => return Err("net.serve_ws_fn(port, subprotocol, handler): port must be Int 0..=65535".into()),
1007                };
1008                let subprotocol = expect_str(args.get(1))?.to_string();
1009                let closure = match args.into_iter().nth(2) {
1010                    Some(c @ Value::Closure { .. }) => c,
1011                    _ => return Err("net.serve_ws_fn(port, subprotocol, handler): handler must be a closure".into()),
1012                };
1013                let program = self.program.clone()
1014                    .ok_or_else(|| "net.serve_ws_fn requires a Program reference; use DefaultHandler::with_program".to_string())?;
1015                let policy = self.policy.clone();
1016                let registry = Arc::new(crate::ws::ChatRegistry::default());
1017                crate::ws::serve_ws_fn(port, subprotocol, closure, program, policy, registry)
1018            }
1019            ("chat", "broadcast") => {
1020                let registry = self.chat_registry.as_ref()
1021                    .ok_or_else(|| "chat.broadcast called outside a net.serve_ws handler".to_string())?;
1022                let room = expect_str(args.first())?;
1023                let body = expect_str(args.get(1))?;
1024                crate::ws::chat_broadcast(registry, room, body);
1025                Ok(Value::Unit)
1026            }
1027            ("chat", "send") => {
1028                let registry = self.chat_registry.as_ref()
1029                    .ok_or_else(|| "chat.send called outside a net.serve_ws handler".to_string())?;
1030                let conn_id = match args.first() {
1031                    Some(Value::Int(n)) if *n >= 0 => *n as u64,
1032                    _ => return Err("chat.send: conn_id must be non-negative Int".into()),
1033                };
1034                let body = expect_str(args.get(1))?;
1035                Ok(Value::Bool(crate::ws::chat_send(registry, conn_id, body)))
1036            }
1037            ("kv", "open") => {
1038                let path = expect_str(args.first())?.to_string();
1039                // Honor write-allowlist: opening a Kv writes its
1040                // backing files at `path`, so the same scoping that
1041                // applies to `io.write` applies here.
1042                if !self.policy.allow_fs_write.is_empty() {
1043                    let p = std::path::Path::new(&path);
1044                    if !self.policy.allow_fs_write.iter().any(|a| p.starts_with(a)) {
1045                        return Ok(err(Value::Str(format!(
1046                            "kv.open: `{path}` outside --allow-fs-write"))));
1047                    }
1048                }
1049                match sled::open(&path) {
1050                    Ok(db) => {
1051                        let handle = next_kv_handle();
1052                        kv_registry().lock().unwrap().insert(handle, db);
1053                        Ok(ok(Value::Int(handle as i64)))
1054                    }
1055                    Err(e) => Ok(err(Value::Str(format!("kv.open: {e}")))),
1056                }
1057            }
1058            ("kv", "close") => {
1059                let h = expect_kv_handle(args.first())?;
1060                kv_registry().lock().unwrap().remove(h);
1061                Ok(Value::Unit)
1062            }
1063            ("kv", "get") => {
1064                let h = expect_kv_handle(args.first())?;
1065                let key = expect_str(args.get(1))?;
1066                let mut reg = kv_registry().lock().unwrap();
1067                let db = reg.touch_get(h).ok_or_else(|| "kv.get: closed or unknown Kv handle".to_string())?;
1068                match db.get(key.as_bytes()) {
1069                    Ok(Some(ivec)) => Ok(some(Value::Bytes(ivec.to_vec()))),
1070                    Ok(None) => Ok(none()),
1071                    Err(e) => Err(format!("kv.get: {e}")),
1072                }
1073            }
1074            ("kv", "put") => {
1075                let h = expect_kv_handle(args.first())?;
1076                let key = expect_str(args.get(1))?.to_string();
1077                let val = expect_bytes(args.get(2))?.clone();
1078                let mut reg = kv_registry().lock().unwrap();
1079                let db = reg.touch_get(h).ok_or_else(|| "kv.put: closed or unknown Kv handle".to_string())?;
1080                match db.insert(key.as_bytes(), val) {
1081                    Ok(_) => Ok(ok(Value::Unit)),
1082                    Err(e) => Ok(err(Value::Str(format!("kv.put: {e}")))),
1083                }
1084            }
1085            ("kv", "delete") => {
1086                let h = expect_kv_handle(args.first())?;
1087                let key = expect_str(args.get(1))?;
1088                let mut reg = kv_registry().lock().unwrap();
1089                let db = reg.touch_get(h).ok_or_else(|| "kv.delete: closed or unknown Kv handle".to_string())?;
1090                match db.remove(key.as_bytes()) {
1091                    Ok(_) => Ok(ok(Value::Unit)),
1092                    Err(e) => Ok(err(Value::Str(format!("kv.delete: {e}")))),
1093                }
1094            }
1095            ("kv", "contains") => {
1096                let h = expect_kv_handle(args.first())?;
1097                let key = expect_str(args.get(1))?;
1098                let mut reg = kv_registry().lock().unwrap();
1099                let db = reg.touch_get(h).ok_or_else(|| "kv.contains: closed or unknown Kv handle".to_string())?;
1100                match db.contains_key(key.as_bytes()) {
1101                    Ok(present) => Ok(Value::Bool(present)),
1102                    Err(e) => Err(format!("kv.contains: {e}")),
1103                }
1104            }
1105            ("kv", "list_prefix") => {
1106                let h = expect_kv_handle(args.first())?;
1107                let prefix = expect_str(args.get(1))?;
1108                let mut reg = kv_registry().lock().unwrap();
1109                let db = reg.touch_get(h).ok_or_else(|| "kv.list_prefix: closed or unknown Kv handle".to_string())?;
1110                let mut keys: Vec<Value> = Vec::new();
1111                for kv in db.scan_prefix(prefix.as_bytes()) {
1112                    let (k, _) = kv.map_err(|e| format!("kv.list_prefix: {e}"))?;
1113                    let s = String::from_utf8_lossy(&k).to_string();
1114                    keys.push(Value::Str(s));
1115                }
1116                Ok(Value::List(keys))
1117            }
1118            ("sql", "open") => {
1119                let path = expect_str(args.first())?.to_string();
1120                if path.starts_with("postgres://") || path.starts_with("postgresql://") {
1121                    // Postgres: connect via sync driver; no fs-write policy applies.
1122                    match postgres::Client::connect(&path, postgres::NoTls) {
1123                        Ok(client) => {
1124                            let handle = next_sql_handle();
1125                            sql_registry().lock().unwrap().insert(handle, SqlConn::Postgres(client));
1126                            Ok(ok(Value::Int(handle as i64)))
1127                        }
1128                        Err(e) => Ok(err(pg_err_to_sql_error(e, "sql.open"))),
1129                    }
1130                } else {
1131                    // SQLite: same shape as `kv.open`; fs-write allowlist applies
1132                    // (in-memory paths are exempt).
1133                    if path != ":memory:" && !self.policy.allow_fs_write.is_empty() {
1134                        let p = std::path::Path::new(&path);
1135                        if !self.policy.allow_fs_write.iter().any(|a| p.starts_with(a)) {
1136                            return Ok(err(sql_error(
1137                                format!("sql.open: `{path}` outside --allow-fs-write"),
1138                                None, None,
1139                            )));
1140                        }
1141                    }
1142                    match rusqlite::Connection::open(&path) {
1143                        Ok(conn) => {
1144                            let handle = next_sql_handle();
1145                            sql_registry().lock().unwrap().insert(handle, SqlConn::Sqlite(conn));
1146                            Ok(ok(Value::Int(handle as i64)))
1147                        }
1148                        Err(e) => Ok(err(sqlite_err_to_sql_error(e, "sql.open"))),
1149                    }
1150                }
1151            }
1152            ("sql", "close") => {
1153                let h = expect_sql_handle(args.first())?;
1154                sql_registry().lock().unwrap().remove(h);
1155                Ok(Value::Unit)
1156            }
1157            ("sql", "exec") => {
1158                let h = expect_sql_handle(args.first())?;
1159                let stmt = expect_str(args.get(1))?.to_string();
1160                let params = expect_sql_params(args.get(2))?;
1161                let arc = sql_registry().lock().unwrap()
1162                    .touch_get(h)
1163                    .ok_or_else(|| "sql.exec: closed or unknown Db handle".to_string())?;
1164                let mut conn = arc.lock().unwrap();
1165                match &mut *conn {
1166                    SqlConn::Sqlite(c) => {
1167                        let bound = sqlite_params(&params);
1168                        let bind: Vec<&dyn rusqlite::ToSql> =
1169                            bound.iter().map(|p| p as &dyn rusqlite::ToSql).collect();
1170                        match c.execute(&stmt, rusqlite::params_from_iter(bind.iter())) {
1171                            Ok(n)  => Ok(ok(Value::Int(n as i64))),
1172                            Err(e) => Ok(err(sqlite_err_to_sql_error(e, "sql.exec"))),
1173                        }
1174                    }
1175                    SqlConn::Postgres(c) => {
1176                        let pg = pg_param_refs(&params);
1177                        let refs: Vec<&(dyn postgres::types::ToSql + Sync)> =
1178                            pg.iter().map(|b| b.as_ref()).collect();
1179                        match c.execute(stmt.as_str(), &refs) {
1180                            Ok(n)  => Ok(ok(Value::Int(n as i64))),
1181                            Err(e) => Ok(err(pg_err_to_sql_error(e, "sql.exec"))),
1182                        }
1183                    }
1184                }
1185            }
1186            ("sql", "query") => {
1187                let h = expect_sql_handle(args.first())?;
1188                let stmt_str = expect_str(args.get(1))?.to_string();
1189                let params = expect_sql_params(args.get(2))?;
1190                let arc = sql_registry().lock().unwrap()
1191                    .touch_get(h)
1192                    .ok_or_else(|| "sql.query: closed or unknown Db handle".to_string())?;
1193                let mut conn = arc.lock().unwrap();
1194                Ok(match &mut *conn {
1195                    SqlConn::Sqlite(c)   => sql_run_query_sqlite(c, &stmt_str, &params),
1196                    SqlConn::Postgres(c) => sql_run_query_pg(c, &stmt_str, &params),
1197                })
1198            }
1199            // Streaming cursor (#379). Allocates an mpsc-backed cursor
1200            // handle, spawns a producer thread to ship rows one at a
1201            // time, and returns `__IterCursor(handle)` wrapped in `Ok`.
1202            // `iter.next` bytecode dispatches the variant tag and
1203            // effect-calls `sql.cursor_next` (below) to advance.
1204            ("sql", "query_iter") => {
1205                let h = expect_sql_handle(args.first())?;
1206                let stmt_str = expect_str(args.get(1))?.to_string();
1207                let params = expect_sql_params(args.get(2))?;
1208                let arc = sql_registry().lock().unwrap()
1209                    .touch_get(h)
1210                    .ok_or_else(|| "sql.query_iter: closed or unknown Db handle".to_string())?;
1211
1212                // Dispatch producer on the connection kind without
1213                // holding the SqlRegistry lock — the producer thread
1214                // owns its own clone of the connection Arc.
1215                let (sender, receiver) = std::sync::mpsc::sync_channel::<Result<Value, String>>(
1216                    CURSOR_CHANNEL_CAPACITY,
1217                );
1218                let cursor_h = next_cursor_handle();
1219                cursor_registry().lock().unwrap().insert(cursor_h, receiver);
1220
1221                let arc_for_thread = Arc::clone(&arc);
1222                // Decide which producer to spawn based on the
1223                // connection's variant. We can briefly peek at the
1224                // variant here without holding the lock for the
1225                // producer's lifetime — the producer locks again
1226                // inside its thread function.
1227                let is_sqlite = matches!(*arc.lock().unwrap(), SqlConn::Sqlite(_));
1228                std::thread::spawn(move || {
1229                    if is_sqlite {
1230                        sqlite_cursor_producer(arc_for_thread, stmt_str, params, sender);
1231                    } else {
1232                        pg_cursor_producer(arc_for_thread, stmt_str, params, sender);
1233                    }
1234                });
1235
1236                Ok(ok(Value::Variant {
1237                    name: "__IterCursor".into(),
1238                    args: vec![Value::Int(cursor_h as i64)],
1239                }))
1240            }
1241            // Pull one row from the producer; called from
1242            // `iter.next`'s `__IterCursor` dispatch branch. Returns
1243            // a Lex `Option[Row]`: `Some(row)` while the producer
1244            // has more, `None` once the channel closes (producer
1245            // done, errored, or cursor evicted from the registry).
1246            ("sql", "cursor_next") => {
1247                let h = match args.first() {
1248                    Some(Value::Int(n)) if *n >= 0 => *n as u64,
1249                    _ => return Err("sql.cursor_next: expected cursor handle (Int)".into()),
1250                };
1251                let rx_arc = match cursor_registry().lock().unwrap().touch_get(h) {
1252                    Some(a) => a,
1253                    None => return Ok(Value::Variant { name: "None".into(), args: vec![] }),
1254                };
1255                // Lock the receiver itself (separate from the global
1256                // registry lock) and block on `recv()`. The producer
1257                // is on a different thread, so this can sleep without
1258                // contention beyond the per-cursor mutex.
1259                let recv_result = {
1260                    let rx = match rx_arc.lock() {
1261                        Ok(g) => g,
1262                        Err(p) => p.into_inner(),
1263                    };
1264                    rx.recv()
1265                };
1266                match recv_result {
1267                    Ok(Ok(row)) => Ok(Value::Variant {
1268                        name: "Some".into(),
1269                        args: vec![row],
1270                    }),
1271                    Ok(Err(_)) | Err(_) => {
1272                        // Channel closed (producer done) or row error
1273                        // — drop the registry entry and signal None
1274                        // so callers stop polling.
1275                        cursor_registry().lock().unwrap().remove(h);
1276                        Ok(Value::Variant { name: "None".into(), args: vec![] })
1277                    }
1278                }
1279            }
1280            // Transactions: begin issues BEGIN SQL on the connection;
1281            // commit/rollback issue COMMIT/ROLLBACK. SqlTx reuses the
1282            // same Int handle as Db — the type system enforces correct
1283            // usage; the runtime treats both as the same registry key.
1284            ("sql", "begin") => {
1285                let h = expect_sql_handle(args.first())?;
1286                let arc = sql_registry().lock().unwrap()
1287                    .touch_get(h)
1288                    .ok_or_else(|| "sql.begin: closed or unknown Db handle".to_string())?;
1289                let mut conn = arc.lock().unwrap();
1290                match &mut *conn {
1291                    SqlConn::Sqlite(c) => match c.execute_batch("BEGIN") {
1292                        Ok(()) => Ok(ok(Value::Int(h as i64))),
1293                        Err(e) => Ok(err(sqlite_err_to_sql_error(e, "sql.begin"))),
1294                    },
1295                    SqlConn::Postgres(c) => match c.batch_execute("BEGIN") {
1296                        Ok(()) => Ok(ok(Value::Int(h as i64))),
1297                        Err(e) => Ok(err(pg_err_to_sql_error(e, "sql.begin"))),
1298                    },
1299                }
1300            }
1301            ("sql", "commit") => {
1302                let h = expect_sql_handle(args.first())?;
1303                let arc = sql_registry().lock().unwrap()
1304                    .touch_get(h)
1305                    .ok_or_else(|| "sql.commit: closed or unknown SqlTx handle".to_string())?;
1306                let mut conn = arc.lock().unwrap();
1307                match &mut *conn {
1308                    SqlConn::Sqlite(c) => match c.execute_batch("COMMIT") {
1309                        Ok(()) => Ok(ok(Value::Unit)),
1310                        Err(e) => Ok(err(sqlite_err_to_sql_error(e, "sql.commit"))),
1311                    },
1312                    SqlConn::Postgres(c) => match c.batch_execute("COMMIT") {
1313                        Ok(()) => Ok(ok(Value::Unit)),
1314                        Err(e) => Ok(err(pg_err_to_sql_error(e, "sql.commit"))),
1315                    },
1316                }
1317            }
1318            ("sql", "rollback") => {
1319                let h = expect_sql_handle(args.first())?;
1320                let arc = sql_registry().lock().unwrap()
1321                    .touch_get(h)
1322                    .ok_or_else(|| "sql.rollback: closed or unknown SqlTx handle".to_string())?;
1323                let mut conn = arc.lock().unwrap();
1324                match &mut *conn {
1325                    SqlConn::Sqlite(c) => match c.execute_batch("ROLLBACK") {
1326                        Ok(()) => Ok(ok(Value::Unit)),
1327                        Err(e) => Ok(err(sqlite_err_to_sql_error(e, "sql.rollback"))),
1328                    },
1329                    SqlConn::Postgres(c) => match c.batch_execute("ROLLBACK") {
1330                        Ok(()) => Ok(ok(Value::Unit)),
1331                        Err(e) => Ok(err(pg_err_to_sql_error(e, "sql.rollback"))),
1332                    },
1333                }
1334            }
1335            ("sql", "exec_tx") => {
1336                let h = expect_sql_handle(args.first())?;
1337                let stmt = expect_str(args.get(1))?.to_string();
1338                let params = expect_sql_params(args.get(2))?;
1339                let arc = sql_registry().lock().unwrap()
1340                    .touch_get(h)
1341                    .ok_or_else(|| "sql.exec_tx: closed or unknown SqlTx handle".to_string())?;
1342                let mut conn = arc.lock().unwrap();
1343                match &mut *conn {
1344                    SqlConn::Sqlite(c) => {
1345                        let bound = sqlite_params(&params);
1346                        let bind: Vec<&dyn rusqlite::ToSql> =
1347                            bound.iter().map(|p| p as &dyn rusqlite::ToSql).collect();
1348                        match c.execute(&stmt, rusqlite::params_from_iter(bind.iter())) {
1349                            Ok(n)  => Ok(ok(Value::Int(n as i64))),
1350                            Err(e) => Ok(err(sqlite_err_to_sql_error(e, "sql.exec_tx"))),
1351                        }
1352                    }
1353                    SqlConn::Postgres(c) => {
1354                        let pg = pg_param_refs(&params);
1355                        let refs: Vec<&(dyn postgres::types::ToSql + Sync)> =
1356                            pg.iter().map(|b| b.as_ref()).collect();
1357                        match c.execute(stmt.as_str(), &refs) {
1358                            Ok(n)  => Ok(ok(Value::Int(n as i64))),
1359                            Err(e) => Ok(err(pg_err_to_sql_error(e, "sql.exec_tx"))),
1360                        }
1361                    }
1362                }
1363            }
1364            ("sql", "query_tx") => {
1365                let h = expect_sql_handle(args.first())?;
1366                let stmt_str = expect_str(args.get(1))?.to_string();
1367                let params = expect_sql_params(args.get(2))?;
1368                let arc = sql_registry().lock().unwrap()
1369                    .touch_get(h)
1370                    .ok_or_else(|| "sql.query_tx: closed or unknown SqlTx handle".to_string())?;
1371                let mut conn = arc.lock().unwrap();
1372                Ok(match &mut *conn {
1373                    SqlConn::Sqlite(c)   => sql_run_query_sqlite(c, &stmt_str, &params),
1374                    SqlConn::Postgres(c) => sql_run_query_pg(c, &stmt_str, &params),
1375                })
1376            }
1377            ("sql", "get_str") => Ok(sql_get_col(&args, |v| match v {
1378                Value::Str(s) => Some(Value::Str(s.clone())),
1379                Value::Int(n) => Some(Value::Str(n.to_string())),
1380                _ => None,
1381            })?),
1382            ("sql", "get_int") => Ok(sql_get_col(&args, |v| match v {
1383                Value::Int(n) => Some(Value::Int(*n)),
1384                Value::Float(f) => Some(Value::Int(*f as i64)),
1385                _ => None,
1386            })?),
1387            ("sql", "get_float") => Ok(sql_get_col(&args, |v| match v {
1388                Value::Float(f) => Some(Value::Float(*f)),
1389                Value::Int(n)   => Some(Value::Float(*n as f64)),
1390                _ => None,
1391            })?),
1392            ("sql", "get_bool") => Ok(sql_get_col(&args, |v| match v {
1393                Value::Bool(b)  => Some(Value::Bool(*b)),
1394                Value::Int(n)   => Some(Value::Bool(*n != 0)),
1395                _ => None,
1396            })?),
1397            ("proc", "spawn") => {
1398                // The escape hatch effect. Spawns a child process,
1399                // collects its stdout/stderr, returns a structured
1400                // record. Allow-list is the binary basename: anything
1401                // outside `--allow-proc` is rejected pre-spawn.
1402                //
1403                // What this does NOT validate (per SECURITY.md):
1404                // - per-arg content (a script-like CLI invoked via
1405                //   --eval=... can run anything)
1406                // - environment variables (inherited from the parent)
1407                // - working directory (the parent's)
1408                //
1409                // For untrusted input, layer with OS-level
1410                // sandboxing — gVisor / nsjail / a container.
1411                let cmd = expect_str(args.first())?.to_string();
1412                let raw_args = match args.get(1) {
1413                    Some(Value::List(items)) => items,
1414                    Some(other) => return Err(format!(
1415                        "proc.spawn: args must be List[Str], got {other:?}")),
1416                    None => return Err("proc.spawn: missing args list".into()),
1417                };
1418                let str_args: Vec<String> = raw_args.iter().map(|v| match v {
1419                    Value::Str(s) => Ok(s.clone()),
1420                    other => Err(format!("proc.spawn: arg must be Str, got {other:?}")),
1421                }).collect::<Result<Vec<_>, _>>()?;
1422
1423                // Allow-list check: empty list = any binary (escape
1424                // hatch); non-empty = basename of cmd must match an
1425                // entry exactly.
1426                if !self.policy.allow_proc.is_empty() {
1427                    let basename = std::path::Path::new(&cmd)
1428                        .file_name()
1429                        .and_then(|s| s.to_str())
1430                        .unwrap_or(&cmd);
1431                    if !self.policy.allow_proc.iter().any(|a| a == basename) {
1432                        return Ok(err(Value::Str(format!(
1433                            "proc.spawn: `{cmd}` not in --allow-proc {:?}",
1434                            self.policy.allow_proc
1435                        ))));
1436                    }
1437                }
1438
1439                // Hard caps: the spec doesn't pin numbers, but
1440                // unbounded argv is a DoS vector.
1441                if str_args.len() > 1024 {
1442                    return Ok(err(Value::Str(
1443                        "proc.spawn: arg-count exceeds 1024".into())));
1444                }
1445                if str_args.iter().any(|a| a.len() > 65_536) {
1446                    return Ok(err(Value::Str(
1447                        "proc.spawn: per-arg length exceeds 64 KiB".into())));
1448                }
1449
1450                let output = std::process::Command::new(&cmd)
1451                    .args(&str_args)
1452                    .output();
1453                match output {
1454                    Ok(o) => {
1455                        let mut rec = indexmap::IndexMap::new();
1456                        rec.insert("stdout".into(), Value::Str(
1457                            String::from_utf8_lossy(&o.stdout).to_string()));
1458                        rec.insert("stderr".into(), Value::Str(
1459                            String::from_utf8_lossy(&o.stderr).to_string()));
1460                        rec.insert("exit_code".into(), Value::Int(
1461                            o.status.code().unwrap_or(-1) as i64));
1462                        Ok(ok(Value::Record(rec)))
1463                    }
1464                    Err(e) => Ok(err(Value::Str(format!("spawn `{cmd}`: {e}")))),
1465                }
1466            }
1467            other => Err(format!("unsupported effect {}.{}", other.0, other.1)),
1468        }
1469    }
1470
1471    /// `list.par_map` worker-handler factory (#305 slice 2).
1472    ///
1473    /// Builds a fresh `DefaultHandler` per worker that shares the
1474    /// budget pool with the parent (`Arc<AtomicU64>`) so a parallel
1475    /// batch can't escape the run-wide budget ceiling. Other state
1476    /// is intentionally split per-worker:
1477    ///
1478    /// - `sink`: a `StdoutSink` per worker. Tests that capture
1479    ///   output via a `SharedSink` wrapped in `Arc<Mutex<…>>` see
1480    ///   each worker as a fresh handler. Print interleaving on
1481    ///   stdout is acceptable; tests that need ordered capture run
1482    ///   workloads serially anyway.
1483    /// - `mcp_clients`: a fresh per-worker LRU cache. The parent's
1484    ///   subprocess handles can't be shared across threads without
1485    ///   mutex-serialising every MCP call, which would defeat the
1486    ///   parallelism. Cache hit rate is sub-optimal across the
1487    ///   first call per worker; warmed caches still amortise within
1488    ///   a worker.
1489    /// - `chat_registry`: cloned `Arc<ChatRegistry>` so all workers
1490    ///   route into the same chat dispatch layer.
1491    /// - `program`: cloned `Arc<Program>` so `net.serve` (if a
1492    ///   worker invokes it) sees the same compiled program.
1493    fn spawn_for_worker(&self) -> Option<Box<dyn lex_bytecode::vm::EffectHandler + Send>> {
1494        let mut fresh = DefaultHandler::new(self.policy.clone());
1495        // Share the budget pool atomically — slice 2's correctness
1496        // contract: parallel work counts against the same ceiling.
1497        fresh.budget_remaining = std::sync::Arc::clone(&self.budget_remaining);
1498        fresh.budget_ceiling = self.budget_ceiling;
1499        fresh.read_root = self.read_root.clone();
1500        fresh.program = self.program.clone();
1501        fresh.chat_registry = self.chat_registry.clone();
1502        // #305 slice 3: share the stream registry across workers so
1503        // a stream produced on one thread (or the parent) is
1504        // consumable on any other. The registry is already
1505        // `Arc<Mutex<…>>` so concurrent access is safe.
1506        fresh.streams = std::sync::Arc::clone(&self.streams);
1507        fresh.next_stream_id = std::sync::Arc::clone(&self.next_stream_id);
1508        Some(Box::new(fresh))
1509    }
1510}
1511
1512/// Blocks the calling thread, accepts incoming HTTP requests on
1513/// `127.0.0.1:port`, and dispatches each through the named Lex
1514/// stage. Each request gets a fresh `Vm`; the program and policy
1515/// are shared.
1516///
1517/// Handler signature in Lex (by convention):
1518///   fn <name>(req :: Record { method :: Str, path :: Str, body :: Str })
1519///        -> Record { status :: Int, body :: Str }
1520/// PEM-encoded certificate + private key, both as raw bytes.
1521pub struct TlsConfig {
1522    pub cert: Vec<u8>,
1523    pub key: Vec<u8>,
1524}
1525
1526fn serve_http(
1527    port: u16,
1528    handler_name: String,
1529    program: Arc<Program>,
1530    policy: Policy,
1531    tls: Option<TlsConfig>,
1532) -> Result<Value, String> {
1533    let (server, scheme) = match tls {
1534        None => (
1535            tiny_http::Server::http(("127.0.0.1", port))
1536                .map_err(|e| format!("net.serve bind {port}: {e}"))?,
1537            "http",
1538        ),
1539        Some(cfg) => {
1540            let ssl = tiny_http::SslConfig {
1541                certificate: cfg.cert,
1542                private_key: cfg.key,
1543            };
1544            (
1545                tiny_http::Server::https(("127.0.0.1", port), ssl)
1546                    .map_err(|e| format!("net.serve_tls bind {port}: {e}"))?,
1547                "https",
1548            )
1549        }
1550    };
1551    eprintln!("net.serve: listening on {scheme}://127.0.0.1:{port}");
1552    // Thread-per-request: the main loop accepts; each request runs on
1553    // its own worker thread with its own fresh Vm. The Program is
1554    // shared via Arc; Policy and handler_name are cloned per request.
1555    // Lex's immutability means there's no shared mutable state at the
1556    // language level — workers don't race.
1557    for req in server.incoming_requests() {
1558        let program = Arc::clone(&program);
1559        let policy = policy.clone();
1560        let handler_name = handler_name.clone();
1561        std::thread::spawn(move || handle_request(req, program, policy, handler_name));
1562    }
1563    Ok(Value::Unit)
1564}
1565
1566fn handle_request(
1567    mut req: tiny_http::Request,
1568    program: Arc<Program>,
1569    policy: Policy,
1570    handler_name: String,
1571) {
1572    let lex_req = build_request_value(&mut req);
1573    let handler = DefaultHandler::new(policy).with_program(Arc::clone(&program));
1574    let mut vm = Vm::with_handler(&program, Box::new(handler));
1575    match vm.call(&handler_name, vec![lex_req]) {
1576        Ok(resp) => {
1577            let (status, body, headers) = unpack_response(&resp);
1578            respond_with_body(req, status, body, headers);
1579        }
1580        Err(e) => {
1581            let response = tiny_http::Response::from_string(format!("internal error: {e}"))
1582                .with_status_code(500);
1583            let _ = req.respond(response);
1584        }
1585    }
1586}
1587
1588fn serve_http_fn(
1589    port: u16,
1590    closure: Value,
1591    program: Arc<Program>,
1592    policy: Policy,
1593) -> Result<Value, String> {
1594    let server = tiny_http::Server::http(("127.0.0.1", port))
1595        .map_err(|e| format!("net.serve_fn bind {port}: {e}"))?;
1596    eprintln!("net.serve_fn: listening on http://127.0.0.1:{port}");
1597    for req in server.incoming_requests() {
1598        let program = Arc::clone(&program);
1599        let policy = policy.clone();
1600        let closure = closure.clone();
1601        std::thread::spawn(move || handle_request_fn(req, program, policy, closure));
1602    }
1603    Ok(Value::Unit)
1604}
1605
1606fn handle_request_fn(
1607    mut req: tiny_http::Request,
1608    program: Arc<Program>,
1609    policy: Policy,
1610    closure: Value,
1611) {
1612    let lex_req = build_request_value(&mut req);
1613    let handler = DefaultHandler::new(policy).with_program(Arc::clone(&program));
1614    let mut vm = Vm::with_handler(&program, Box::new(handler));
1615    match vm.invoke_closure_value(closure, vec![lex_req]) {
1616        Ok(resp) => {
1617            let (status, body, headers) = unpack_response(&resp);
1618            respond_with_body(req, status, body, headers);
1619        }
1620        Err(e) => {
1621            let response = tiny_http::Response::from_string(format!("internal error: {e}"))
1622                .with_status_code(500);
1623            let _ = req.respond(response);
1624        }
1625    }
1626}
1627
1628fn build_request_value(req: &mut tiny_http::Request) -> Value {
1629    let method = format!("{:?}", req.method()).to_uppercase();
1630    let url = req.url().to_string();
1631    let (path, query) = match url.split_once('?') {
1632        Some((p, q)) => (p.to_string(), q.to_string()),
1633        None => (url, String::new()),
1634    };
1635    let mut headers_map = std::collections::BTreeMap::new();
1636    for h in req.headers() {
1637        headers_map.insert(
1638            lex_bytecode::MapKey::Str(h.field.as_str().as_str().to_ascii_lowercase()),
1639            Value::Str(h.value.as_str().to_string()),
1640        );
1641    }
1642    let mut body = String::new();
1643    let _ = req.as_reader().read_to_string(&mut body);
1644    let mut rec = indexmap::IndexMap::new();
1645    rec.insert("method".into(), Value::Str(method));
1646    rec.insert("path".into(), Value::Str(path));
1647    rec.insert("query".into(), Value::Str(query));
1648    rec.insert("body".into(), Value::Str(body));
1649    rec.insert("headers".into(), Value::Map(headers_map));
1650    Value::Record(rec)
1651}
1652
1653fn unpack_response(v: &Value) -> (u16, ResponseBodyOut, Vec<tiny_http::Header>) {
1654    if let Value::Record(rec) = v {
1655        let status = rec.get("status").and_then(|s| match s {
1656            Value::Int(n) => Some(*n as u16),
1657            _ => None,
1658        }).unwrap_or(200);
1659        let body = match rec.get("body") {
1660            // Tagged ResponseBody (#375): BodyStr | BodyStream | BodyBytes.
1661            Some(Value::Variant { name, args }) => match (name.as_str(), args.as_slice()) {
1662                ("BodyStr",    [Value::Str(s)])             => ResponseBodyOut::Str(s.clone()),
1663                ("BodyStream", [iter_v])                    => ResponseBodyOut::TextChunks(drain_iter_str(iter_v)),
1664                ("BodyBytes",  [iter_v])                    => ResponseBodyOut::BytesChunks(drain_iter_bytes(iter_v)),
1665                _ => ResponseBodyOut::Str(String::new()),
1666            },
1667            // Escape hatch for handlers that don't use the nominal
1668            // `Response` alias and just return a structural record with
1669            // `body :: Str` (the pre-#375 contract). Lets internal
1670            // test handlers and one-liners keep working without
1671            // wrapping in `BodyStr(...)`.
1672            Some(Value::Str(s)) => ResponseBodyOut::Str(s.clone()),
1673            _ => ResponseBodyOut::Str(String::new()),
1674        };
1675        let headers = if let Some(Value::Map(hmap)) = rec.get("headers") {
1676            hmap.iter().filter_map(|(k, v)| {
1677                if let (lex_bytecode::MapKey::Str(name), Value::Str(val)) = (k, v) {
1678                    format!("{name}: {val}").parse::<tiny_http::Header>().ok()
1679                } else {
1680                    None
1681                }
1682            }).collect()
1683        } else {
1684            vec![]
1685        };
1686        return (status, body, headers);
1687    }
1688    (
1689        500,
1690        ResponseBodyOut::Str(format!("handler returned non-record: {v:?}")),
1691        vec![],
1692    )
1693}
1694
1695/// Send `body` back on `req` using the right `tiny_http` path for the
1696/// variant. The `Str` arm uses `Response::from_string` (content-length
1697/// set, no chunked encoding). The streaming arms use `Response::new`
1698/// with `content_length = None`, which makes `tiny_http` switch to
1699/// `Transfer-Encoding: chunked` on the wire.
1700///
1701/// v1 caveat: the chunked-transfer encoder buffers across `read()` calls,
1702/// so per-iter-item boundaries are not preserved on the wire — the body
1703/// is correctly chunk-encoded but may land as one large HTTP chunk. The
1704/// lazy-iter follow-up will expose Lex iter items as distinct HTTP chunks
1705/// because each `read()` will block on `iter.next` and produce one item.
1706fn respond_with_body(
1707    req: tiny_http::Request,
1708    status: u16,
1709    body: ResponseBodyOut,
1710    headers: Vec<tiny_http::Header>,
1711) {
1712    match body {
1713        ResponseBodyOut::Str(s) => {
1714            let mut response = tiny_http::Response::from_string(s).with_status_code(status);
1715            for h in headers {
1716                response.add_header(h);
1717            }
1718            let _ = req.respond(response);
1719        }
1720        ResponseBodyOut::TextChunks(chunks) | ResponseBodyOut::BytesChunks(chunks) => {
1721            let reader = ChunkReader::new(chunks);
1722            let response = tiny_http::Response::new(
1723                tiny_http::StatusCode(status),
1724                headers,
1725                reader,
1726                None, // content_length: None → chunked transfer-encoding
1727                None,
1728            );
1729            let _ = req.respond(response);
1730        }
1731    }
1732}
1733
1734/// Decoded `Response.body` (#375). The runtime emits each variant via a
1735/// different `tiny_http` path: a single `Response::from_string` for
1736/// `Str`, and a chunked-encoding `Response::new` with a `Read`-backed
1737/// chunk list for the streaming variants.
1738enum ResponseBodyOut {
1739    Str(String),
1740    /// Pre-drained text chunks. v1 ships eager-iter only; lazy producers
1741    /// (#376 follow-up) will replace this with a Read adapter that pulls
1742    /// chunks on demand from the VM.
1743    TextChunks(Vec<Vec<u8>>),
1744    /// Pre-drained binary chunks. Each inner `Vec<u8>` is one Lex
1745    /// `List[Int]` collapsed down to a byte vector.
1746    BytesChunks(Vec<Vec<u8>>),
1747}
1748
1749/// Walk a Lex `Iter[Str]` (eager (List, Int) representation) and produce
1750/// a chunk list. The chunks are byte vectors so the chunked-Read adapter
1751/// is uniform across text and binary streams.
1752///
1753/// Iter[T] representation shifted in #376: from `Tuple([list, idx])` to
1754/// `Variant("__IterEager", [list, idx])` for the eager form. Lazy iters
1755/// produced by `iter.unfold` (`Variant("__IterLazy", [seed, step])`) and
1756/// cursor-backed iters (`Variant("__IterCursor", [handle])` from #379)
1757/// are not drained eagerly here — the v1 streaming path covers only the
1758/// eager form. Lazy/cursor producers will be wired through the
1759/// `ChunkReader` in a follow-up so each `read()` calls `iter.next` via
1760/// the VM, preserving wall-clock chunk boundaries on the wire.
1761fn drain_iter_str(v: &Value) -> Vec<Vec<u8>> {
1762    match v {
1763        Value::Variant { name, args }
1764            if name == "__IterEager" && args.len() == 2 =>
1765        {
1766            if let (Value::List(items), Value::Int(idx)) = (&args[0], &args[1]) {
1767                items.iter().skip(*idx as usize).filter_map(|item| {
1768                    if let Value::Str(s) = item { Some(s.as_bytes().to_vec()) } else { None }
1769                }).collect()
1770            } else {
1771                Vec::new()
1772            }
1773        }
1774        _ => Vec::new(),
1775    }
1776}
1777
1778/// Walk a Lex `Iter[List[Int]]` and produce a chunk list. Each `List[Int]`
1779/// element is collapsed by truncating each Int to u8 (0..=255). See
1780/// `drain_iter_str` for the lazy/cursor-iter limitation.
1781fn drain_iter_bytes(v: &Value) -> Vec<Vec<u8>> {
1782    match v {
1783        Value::Variant { name, args }
1784            if name == "__IterEager" && args.len() == 2 =>
1785        {
1786            if let (Value::List(items), Value::Int(idx)) = (&args[0], &args[1]) {
1787                items.iter().skip(*idx as usize).filter_map(|item| {
1788                    if let Value::List(ints) = item {
1789                        Some(ints.iter().filter_map(|i| match i {
1790                            Value::Int(n) => Some((*n & 0xff) as u8),
1791                            _ => None,
1792                        }).collect::<Vec<u8>>())
1793                    } else {
1794                        None
1795                    }
1796                }).collect()
1797            } else {
1798                Vec::new()
1799            }
1800        }
1801        _ => Vec::new(),
1802    }
1803}
1804
1805/// `Read` adapter that returns one Lex chunk per `read()` call so
1806/// `tiny_http`'s chunked transfer-encoding emits each Lex chunk as a
1807/// distinct HTTP chunk on the wire. When the requested buffer is smaller
1808/// than the current chunk we serve a slice and keep the remainder for
1809/// the next call.
1810struct ChunkReader {
1811    chunks: std::collections::VecDeque<Vec<u8>>,
1812    cursor: usize,
1813}
1814
1815impl ChunkReader {
1816    fn new(chunks: Vec<Vec<u8>>) -> Self {
1817        Self {
1818            chunks: chunks.into_iter().filter(|c| !c.is_empty()).collect(),
1819            cursor: 0,
1820        }
1821    }
1822}
1823
1824impl std::io::Read for ChunkReader {
1825    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
1826        loop {
1827            let Some(front) = self.chunks.front() else {
1828                return Ok(0);
1829            };
1830            let remaining = &front[self.cursor..];
1831            if remaining.is_empty() {
1832                self.chunks.pop_front();
1833                self.cursor = 0;
1834                continue;
1835            }
1836            let n = remaining.len().min(buf.len());
1837            buf[..n].copy_from_slice(&remaining[..n]);
1838            self.cursor += n;
1839            if self.cursor >= front.len() {
1840                self.chunks.pop_front();
1841                self.cursor = 0;
1842            }
1843            return Ok(n);
1844        }
1845    }
1846}
1847
1848/// HTTP/1.1 client backed by `ureq` + `rustls`. Accepts both
1849/// `http://` and `https://` URLs. Returns `Result[Str, Str]` as a
1850/// Lex `Value::Variant`. The earlier hand-rolled HTTP/1.0 client
1851/// was plain-TCP only — most public APIs are HTTPS, so the demo
1852/// could fetch `example.com` but not `wttr.in` or `api.github.com`.
1853fn http_request(method: &str, url: &str, body: Option<&str>) -> Value {
1854    use std::time::Duration;
1855    // ureq 3 puts 4xx/5xx behind `Error::StatusCode(code)` and consumes
1856    // the response, so the body would be lost. Disabling
1857    // `http_status_as_error` lets us check the status manually and
1858    // surface `Err("status 404: <body>")` like the old code did.
1859    let agent: ureq::Agent = ureq::Agent::config_builder()
1860        .timeout_connect(Some(Duration::from_secs(10)))
1861        .timeout_recv_body(Some(Duration::from_secs(30)))
1862        .timeout_send_body(Some(Duration::from_secs(10)))
1863        .http_status_as_error(false)
1864        .build()
1865        .into();
1866    let resp = match (method, body) {
1867        ("GET", _) => agent.get(url).call(),
1868        ("POST", Some(b)) => agent.post(url).send(b),
1869        ("POST", None) => agent.post(url).send(""),
1870        (m, _) => return err_value(format!("unsupported method: {m}")),
1871    };
1872    match resp {
1873        Ok(mut r) => {
1874            let status = r.status().as_u16();
1875            let body = r.body_mut().read_to_string().unwrap_or_default();
1876            if (200..300).contains(&status) {
1877                Value::Variant { name: "Ok".into(), args: vec![Value::Str(body)] }
1878            } else {
1879                err_value(format!("status {status}: {body}"))
1880            }
1881        }
1882        Err(e) => err_value(format!("transport: {e}")),
1883    }
1884}
1885
1886/// Build a ureq agent for `std.http.{send,get,post}` with the given
1887/// timeout (None → use the same defaults as the legacy `net.{get,post}`
1888/// path). Separate from `http_request` so the rich `http.send` flow
1889/// can supply per-request overrides.
1890fn http_agent(timeout_ms: Option<u64>) -> ureq::Agent {
1891    use std::time::Duration;
1892    let mut b = ureq::Agent::config_builder()
1893        .timeout_connect(Some(Duration::from_secs(10)))
1894        .timeout_recv_body(Some(Duration::from_secs(30)))
1895        .timeout_send_body(Some(Duration::from_secs(10)))
1896        .http_status_as_error(false);
1897    if let Some(ms) = timeout_ms {
1898        let d = Duration::from_millis(ms);
1899        b = b.timeout_global(Some(d));
1900    }
1901    b.build().into()
1902}
1903
1904/// Map ureq's transport error to the structured `HttpError` variant
1905/// std.http exposes to user code. Anything not specifically a
1906/// timeout / TLS error funnels into `NetworkError`.
1907fn http_error_value(e: ureq::Error) -> Value {
1908    let (ctor, payload): (&str, Option<String>) = match &e {
1909        ureq::Error::Timeout(_) => ("TimeoutError", None),
1910        ureq::Error::Tls(s) => ("TlsError", Some((*s).into())),
1911        ureq::Error::Pem(p) => ("TlsError", Some(format!("{p}"))),
1912        ureq::Error::Rustls(r) => ("TlsError", Some(format!("{r}"))),
1913        _ => ("NetworkError", Some(format!("{e}"))),
1914    };
1915    let args = match payload { Some(s) => vec![Value::Str(s)], None => vec![] };
1916    let inner = Value::Variant { name: ctor.into(), args };
1917    Value::Variant { name: "Err".into(), args: vec![inner] }
1918}
1919
1920fn http_decode_err(msg: String) -> Value {
1921    let inner = Value::Variant {
1922        name: "DecodeError".into(),
1923        args: vec![Value::Str(msg)],
1924    };
1925    Value::Variant { name: "Err".into(), args: vec![inner] }
1926}
1927
1928/// Run a request and pack the ureq response into the
1929/// `{ status, headers, body }` Lex record (or the structured
1930/// `HttpError` on failure). `headers_extra` pairs are appended to the
1931/// outgoing request after `content_type` is applied.
1932fn http_send_simple(
1933    method: &str,
1934    url: &str,
1935    body: Option<Vec<u8>>,
1936    content_type: &str,
1937    timeout_ms: Option<u64>,
1938) -> Value {
1939    http_send_full(method, url, body, content_type, &[], timeout_ms)
1940}
1941
1942fn http_send_full(
1943    method: &str,
1944    url: &str,
1945    body: Option<Vec<u8>>,
1946    content_type: &str,
1947    headers: &[(String, String)],
1948    timeout_ms: Option<u64>,
1949) -> Value {
1950    let agent = http_agent(timeout_ms);
1951    let resp = match method {
1952        "GET" => {
1953            let mut req = agent.get(url);
1954            if !content_type.is_empty() { req = req.header("content-type", content_type); }
1955            for (k, v) in headers { req = req.header(k.as_str(), v.as_str()); }
1956            req.call()
1957        }
1958        "POST" => {
1959            let body = body.unwrap_or_default();
1960            let mut req = agent.post(url);
1961            if !content_type.is_empty() { req = req.header("content-type", content_type); }
1962            for (k, v) in headers { req = req.header(k.as_str(), v.as_str()); }
1963            req.send(&body[..])
1964        }
1965        m => {
1966            // Other methods (PUT, DELETE, PATCH, ...) fall through
1967            // here in v1.5; for now surface a structured DecodeError
1968            // so the caller can match it.
1969            return http_decode_err(format!("unsupported method: {m}"));
1970        }
1971    };
1972    match resp {
1973        Ok(mut r) => {
1974            let status = r.status().as_u16() as i64;
1975            let headers_map = collect_response_headers(r.headers());
1976            let body_bytes = match r.body_mut().with_config().limit(10 * 1024 * 1024).read_to_vec() {
1977                Ok(b) => b,
1978                Err(e) => return http_decode_err(format!("body read: {e}")),
1979            };
1980            let mut rec = indexmap::IndexMap::new();
1981            rec.insert("status".into(), Value::Int(status));
1982            rec.insert("headers".into(), Value::Map(headers_map));
1983            rec.insert("body".into(), Value::Bytes(body_bytes));
1984            Value::Variant { name: "Ok".into(), args: vec![Value::Record(rec)] }
1985        }
1986        Err(e) => http_error_value(e),
1987    }
1988}
1989
1990fn collect_response_headers(
1991    headers: &ureq::http::HeaderMap,
1992) -> std::collections::BTreeMap<lex_bytecode::MapKey, Value> {
1993    let mut out = std::collections::BTreeMap::new();
1994    for (name, value) in headers.iter() {
1995        let v = value.to_str().unwrap_or("").to_string();
1996        out.insert(lex_bytecode::MapKey::Str(name.as_str().to_string()), Value::Str(v));
1997    }
1998    out
1999}
2000
2001/// Pull the standard `HttpRequest` shape out of a `Value::Record`
2002/// and dispatch through `http_send_full`. The handler verifies
2003/// `--allow-net-host` for the URL before sending.
2004fn http_send_record(handler: &DefaultHandler, req: &indexmap::IndexMap<String, Value>) -> Value {
2005    let method = match req.get("method") {
2006        Some(Value::Str(s)) => s.clone(),
2007        _ => return http_decode_err("HttpRequest.method must be Str".into()),
2008    };
2009    let url = match req.get("url") {
2010        Some(Value::Str(s)) => s.clone(),
2011        _ => return http_decode_err("HttpRequest.url must be Str".into()),
2012    };
2013    if let Err(e) = handler.ensure_host_allowed(&url) {
2014        return http_decode_err(e);
2015    }
2016    let body = match req.get("body") {
2017        Some(Value::Variant { name, args }) if name == "None" => None,
2018        Some(Value::Variant { name, args }) if name == "Some" => match args.as_slice() {
2019            [Value::Bytes(b)] => Some(b.clone()),
2020            _ => return http_decode_err("HttpRequest.body Some payload must be Bytes".into()),
2021        },
2022        _ => return http_decode_err("HttpRequest.body must be Option[Bytes]".into()),
2023    };
2024    let timeout_ms = match req.get("timeout_ms") {
2025        Some(Value::Variant { name, .. }) if name == "None" => None,
2026        Some(Value::Variant { name, args }) if name == "Some" => match args.as_slice() {
2027            [Value::Int(n)] if *n >= 0 => Some(*n as u64),
2028            _ => return http_decode_err(
2029                "HttpRequest.timeout_ms Some payload must be a non-negative Int".into()),
2030        },
2031        _ => return http_decode_err("HttpRequest.timeout_ms must be Option[Int]".into()),
2032    };
2033    let headers: Vec<(String, String)> = match req.get("headers") {
2034        Some(Value::Map(m)) => m.iter().filter_map(|(k, v)| {
2035            let kk = match k { lex_bytecode::MapKey::Str(s) => s.clone(), _ => return None };
2036            let vv = match v { Value::Str(s) => s.clone(), _ => return None };
2037            Some((kk, vv))
2038        }).collect(),
2039        _ => return http_decode_err("HttpRequest.headers must be Map[Str, Str]".into()),
2040    };
2041    http_send_full(&method, &url, body, "", &headers, timeout_ms)
2042}
2043
2044fn expect_record(v: Option<&Value>) -> Result<&indexmap::IndexMap<String, Value>, String> {
2045    match v {
2046        Some(Value::Record(r)) => Ok(r),
2047        Some(other) => Err(format!("expected Record, got {other:?}")),
2048        None => Err("missing Record argument".into()),
2049    }
2050}
2051
2052fn err_value(msg: String) -> Value {
2053    Value::Variant { name: "Err".into(), args: vec![Value::Str(msg)] }
2054}
2055
2056fn expect_str(v: Option<&Value>) -> Result<&str, String> {
2057    match v {
2058        Some(Value::Str(s)) => Ok(s),
2059        Some(other) => Err(format!("expected Str arg, got {other:?}")),
2060        None => Err("missing argument".into()),
2061    }
2062}
2063
2064fn expect_int(v: Option<&Value>) -> Result<i64, String> {
2065    match v {
2066        Some(Value::Int(n)) => Ok(*n),
2067        Some(other) => Err(format!("expected Int arg, got {other:?}")),
2068        None => Err("missing argument".into()),
2069    }
2070}
2071
2072fn ok(v: Value) -> Value {
2073    Value::Variant { name: "Ok".into(), args: vec![v] }
2074}
2075fn err(v: Value) -> Value {
2076    Value::Variant { name: "Err".into(), args: vec![v] }
2077}
2078
2079/// Build a `SqlError = { message, code, detail }` Lex record (#380).
2080/// `code` and `detail` are `None` by default; the driver-specific
2081/// converters below populate them with real values.
2082fn sql_error(message: impl Into<String>, code: Option<String>, detail: Option<String>) -> Value {
2083    let some = |s: String| Value::Variant { name: "Some".into(), args: vec![Value::Str(s)] };
2084    let none = || Value::Variant { name: "None".into(), args: vec![] };
2085    let mut rec = indexmap::IndexMap::new();
2086    rec.insert("message".into(), Value::Str(message.into()));
2087    rec.insert("code".into(), match code {
2088        Some(c) => some(c),
2089        None => none(),
2090    });
2091    rec.insert("detail".into(), match detail {
2092        Some(d) => some(d),
2093        None => none(),
2094    });
2095    Value::Record(rec)
2096}
2097
2098/// Convert a rusqlite error into a `SqlError`. The `code` is the
2099/// symbolic extended-result-code name (`SQLITE_BUSY`,
2100/// `SQLITE_CONSTRAINT_UNIQUE`, …) when present — this is what
2101/// callers want for dialect-aware retry / conflict handling.
2102///
2103/// rusqlite has two main error shapes that carry a numeric code:
2104/// `SqliteFailure` (driver-side runtime errors — constraints, busy,
2105/// IO) and `SqlInputError` (statement-preparation failures —
2106/// syntax, unknown table). Both are unpacked the same way.
2107fn sqlite_err_to_sql_error(e: rusqlite::Error, op: &str) -> Value {
2108    let message = format!("{op}: {e}");
2109    match &e {
2110        rusqlite::Error::SqliteFailure(ffi, detail_opt) => {
2111            sql_error(
2112                message,
2113                Some(sqlite_extended_code_name(ffi.extended_code)),
2114                detail_opt.clone(),
2115            )
2116        }
2117        rusqlite::Error::SqlInputError { error, msg, .. } => {
2118            sql_error(
2119                message,
2120                Some(sqlite_extended_code_name(error.extended_code)),
2121                Some(msg.clone()),
2122            )
2123        }
2124        _ => sql_error(message, None, None),
2125    }
2126}
2127
2128/// Map a SQLite extended result code (numeric) to its symbolic name.
2129/// We only cover the codes a Lex caller is likely to dispatch on
2130/// (constraint kinds, busy/locked, read-only, IO); anything else
2131/// falls back to a generic `SQLITE_ERROR_<n>` stringification so the
2132/// numeric code is still recoverable.
2133fn sqlite_extended_code_name(code: i32) -> String {
2134    use rusqlite::ffi::*;
2135    let s = match code {
2136        SQLITE_BUSY => "SQLITE_BUSY",
2137        SQLITE_LOCKED => "SQLITE_LOCKED",
2138        SQLITE_READONLY => "SQLITE_READONLY",
2139        SQLITE_IOERR => "SQLITE_IOERR",
2140        SQLITE_CORRUPT => "SQLITE_CORRUPT",
2141        SQLITE_NOTFOUND => "SQLITE_NOTFOUND",
2142        SQLITE_FULL => "SQLITE_FULL",
2143        SQLITE_CANTOPEN => "SQLITE_CANTOPEN",
2144        SQLITE_PROTOCOL => "SQLITE_PROTOCOL",
2145        SQLITE_SCHEMA => "SQLITE_SCHEMA",
2146        SQLITE_TOOBIG => "SQLITE_TOOBIG",
2147        SQLITE_CONSTRAINT => "SQLITE_CONSTRAINT",
2148        SQLITE_CONSTRAINT_CHECK => "SQLITE_CONSTRAINT_CHECK",
2149        SQLITE_CONSTRAINT_FOREIGNKEY => "SQLITE_CONSTRAINT_FOREIGNKEY",
2150        SQLITE_CONSTRAINT_NOTNULL => "SQLITE_CONSTRAINT_NOTNULL",
2151        SQLITE_CONSTRAINT_PRIMARYKEY => "SQLITE_CONSTRAINT_PRIMARYKEY",
2152        SQLITE_CONSTRAINT_TRIGGER => "SQLITE_CONSTRAINT_TRIGGER",
2153        SQLITE_CONSTRAINT_UNIQUE => "SQLITE_CONSTRAINT_UNIQUE",
2154        SQLITE_CONSTRAINT_VTAB => "SQLITE_CONSTRAINT_VTAB",
2155        SQLITE_CONSTRAINT_ROWID => "SQLITE_CONSTRAINT_ROWID",
2156        SQLITE_MISMATCH => "SQLITE_MISMATCH",
2157        SQLITE_RANGE => "SQLITE_RANGE",
2158        SQLITE_NOTADB => "SQLITE_NOTADB",
2159        SQLITE_AUTH => "SQLITE_AUTH",
2160        _ => return format!("SQLITE_ERROR_{code}"),
2161    };
2162    s.to_string()
2163}
2164
2165/// Convert a postgres error into a `SqlError`. The `code` is the
2166/// 5-character SQLSTATE (`23505`, `40P01`, …); `detail` is the
2167/// driver's optional detail message when present.
2168fn pg_err_to_sql_error(e: postgres::Error, op: &str) -> Value {
2169    let message = format!("{op}: {e}");
2170    let code = e.as_db_error().map(|db| db.code().code().to_string());
2171    let detail = e.as_db_error().and_then(|db| db.detail().map(|s| s.to_string()));
2172    sql_error(message, code, detail)
2173}
2174
2175impl DefaultHandler {
2176    /// Implementation of `agent.call_mcp(server, tool, args_json)`.
2177    /// Goes through the LRU client cache (#197): the named server
2178    /// is spawned on first use and reused on subsequent calls.
2179    /// On failure the offending client is dropped so the next
2180    /// call respawns rather than silently failing forever.
2181    fn dispatch_call_mcp(&mut self, args: Vec<Value>) -> Value {
2182        let server = match args.first() {
2183            Some(Value::Str(s)) => s.clone(),
2184            _ => return err(Value::Str(
2185                "agent.call_mcp(server, tool, args_json): server must be Str".into())),
2186        };
2187        let tool = match args.get(1) {
2188            Some(Value::Str(s)) => s.clone(),
2189            _ => return err(Value::Str(
2190                "agent.call_mcp(server, tool, args_json): tool must be Str".into())),
2191        };
2192        let args_json = match args.get(2) {
2193            Some(Value::Str(s)) => s.clone(),
2194            _ => return err(Value::Str(
2195                "agent.call_mcp(server, tool, args_json): args_json must be Str".into())),
2196        };
2197        let parsed: serde_json::Value = match serde_json::from_str(&args_json) {
2198            Ok(v) => v,
2199            Err(e) => return err(Value::Str(format!(
2200                "agent.call_mcp: args_json is not valid JSON: {e}"))),
2201        };
2202        match self.mcp_clients.call(&server, &tool, parsed) {
2203            Ok(result) => ok(Value::Str(
2204                serde_json::to_string(&result).unwrap_or_else(|_| "null".into()))),
2205            Err(e) => err(Value::Str(e)),
2206        }
2207    }
2208
2209    /// Implementation of `agent.cloud_stream(prompt) -> Result[Stream[Str], Str]`
2210    /// (#305 slice 3). The fixture path (`LEX_LLM_STREAM_FIXTURE`)
2211    /// splits the env-var value on `|` and yields each segment as
2212    /// one chunk; it's the load-bearing test hook. Live HTTP
2213    /// chunked-response support is deferred to a follow-up slice.
2214    fn dispatch_cloud_stream(&mut self, args: Vec<Value>) -> Value {
2215        let _prompt = match args.first() {
2216            Some(Value::Str(s)) => s.clone(),
2217            _ => return err(Value::Str(
2218                "agent.cloud_stream(prompt): prompt must be Str".into())),
2219        };
2220        let chunks: Vec<String> = match std::env::var("LEX_LLM_STREAM_FIXTURE") {
2221            Ok(v) => v.split('|').map(|s| s.to_string()).collect(),
2222            Err(_) => return err(Value::Str(
2223                "agent.cloud_stream: live streaming not yet implemented; \
2224                 set LEX_LLM_STREAM_FIXTURE='chunk1|chunk2|…' for tests".into())),
2225        };
2226        let handle = self.register_stream(chunks.into_iter());
2227        ok(stream_handle_value(handle))
2228    }
2229
2230    /// Implementation of `stream.next(s) -> Option[T]` (#305 slice 3).
2231    /// Returns `Some(chunk)` for each producer yield and `None` once
2232    /// the producer is exhausted. Unknown handle ids return `None`
2233    /// rather than erroring so streams can be safely consumed past
2234    /// the end (matches the semantics of `Iterator::next`).
2235    fn dispatch_stream_next(&mut self, args: Vec<Value>) -> Value {
2236        let handle = match args.first().and_then(stream_handle_id) {
2237            Some(h) => h,
2238            None => return Value::Variant { name: "None".into(), args: vec![] },
2239        };
2240        let mut streams = match self.streams.lock() {
2241            Ok(g) => g,
2242            Err(_) => return Value::Variant { name: "None".into(), args: vec![] },
2243        };
2244        match streams.get_mut(&handle).and_then(|it| it.next()) {
2245            Some(chunk) => some(Value::Str(chunk)),
2246            None => {
2247                streams.remove(&handle);
2248                Value::Variant { name: "None".into(), args: vec![] }
2249            }
2250        }
2251    }
2252
2253    /// Implementation of `stream.collect(s) -> List[T]` (#305 slice 3).
2254    /// Drains the producer eagerly. Unknown handles drain to an
2255    /// empty list so the contract is `collect ∘ collect = []`
2256    /// (idempotent on a closed stream).
2257    fn dispatch_stream_collect(&mut self, args: Vec<Value>) -> Value {
2258        let handle = match args.first().and_then(stream_handle_id) {
2259            Some(h) => h,
2260            None => return Value::List(Vec::new()),
2261        };
2262        let mut iter = {
2263            let mut streams = match self.streams.lock() {
2264                Ok(g) => g,
2265                Err(_) => return Value::List(Vec::new()),
2266            };
2267            match streams.remove(&handle) {
2268                Some(it) => it,
2269                None => return Value::List(Vec::new()),
2270            }
2271        };
2272        let mut out: Vec<Value> = Vec::new();
2273        for chunk in iter.by_ref() {
2274            out.push(Value::Str(chunk));
2275        }
2276        Value::List(out)
2277    }
2278
2279    /// Register a producer iterator and return its handle id. The
2280    /// handle is monotonic-counter-based so two streams created in
2281    /// quick succession get distinct ids.
2282    fn register_stream<I>(&self, iter: I) -> String
2283    where
2284        I: Iterator<Item = String> + Send + 'static,
2285    {
2286        let id = self
2287            .next_stream_id
2288            .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
2289        let handle = format!("stream_{id}");
2290        if let Ok(mut streams) = self.streams.lock() {
2291            streams.insert(handle.clone(), Box::new(iter));
2292        }
2293        handle
2294    }
2295}
2296
2297/// Build the runtime representation of a `Stream[T]` value:
2298/// `Variant("__StreamHandle", [Str(handle_id)])`. The opaque tag is
2299/// prefixed with `__` so it can't collide with a user-declared
2300/// variant.
2301fn stream_handle_value(handle: String) -> Value {
2302    Value::Variant {
2303        name: "__StreamHandle".into(),
2304        args: vec![Value::Str(handle)],
2305    }
2306}
2307
2308/// Inverse of [`stream_handle_value`] — extract the handle id from
2309/// a Stream value, or `None` if the input doesn't have the
2310/// expected shape.
2311fn stream_handle_id(v: &Value) -> Option<String> {
2312    match v {
2313        Value::Variant { name, args } if name == "__StreamHandle" => match args.first() {
2314            Some(Value::Str(h)) => Some(h.clone()),
2315            _ => None,
2316        },
2317        _ => None,
2318    }
2319}
2320
2321/// Implementation of `agent.local_complete(prompt)` (#196).
2322/// Hits Ollama (or any compatible HTTP service via `OLLAMA_HOST`)
2323/// and returns the completion text. Override at the
2324/// `EffectHandler` layer if you need a different transport.
2325fn dispatch_llm_local(args: Vec<Value>) -> Value {
2326    let prompt = match args.first() {
2327        Some(Value::Str(s)) => s.clone(),
2328        _ => return err(Value::Str(
2329            "agent.local_complete(prompt): prompt must be Str".into())),
2330    };
2331    match crate::llm::local_complete(&prompt) {
2332        Ok(text) => ok(Value::Str(text)),
2333        Err(e) => err(Value::Str(e)),
2334    }
2335}
2336
2337/// Implementation of `agent.cloud_complete(prompt)` (#196).
2338/// Hits OpenAI's chat-completions API (or any compatible
2339/// service via `OPENAI_BASE_URL`) and returns the assistant
2340/// message. Requires `OPENAI_API_KEY`. Override at the
2341/// `EffectHandler` layer for custom auth, batching, or other
2342/// providers.
2343fn dispatch_llm_cloud(args: Vec<Value>) -> Value {
2344    let prompt = match args.first() {
2345        Some(Value::Str(s)) => s.clone(),
2346        _ => return err(Value::Str(
2347            "agent.cloud_complete(prompt): prompt must be Str".into())),
2348    };
2349    match crate::llm::cloud_complete(&prompt) {
2350        Ok(text) => ok(Value::Str(text)),
2351        Err(e) => err(Value::Str(e)),
2352    }
2353}
2354
2355fn some(v: Value) -> Value {
2356    Value::Variant { name: "Some".into(), args: vec![v] }
2357}
2358fn none() -> Value {
2359    Value::Variant { name: "None".into(), args: vec![] }
2360}
2361
2362fn expect_bytes(v: Option<&Value>) -> Result<&Vec<u8>, String> {
2363    match v {
2364        Some(Value::Bytes(b)) => Ok(b),
2365        Some(other) => Err(format!("expected Bytes arg, got {other:?}")),
2366        None => Err("missing argument".into()),
2367    }
2368}
2369
2370fn expect_kv_handle(v: Option<&Value>) -> Result<u64, String> {
2371    match v {
2372        Some(Value::Int(n)) if *n >= 0 => Ok(*n as u64),
2373        Some(other) => Err(format!("expected Kv handle (Int), got {other:?}")),
2374        None => Err("missing Kv argument".into()),
2375    }
2376}
2377
2378fn expect_sql_handle(v: Option<&Value>) -> Result<u64, String> {
2379    match v {
2380        Some(Value::Int(n)) if *n >= 0 => Ok(*n as u64),
2381        Some(other) => Err(format!("expected Db handle (Int), got {other:?}")),
2382        None => Err("missing Db argument".into()),
2383    }
2384}
2385
2386#[allow(dead_code)]
2387fn expect_str_list(v: Option<&Value>) -> Result<Vec<String>, String> {
2388    match v {
2389        Some(Value::List(items)) => items.iter().map(|x| match x {
2390            Value::Str(s) => Ok(s.clone()),
2391            other => Err(format!("expected List[Str] element, got {other:?}")),
2392        }).collect(),
2393        Some(other) => Err(format!("expected List[Str], got {other:?}")),
2394        None => Err("missing List[Str] argument".into()),
2395    }
2396}
2397
2398/// Convert a `List[SqlParam]` value to driver-neutral `SqlParamValue`s.
2399/// SqlParam = PStr(Str) | PInt(Int) | PFloat(Float) | PBool(Bool) | PNull
2400fn expect_sql_params(v: Option<&Value>) -> Result<Vec<SqlParamValue>, String> {
2401    let items = match v {
2402        Some(Value::List(xs)) => xs,
2403        Some(other) => return Err(format!("expected List[SqlParam], got {other:?}")),
2404        None => return Err("missing params argument".into()),
2405    };
2406    items.iter().map(|item| {
2407        match item {
2408            Value::Variant { name, args } => match name.as_str() {
2409                "PStr"   => match args.first() {
2410                    Some(Value::Str(s)) => Ok(SqlParamValue::Text(s.clone())),
2411                    _ => Err("PStr requires a Str argument".into()),
2412                },
2413                "PInt"   => match args.first() {
2414                    Some(Value::Int(n)) => Ok(SqlParamValue::Integer(*n)),
2415                    _ => Err("PInt requires an Int argument".into()),
2416                },
2417                "PFloat" => match args.first() {
2418                    Some(Value::Float(f)) => Ok(SqlParamValue::Real(*f)),
2419                    _ => Err("PFloat requires a Float argument".into()),
2420                },
2421                "PBool"  => match args.first() {
2422                    Some(Value::Bool(b)) => Ok(SqlParamValue::Bool(*b)),
2423                    _ => Err("PBool requires a Bool argument".into()),
2424                },
2425                "PNull"  => Ok(SqlParamValue::Null),
2426                other    => Err(format!("unknown SqlParam constructor `{other}`")),
2427            },
2428            // Backward-compat: bare strings are accepted as PStr.
2429            Value::Str(s) => Ok(SqlParamValue::Text(s.clone())),
2430            other => Err(format!("expected SqlParam variant, got {other:?}")),
2431        }
2432    }).collect()
2433}
2434
2435/// Convert `SqlParamValue`s to rusqlite-typed values for SQLite binding.
2436fn sqlite_params(params: &[SqlParamValue]) -> Vec<rusqlite::types::Value> {
2437    params.iter().map(|p| match p {
2438        SqlParamValue::Text(s)    => rusqlite::types::Value::Text(s.clone()),
2439        SqlParamValue::Integer(n) => rusqlite::types::Value::Integer(*n),
2440        SqlParamValue::Real(f)    => rusqlite::types::Value::Real(*f),
2441        SqlParamValue::Bool(b)    => rusqlite::types::Value::Integer(*b as i64),
2442        SqlParamValue::Null       => rusqlite::types::Value::Null,
2443    }).collect()
2444}
2445
2446/// Box `SqlParamValue`s as `dyn ToSql + Sync` for Postgres binding.
2447fn pg_param_refs(params: &[SqlParamValue]) -> Vec<Box<dyn postgres::types::ToSql + Sync>> {
2448    params.iter().map(|p| -> Box<dyn postgres::types::ToSql + Sync> {
2449        match p {
2450            SqlParamValue::Text(s)    => Box::new(s.clone()),
2451            SqlParamValue::Integer(n) => Box::new(*n),
2452            SqlParamValue::Real(f)    => Box::new(*f),
2453            SqlParamValue::Bool(b)    => Box::new(*b),
2454            SqlParamValue::Null       => Box::new(Option::<String>::None),
2455        }
2456    }).collect()
2457}
2458
2459/// Run a statement on SQLite and pack rows into `Value::List(Value::Record(...))`.
2460fn sql_run_query_sqlite(
2461    conn: &rusqlite::Connection,
2462    stmt_str: &str,
2463    params: &[SqlParamValue],
2464) -> Value {
2465    let mut stmt = match conn.prepare(stmt_str) {
2466        Ok(s)  => s,
2467        Err(e) => return err(sqlite_err_to_sql_error(e, "sql.query")),
2468    };
2469    let column_count = stmt.column_count();
2470    let column_names: Vec<String> = (0..column_count)
2471        .map(|i| stmt.column_name(i).unwrap_or("").to_string())
2472        .collect();
2473    let bound = sqlite_params(params);
2474    let bind: Vec<&dyn rusqlite::ToSql> = bound.iter()
2475        .map(|p| p as &dyn rusqlite::ToSql)
2476        .collect();
2477    let mut rows = match stmt.query(rusqlite::params_from_iter(bind.iter())) {
2478        Ok(r)  => r,
2479        Err(e) => return err(sqlite_err_to_sql_error(e, "sql.query")),
2480    };
2481    let mut out: Vec<Value> = Vec::new();
2482    loop {
2483        let row = match rows.next() {
2484            Ok(Some(r)) => r,
2485            Ok(None)    => break,
2486            Err(e)      => return err(sqlite_err_to_sql_error(e, "sql.query")),
2487        };
2488        let mut rec = indexmap::IndexMap::new();
2489        for (i, name) in column_names.iter().enumerate() {
2490            let cell = match row.get_ref(i) {
2491                Ok(c)  => sql_value_ref_to_lex(c),
2492                Err(e) => return err(sqlite_err_to_sql_error(e, &format!("sql.query: column {i}"))),
2493            };
2494            rec.insert(name.clone(), cell);
2495        }
2496        out.push(Value::Record(rec));
2497    }
2498    ok(Value::List(out))
2499}
2500
2501/// Run a statement on Postgres and pack rows into `Value::List(Value::Record(...))`.
2502fn sql_run_query_pg(
2503    client: &mut postgres::Client,
2504    stmt_str: &str,
2505    params: &[SqlParamValue],
2506) -> Value {
2507    let pg = pg_param_refs(params);
2508    let refs: Vec<&(dyn postgres::types::ToSql + Sync)> =
2509        pg.iter().map(|b| b.as_ref()).collect();
2510    let rows = match client.query(stmt_str, &refs) {
2511        Ok(r)  => r,
2512        Err(e) => return err(pg_err_to_sql_error(e, "sql.query")),
2513    };
2514    let out: Vec<Value> = rows.iter().map(|row| {
2515        Value::Record(pg_row_to_lex_record(row))
2516    }).collect();
2517    ok(Value::List(out))
2518}
2519
2520/// Convert a Postgres row to a Lex record, mapping column types to Lex values.
2521fn pg_row_to_lex_record(row: &postgres::Row) -> indexmap::IndexMap<String, Value> {
2522    use postgres::types::Type;
2523    let mut rec = indexmap::IndexMap::new();
2524    for (i, col) in row.columns().iter().enumerate() {
2525        let ty = col.type_();
2526        let val = if *ty == Type::INT2 || *ty == Type::INT4 || *ty == Type::INT8 {
2527            row.get::<_, Option<i64>>(i).map(Value::Int).unwrap_or(Value::Unit)
2528        } else if *ty == Type::FLOAT4 || *ty == Type::FLOAT8 {
2529            row.get::<_, Option<f64>>(i).map(Value::Float).unwrap_or(Value::Unit)
2530        } else if *ty == Type::BOOL {
2531            row.get::<_, Option<bool>>(i).map(Value::Bool).unwrap_or(Value::Unit)
2532        } else if *ty == Type::BYTEA {
2533            row.get::<_, Option<Vec<u8>>>(i).map(Value::Bytes).unwrap_or(Value::Unit)
2534        } else {
2535            row.get::<_, Option<String>>(i).map(Value::Str).unwrap_or(Value::Unit)
2536        };
2537        rec.insert(col.name().to_string(), val);
2538    }
2539    rec
2540}
2541
2542/// Extract a column value from a row record by name, returning `Option[X]`.
2543fn sql_get_col<F>(args: &[Value], convert: F) -> Result<Value, String>
2544where
2545    F: Fn(&Value) -> Option<Value>,
2546{
2547    let row = args.first().ok_or("sql.get_*: missing row argument")?;
2548    let col = match args.get(1) {
2549        Some(Value::Str(s)) => s.as_str(),
2550        Some(other) => return Err(format!("sql.get_*: column name must be Str, got {other:?}")),
2551        None => return Err("sql.get_*: missing column name argument".into()),
2552    };
2553    let cell = match row {
2554        Value::Record(rec) => rec.get(col).cloned(),
2555        other => return Err(format!("sql.get_*: row must be a Record, got {other:?}")),
2556    };
2557    Ok(match cell.and_then(|v| convert(&v)) {
2558        Some(v) => Value::Variant { name: "Some".into(), args: vec![v] },
2559        None    => Value::Variant { name: "None".into(), args: vec![] },
2560    })
2561}
2562
2563fn sql_value_ref_to_lex(v: rusqlite::types::ValueRef<'_>) -> Value {
2564    use rusqlite::types::ValueRef;
2565    match v {
2566        ValueRef::Null       => Value::Unit,
2567        ValueRef::Integer(n) => Value::Int(n),
2568        ValueRef::Real(f)    => Value::Float(f),
2569        ValueRef::Text(s)    => Value::Str(String::from_utf8_lossy(s).into_owned()),
2570        ValueRef::Blob(b)    => Value::Bytes(b.to_vec()),
2571    }
2572}
2573
2574// -- log state (process-wide; configurable via log.set_*) --
2575
/// Severity of a log record.
///
/// Variant order is significant: the derived `PartialOrd` ranks
/// `Debug < Info < Warn < Error`, and `emit_log` drops records that rank
/// below the configured threshold.
#[derive(Clone, Copy, PartialEq, PartialOrd)]
enum LogLevel {
    Debug,
    Info,
    Warn,
    Error,
}
2578
/// How `emit_log` renders a record: a human-readable text line, or one
/// JSON object per line.
#[derive(Clone, Copy, PartialEq)]
enum LogFormat {
    Text,
    Json,
}
2581
/// Destination for rendered log lines.
///
/// `File` shares one handle behind an `Arc<Mutex<_>>`, so cloning the sink
/// (as `emit_log` does before writing) is cheap and every clone writes to
/// the same file.
#[derive(Clone)]
enum LogSink {
    Stderr,
    File(std::sync::Arc<Mutex<std::fs::File>>),
}
2587
2588struct LogState {
2589    level: LogLevel,
2590    format: LogFormat,
2591    sink: LogSink,
2592}
2593
2594fn log_state() -> &'static Mutex<LogState> {
2595    static STATE: OnceLock<Mutex<LogState>> = OnceLock::new();
2596    STATE.get_or_init(|| Mutex::new(LogState {
2597        level: LogLevel::Info,
2598        format: LogFormat::Text,
2599        sink: LogSink::Stderr,
2600    }))
2601}
2602
2603fn parse_log_level(s: &str) -> Option<LogLevel> {
2604    match s {
2605        "debug" => Some(LogLevel::Debug),
2606        "info" => Some(LogLevel::Info),
2607        "warn" => Some(LogLevel::Warn),
2608        "error" => Some(LogLevel::Error),
2609        _ => None,
2610    }
2611}
2612
2613fn level_label(l: LogLevel) -> &'static str {
2614    match l {
2615        LogLevel::Debug => "debug",
2616        LogLevel::Info => "info",
2617        LogLevel::Warn => "warn",
2618        LogLevel::Error => "error",
2619    }
2620}
2621
2622fn emit_log(level: LogLevel, msg: &str) {
2623    let state = log_state().lock().unwrap();
2624    if level < state.level {
2625        return;
2626    }
2627    let ts = chrono::Utc::now().to_rfc3339();
2628    let line = match state.format {
2629        LogFormat::Text => format!("[{}] {}: {}\n", ts, level_label(level), msg),
2630        LogFormat::Json => {
2631            // Hand-rolled JSON to avoid pulling serde_json into the
2632            // hot path; msg gets minimal escaping (the four common
2633            // cases that break a JSON line).
2634            let escaped = msg
2635                .replace('\\', "\\\\")
2636                .replace('"',  "\\\"")
2637                .replace('\n', "\\n")
2638                .replace('\r', "\\r");
2639            format!(
2640                "{{\"ts\":\"{ts}\",\"level\":\"{}\",\"msg\":\"{escaped}\"}}\n",
2641                level_label(level),
2642            )
2643        }
2644    };
2645    let sink = state.sink.clone();
2646    drop(state);
2647    match sink {
2648        LogSink::Stderr => {
2649            use std::io::Write;
2650            let _ = std::io::stderr().write_all(line.as_bytes());
2651        }
2652        LogSink::File(f) => {
2653            use std::io::Write;
2654            if let Ok(mut g) = f.lock() {
2655                let _ = g.write_all(line.as_bytes());
2656            }
2657        }
2658    }
2659}
2660
/// Live state behind one `process.spawn` handle.
///
/// Holds the child process itself plus optional buffered readers over
/// its stdout and stderr; each reader is `None` when absent.
pub(crate) struct ProcessState {
    /// The spawned child.
    child: std::process::Child,
    /// Line-buffered reader over the child's stdout, if any.
    stdout: Option<std::io::BufReader<std::process::ChildStdout>>,
    /// Line-buffered reader over the child's stderr, if any.
    stderr: Option<std::io::BufReader<std::process::ChildStderr>>,
}
2666
2667/// Process-wide registry of live `process.spawn` handles. Capped at
2668/// [`MAX_PROCESS_HANDLES`] to bound long-running programs that spawn
2669/// many short-lived children: on each `spawn` past the cap, the
2670/// least-recently-used entry is dropped (which `Drop`s its
2671/// `ProcessState`, leaving the child orphaned but the registry
2672/// bounded). `process.wait` also drops the entry on completion since
2673/// the handle becomes terminal once the child exits.
2674///
2675/// Each entry is wrapped in `Arc<Mutex<ProcessState>>` so the global
2676/// lookup mutex is held only briefly during dispatch — once we have
2677/// the per-handle `Arc`, the global lock is released and the slow
2678/// op (`wait`, `read_*_line`) only contends on its own handle's
2679/// mutex. Reads on different handles no longer block each other.
2680fn process_registry() -> &'static Mutex<ProcessRegistry> {
2681    static REGISTRY: OnceLock<Mutex<ProcessRegistry>> = OnceLock::new();
2682    REGISTRY.get_or_init(|| Mutex::new(ProcessRegistry::with_capacity(MAX_PROCESS_HANDLES)))
2683}
2684
2685const MAX_PROCESS_HANDLES: usize = 256;
2686
2687type SharedProcessState = Arc<Mutex<ProcessState>>;
2688
2689pub(crate) struct ProcessRegistry {
2690    entries: indexmap::IndexMap<u64, SharedProcessState>,
2691    cap: usize,
2692}
2693
2694impl ProcessRegistry {
2695    pub(crate) fn with_capacity(cap: usize) -> Self {
2696        Self { entries: indexmap::IndexMap::new(), cap }
2697    }
2698
2699    /// Insert a freshly-spawned child. If at cap, evict the LRU entry
2700    /// first; the dropped `ProcessState`'s child stays alive (orphaned)
2701    /// but its file descriptors are released.
2702    pub(crate) fn insert(&mut self, handle: u64, state: ProcessState) {
2703        if self.entries.len() >= self.cap {
2704            self.entries.shift_remove_index(0);
2705        }
2706        self.entries.insert(handle, Arc::new(Mutex::new(state)));
2707    }
2708
2709    /// Look up a handle, marking it most-recently-used on hit. Returns
2710    /// a clone of the shared `Arc` — callers should release the global
2711    /// registry lock before locking the per-handle mutex.
2712    pub(crate) fn touch_get(&mut self, handle: u64) -> Option<SharedProcessState> {
2713        let idx = self.entries.get_index_of(&handle)?;
2714        self.entries.move_index(idx, self.entries.len() - 1);
2715        self.entries.get(&handle).cloned()
2716    }
2717
2718    /// Drop the registry entry. The underlying `Arc` may outlive the
2719    /// removal if another op still holds it; that's intentional — the
2720    /// in-flight op finishes against the existing `ProcessState`, and
2721    /// only fresh lookups start failing.
2722    pub(crate) fn remove(&mut self, handle: u64) {
2723        self.entries.shift_remove(&handle);
2724    }
2725
2726    #[cfg(test)]
2727    pub(crate) fn len(&self) -> usize { self.entries.len() }
2728}
2729
/// Hand out the next `process.spawn` handle id.
///
/// Ids come from a process-wide atomic counter starting at 1, so spawns
/// on different threads never receive the same id.
fn next_process_handle() -> u64 {
    static NEXT_PROCESS_ID: AtomicU64 = AtomicU64::new(1);
    NEXT_PROCESS_ID.fetch_add(1, Ordering::SeqCst)
}
2734
#[cfg(all(test, unix))]
mod process_registry_tests {
    use super::{ProcessRegistry, ProcessState};

    /// Spawn a trivial short-lived child for use as registry payload.
    ///
    /// The registry never inspects the child; it only needs a valid
    /// `std::process::Child` to store. So `true`, which exits
    /// immediately, is spawned with its output discarded. The child
    /// really does run, and it is never waited on. On Unix each one stays
    /// a zombie until the test binary exits, which is harmless at this
    /// scale.
    fn fresh_state() -> ProcessState {
        let child = std::process::Command::new("true")
            .stdout(std::process::Stdio::null())
            .stderr(std::process::Stdio::null())
            .spawn()
            .expect("spawn `true`");
        ProcessState { child, stdout: None, stderr: None }
    }

    #[test]
    fn insert_and_get_round_trip() {
        let mut r = ProcessRegistry::with_capacity(4);
        r.insert(1, fresh_state());
        assert!(r.touch_get(1).is_some());
        assert!(r.touch_get(2).is_none());
    }

    #[test]
    fn touch_get_returns_distinct_arcs_for_distinct_handles() {
        let mut r = ProcessRegistry::with_capacity(4);
        r.insert(1, fresh_state());
        r.insert(2, fresh_state());
        let a = r.touch_get(1).unwrap();
        let b = r.touch_get(2).unwrap();
        // Different Arcs — pointer-equality check.
        assert!(!std::sync::Arc::ptr_eq(&a, &b));
    }

    #[test]
    fn cap_evicts_lru_on_overflow() {
        let mut r = ProcessRegistry::with_capacity(2);
        r.insert(1, fresh_state());
        r.insert(2, fresh_state());
        let _ = r.touch_get(1);
        r.insert(3, fresh_state());
        assert!(r.touch_get(1).is_some(), "1 was MRU, should survive");
        assert!(r.touch_get(2).is_none(), "2 was LRU, should be evicted");
        assert!(r.touch_get(3).is_some(), "3 just inserted, should survive");
        assert_eq!(r.len(), 2);
    }

    #[test]
    fn cap_with_no_touches_evicts_in_insertion_order() {
        let mut r = ProcessRegistry::with_capacity(2);
        r.insert(10, fresh_state());
        r.insert(20, fresh_state());
        r.insert(30, fresh_state());
        assert!(r.touch_get(10).is_none());
        assert!(r.touch_get(20).is_some());
        assert!(r.touch_get(30).is_some());
    }

    #[test]
    fn remove_drops_entry() {
        let mut r = ProcessRegistry::with_capacity(4);
        r.insert(1, fresh_state());
        r.remove(1);
        assert!(r.touch_get(1).is_none());
        assert_eq!(r.len(), 0);
    }

    #[test]
    fn many_inserts_stay_bounded_at_cap() {
        let cap = 8;
        let mut r = ProcessRegistry::with_capacity(cap);
        for i in 0..(cap as u64 * 3) {
            r.insert(i, fresh_state());
            assert!(r.len() <= cap);
        }
        assert_eq!(r.len(), cap);
    }

    #[test]
    fn outstanding_arc_outlives_remove() {
        // Holding the per-handle Arc while another op removes the
        // entry must not invalidate the in-flight op. Mirrors the
        // wait-completes-then-removes pattern.
        let mut r = ProcessRegistry::with_capacity(4);
        r.insert(1, fresh_state());
        let arc = r.touch_get(1).expect("entry exists");
        r.remove(1);
        // Registry forgot about it, but the Arc still works.
        assert!(r.touch_get(1).is_none());
        let _state = arc.lock().unwrap();
    }
}
2828
2829fn expect_process_handle(v: Option<&Value>) -> Result<u64, String> {
2830    match v {
2831        Some(Value::Int(n)) if *n >= 0 => Ok(*n as u64),
2832        Some(other) => Err(format!("expected ProcessHandle (Int), got {other:?}")),
2833        None => Err("missing ProcessHandle argument".into()),
2834    }
2835}
2836
2837/// Process-wide registry of open `Kv` handles. Each `kv.open` allocates
2838/// a new u64 handle via [`next_kv_handle`] and stores the `sled::Db`
2839/// here; subsequent ops fetch by handle. `kv.close` removes the entry.
2840///
2841/// Capped at [`MAX_KV_HANDLES`] to prevent leaks from long-running
2842/// programs that open many short-lived stores without calling
2843/// `kv.close`. On insert at cap, the least-recently-used entry is
2844/// dropped (closing its `sled::Db`); subsequent ops on the evicted
2845/// handle return the standard "closed or unknown Kv handle" error.
2846/// Any access (`get`, `put`, `delete`, `contains`, `list_prefix`)
2847/// touches the LRU order.
2848fn kv_registry() -> &'static Mutex<KvRegistry> {
2849    static REGISTRY: OnceLock<Mutex<KvRegistry>> = OnceLock::new();
2850    REGISTRY.get_or_init(|| Mutex::new(KvRegistry::with_capacity(MAX_KV_HANDLES)))
2851}
2852
2853/// Maximum number of `kv.open` handles kept alive at once. Past this
2854/// cap, the least-recently-used handle is evicted on each new open.
2855/// Sized so that pathological "open and forget" programs are bounded
2856/// without breaking real-world programs that intentionally keep one or
2857/// two long-lived stores open.
2858const MAX_KV_HANDLES: usize = 256;
2859
2860/// LRU-bounded set of open `sled::Db` instances keyed by `u64` handle.
2861/// Built on `IndexMap` for O(1) insert / remove / lookup with
2862/// insertion-order traversal — touching an entry just shift-moves it
2863/// to the back, evictions pop from the front.
2864pub(crate) struct KvRegistry {
2865    entries: indexmap::IndexMap<u64, sled::Db>,
2866    cap: usize,
2867}
2868
2869impl KvRegistry {
2870    pub(crate) fn with_capacity(cap: usize) -> Self {
2871        Self { entries: indexmap::IndexMap::new(), cap }
2872    }
2873
2874    /// Insert a freshly-opened db. If we're already at cap, evict the
2875    /// LRU entry first; the dropped `sled::Db` closes its files.
2876    pub(crate) fn insert(&mut self, handle: u64, db: sled::Db) {
2877        if self.entries.len() >= self.cap {
2878            self.entries.shift_remove_index(0);
2879        }
2880        self.entries.insert(handle, db);
2881    }
2882
2883    /// Look up a handle, marking it most-recently-used on hit.
2884    pub(crate) fn touch_get(&mut self, handle: u64) -> Option<&sled::Db> {
2885        let idx = self.entries.get_index_of(&handle)?;
2886        self.entries.move_index(idx, self.entries.len() - 1);
2887        self.entries.get(&handle)
2888    }
2889
2890    /// Explicit `kv.close`: drop the handle if present.
2891    pub(crate) fn remove(&mut self, handle: u64) {
2892        self.entries.shift_remove(&handle);
2893    }
2894
2895    #[cfg(test)]
2896    pub(crate) fn len(&self) -> usize { self.entries.len() }
2897}
2898
/// Hand out the next `kv.open` handle id.
///
/// Ids come from a process-wide atomic counter starting at 1. It is
/// independent of the other handle namespaces.
fn next_kv_handle() -> u64 {
    static NEXT_KV_ID: AtomicU64 = AtomicU64::new(1);
    NEXT_KV_ID.fetch_add(1, Ordering::SeqCst)
}
2903
2904/// Process-wide registry of open `Db` handles. Same shape as the kv
2905/// and process registries: per-handle `Arc<Mutex<…>>` so dispatch
2906/// only briefly holds the global lock and ops on different
2907/// connections don't serialize. LRU-bounded at
2908/// [`MAX_SQL_HANDLES`] to avoid leaks from long-running programs
2909/// that open many short-lived databases.
2910fn sql_registry() -> &'static Mutex<SqlRegistry> {
2911    static REGISTRY: OnceLock<Mutex<SqlRegistry>> = OnceLock::new();
2912    REGISTRY.get_or_init(|| Mutex::new(SqlRegistry::with_capacity(MAX_SQL_HANDLES)))
2913}
2914
2915const MAX_SQL_HANDLES: usize = 256;
2916
2917// ── Streaming cursors (#379) ─────────────────────────────────────────
2918//
2919// `sql.query_iter[T]` opens a *server-side* cursor and returns an
2920// `Iter[T]` backed by a producer thread streaming rows through a
2921// bounded mpsc channel. The bytecode `iter.next` op dispatches on the
2922// `__IterCursor(handle)` variant tag and effect-calls
2923// `sql.cursor_next(handle)` to pull one row at a time.
2924//
// Producer-thread semantics: while the cursor is live, the producer
// holds the underlying SQL connection's `Arc<Mutex<SqlConn>>` lock.
// Other ops on the same Db handle block until the cursor is drained
// or evicted. The producer needs exclusive access to the connection
// for the cursor's whole lifetime. The Postgres path runs
// `DECLARE/FETCH` inside a transaction that mutably borrows the client.
// The SQLite path keeps a prepared statement, which borrows the
// connection, alive across `sqlite3_step` calls.
2931//
2932// Channel capacity: 64 rows. Producer blocks at 64-row backlog,
2933// keeping resident memory bounded regardless of result-set size.
2934// Consumer disconnect (Receiver dropped) causes the next send to
2935// fail, the producer exits, drops the prepared statement, and
2936// releases the SqlConn lock — so closing a cursor is just "stop
2937// calling next and let the receiver go out of scope."
2938
/// Rows buffered between a cursor's producer thread and its consumer.
/// Once this many rows are waiting, the producer blocks.
const CURSOR_CHANNEL_CAPACITY: usize = 64;
/// Most cursors kept live at once. Past this cap the least-recently-used
/// cursor is evicted, and its producer exits on its next send.
const MAX_CURSOR_HANDLES: usize = 256;
2941
2942type CursorReceiver = std::sync::mpsc::Receiver<Result<Value, String>>;
2943
2944pub(crate) struct CursorRegistry {
2945    /// Each cursor's receiver lives behind its own Mutex so multiple
2946    /// `sql.cursor_next` calls on the same cursor serialize correctly.
2947    /// The outer `Arc` lets the global registry lock be released
2948    /// before blocking on `recv()`.
2949    entries: indexmap::IndexMap<u64, Arc<Mutex<CursorReceiver>>>,
2950    cap: usize,
2951}
2952
2953impl CursorRegistry {
2954    pub(crate) fn with_capacity(cap: usize) -> Self {
2955        Self { entries: indexmap::IndexMap::new(), cap }
2956    }
2957
2958    pub(crate) fn insert(&mut self, handle: u64, rx: CursorReceiver) {
2959        if self.entries.len() >= self.cap {
2960            self.entries.shift_remove_index(0);
2961        }
2962        self.entries.insert(handle, Arc::new(Mutex::new(rx)));
2963    }
2964
2965    pub(crate) fn touch_get(&mut self, handle: u64) -> Option<Arc<Mutex<CursorReceiver>>> {
2966        let idx = self.entries.get_index_of(&handle)?;
2967        self.entries.move_index(idx, self.entries.len() - 1);
2968        self.entries.get(&handle).cloned()
2969    }
2970
2971    pub(crate) fn remove(&mut self, handle: u64) {
2972        self.entries.shift_remove(&handle);
2973    }
2974}
2975
2976fn cursor_registry() -> &'static Mutex<CursorRegistry> {
2977    static REGISTRY: OnceLock<Mutex<CursorRegistry>> = OnceLock::new();
2978    REGISTRY.get_or_init(|| Mutex::new(CursorRegistry::with_capacity(MAX_CURSOR_HANDLES)))
2979}
2980
/// Hand out the next cursor id from a process-wide atomic counter
/// starting at 1. The Postgres producer also uses it to mint unique
/// server-side cursor names.
fn next_cursor_handle() -> u64 {
    static NEXT_CURSOR_ID: AtomicU64 = AtomicU64::new(1);
    NEXT_CURSOR_ID.fetch_add(1, Ordering::SeqCst)
}
2985
2986/// SQLite cursor producer: locks the conn, prepares the statement,
2987/// walks rows, ships each to the consumer through `sender`. Exits on
2988/// row exhaustion, consumer disconnect, or first error. The lock is
2989/// released when the thread function returns (statement dropped first
2990/// to satisfy rusqlite's borrow).
2991fn sqlite_cursor_producer(
2992    conn_arc: Arc<Mutex<SqlConn>>,
2993    stmt_str: String,
2994    params: Vec<SqlParamValue>,
2995    sender: std::sync::mpsc::SyncSender<Result<Value, String>>,
2996) {
2997    let mut conn_guard = match conn_arc.lock() {
2998        Ok(g) => g,
2999        Err(p) => p.into_inner(),
3000    };
3001    let SqlConn::Sqlite(c) = &mut *conn_guard else {
3002        let _ = sender.send(Err("sqlite_cursor_producer called on non-sqlite conn".into()));
3003        return;
3004    };
3005    let mut stmt = match c.prepare(&stmt_str) {
3006        Ok(s) => s,
3007        Err(e) => { let _ = sender.send(Err(format!("prepare: {e}"))); return; }
3008    };
3009    let column_count = stmt.column_count();
3010    let column_names: Vec<String> = (0..column_count)
3011        .map(|i| stmt.column_name(i).unwrap_or("").to_string())
3012        .collect();
3013    let bound = sqlite_params(&params);
3014    let bind: Vec<&dyn rusqlite::ToSql> =
3015        bound.iter().map(|p| p as &dyn rusqlite::ToSql).collect();
3016    let mut rows = match stmt.query(rusqlite::params_from_iter(bind.iter())) {
3017        Ok(r) => r,
3018        Err(e) => { let _ = sender.send(Err(format!("query: {e}"))); return; }
3019    };
3020    loop {
3021        match rows.next() {
3022            Ok(None) => break,
3023            Err(e) => {
3024                let _ = sender.send(Err(format!("row: {e}")));
3025                break;
3026            }
3027            Ok(Some(row)) => {
3028                let mut rec = indexmap::IndexMap::new();
3029                for (i, name) in column_names.iter().enumerate() {
3030                    let val = match row.get_ref(i) {
3031                        Ok(vr) => sql_value_ref_to_lex(vr),
3032                        Err(_) => Value::Unit,
3033                    };
3034                    rec.insert(name.clone(), val);
3035                }
3036                if sender.send(Ok(Value::Record(rec))).is_err() {
3037                    break;
3038                }
3039            }
3040        }
3041    }
3042}
3043
3044/// Postgres cursor producer: opens a transaction + named cursor,
3045/// fetches rows in batches, ships each one through `sender`. Closes
3046/// the cursor and commits the transaction on exit.
3047fn pg_cursor_producer(
3048    conn_arc: Arc<Mutex<SqlConn>>,
3049    stmt_str: String,
3050    params: Vec<SqlParamValue>,
3051    sender: std::sync::mpsc::SyncSender<Result<Value, String>>,
3052) {
3053    let mut conn_guard = match conn_arc.lock() {
3054        Ok(g) => g,
3055        Err(p) => p.into_inner(),
3056    };
3057    let SqlConn::Postgres(c) = &mut *conn_guard else {
3058        let _ = sender.send(Err("pg_cursor_producer called on non-postgres conn".into()));
3059        return;
3060    };
3061    let pg = pg_param_refs(&params);
3062    let refs: Vec<&(dyn postgres::types::ToSql + Sync)> =
3063        pg.iter().map(|b| b.as_ref()).collect();
3064    let mut tx = match c.transaction() {
3065        Ok(t) => t,
3066        Err(e) => { let _ = sender.send(Err(format!("begin: {e}"))); return; }
3067    };
3068    // Use a uniquely-named cursor so concurrent producers on
3069    // distinct Db handles don't collide on the cursor namespace.
3070    let cur_name = format!("__lex_cur_{}", next_cursor_handle());
3071    if let Err(e) = tx.execute(
3072        &format!("DECLARE \"{cur_name}\" NO SCROLL CURSOR FOR {stmt_str}"),
3073        &refs,
3074    ) {
3075        let _ = sender.send(Err(format!("declare: {e}")));
3076        return;
3077    }
3078    let fetch_sql = format!("FETCH 64 FROM \"{cur_name}\"");
3079    'outer: loop {
3080        let batch = match tx.query(&fetch_sql, &[]) {
3081            Ok(r) => r,
3082            Err(e) => { let _ = sender.send(Err(format!("fetch: {e}"))); break; }
3083        };
3084        if batch.is_empty() {
3085            break;
3086        }
3087        for row in batch.iter() {
3088            let rec = pg_row_to_lex_record(row);
3089            if sender.send(Ok(Value::Record(rec))).is_err() {
3090                break 'outer;
3091            }
3092        }
3093    }
3094    let _ = tx.execute(&format!("CLOSE \"{cur_name}\""), &[]);
3095    let _ = tx.commit();
3096}
3097
/// A bound SQL parameter in driver-neutral form. The SQLite and Postgres
/// paths each convert it into their own driver's parameter type.
#[derive(Clone, Debug)]
enum SqlParamValue {
    /// UTF-8 text.
    Text(String),
    /// 64-bit signed integer.
    Integer(i64),
    /// 64-bit float.
    Real(f64),
    /// Boolean.
    Bool(bool),
    /// SQL `NULL`.
    Null,
}
3107
3108/// Abstraction over a SQLite connection or a Postgres client.
3109pub(crate) enum SqlConn {
3110    Sqlite(rusqlite::Connection),
3111    Postgres(postgres::Client),
3112}
3113
3114type SharedConn = Arc<Mutex<SqlConn>>;
3115
3116pub(crate) struct SqlRegistry {
3117    entries: indexmap::IndexMap<u64, SharedConn>,
3118    cap: usize,
3119}
3120
3121impl SqlRegistry {
3122    pub(crate) fn with_capacity(cap: usize) -> Self {
3123        Self { entries: indexmap::IndexMap::new(), cap }
3124    }
3125
3126    pub(crate) fn insert(&mut self, handle: u64, conn: SqlConn) {
3127        if self.entries.len() >= self.cap {
3128            self.entries.shift_remove_index(0);
3129        }
3130        self.entries.insert(handle, Arc::new(Mutex::new(conn)));
3131    }
3132
3133    /// Look up a handle, marking it MRU on hit. Returns a clone of
3134    /// the shared `Arc` so callers release the global registry
3135    /// lock before locking the per-handle mutex.
3136    pub(crate) fn touch_get(&mut self, handle: u64) -> Option<SharedConn> {
3137        let idx = self.entries.get_index_of(&handle)?;
3138        self.entries.move_index(idx, self.entries.len() - 1);
3139        self.entries.get(&handle).cloned()
3140    }
3141
3142    pub(crate) fn remove(&mut self, handle: u64) {
3143        self.entries.shift_remove(&handle);
3144    }
3145
3146    #[cfg(test)]
3147    pub(crate) fn len(&self) -> usize { self.entries.len() }
3148}
3149
/// Hand out the next `Db` handle id.
///
/// Ids come from a process-wide atomic counter starting at 1. It is
/// independent of the other handle namespaces.
fn next_sql_handle() -> u64 {
    static NEXT_SQL_ID: AtomicU64 = AtomicU64::new(1);
    NEXT_SQL_ID.fetch_add(1, Ordering::SeqCst)
}
3154
#[cfg(test)]
mod sql_registry_tests {
    use super::{SqlConn, SqlRegistry};

    /// A throwaway in-memory SQLite connection to use as registry payload.
    fn fresh() -> SqlConn {
        SqlConn::Sqlite(rusqlite::Connection::open_in_memory().expect("open in-memory sqlite"))
    }

    #[test]
    fn insert_and_get_round_trip() {
        let mut r = SqlRegistry::with_capacity(4);
        r.insert(1, fresh());
        assert!(r.touch_get(1).is_some());
        assert!(r.touch_get(2).is_none());
    }

    #[test]
    fn touch_get_returns_distinct_arcs_for_distinct_handles() {
        let mut r = SqlRegistry::with_capacity(4);
        r.insert(1, fresh());
        r.insert(2, fresh());
        let a = r.touch_get(1).unwrap();
        let b = r.touch_get(2).unwrap();
        assert!(!std::sync::Arc::ptr_eq(&a, &b));
    }

    #[test]
    fn cap_evicts_lru_on_overflow() {
        let mut r = SqlRegistry::with_capacity(2);
        r.insert(1, fresh());
        r.insert(2, fresh());
        let _ = r.touch_get(1);
        r.insert(3, fresh());
        assert!(r.touch_get(1).is_some(), "1 was MRU, should survive");
        assert!(r.touch_get(2).is_none(), "2 was LRU, should be evicted");
        assert!(r.touch_get(3).is_some(), "3 just inserted");
        assert_eq!(r.len(), 2);
    }

    #[test]
    fn cap_with_no_touches_evicts_in_insertion_order() {
        // cap=2: insert 10, 20, 30 with no touches → 10 evicted (FIFO).
        let mut r = SqlRegistry::with_capacity(2);
        r.insert(10, fresh());
        r.insert(20, fresh());
        r.insert(30, fresh());
        assert!(r.touch_get(10).is_none());
        assert!(r.touch_get(20).is_some());
        assert!(r.touch_get(30).is_some());
    }

    #[test]
    fn remove_drops_entry() {
        let mut r = SqlRegistry::with_capacity(4);
        r.insert(1, fresh());
        r.remove(1);
        assert!(r.touch_get(1).is_none());
        assert_eq!(r.len(), 0);
    }

    #[test]
    fn remove_unknown_handle_is_noop() {
        let mut r = SqlRegistry::with_capacity(4);
        r.insert(1, fresh());
        r.remove(999);
        assert!(r.touch_get(1).is_some());
        assert_eq!(r.len(), 1);
    }

    #[test]
    fn many_inserts_stay_bounded_at_cap() {
        let cap = 8;
        let mut r = SqlRegistry::with_capacity(cap);
        for i in 0..(cap as u64 * 3) {
            r.insert(i, fresh());
            assert!(r.len() <= cap);
        }
        assert_eq!(r.len(), cap);
    }

    #[test]
    fn outstanding_arc_outlives_remove() {
        // An op (or cursor producer) holding the per-handle Arc must keep
        // a usable connection even after the registry forgets the handle.
        let mut r = SqlRegistry::with_capacity(4);
        r.insert(1, fresh());
        let conn = r.touch_get(1).expect("entry exists");
        r.remove(1);
        assert!(r.touch_get(1).is_none());
        let guard = conn.lock().unwrap();
        assert!(matches!(&*guard, SqlConn::Sqlite(_)));
    }
}
3204
#[cfg(test)]
mod kv_registry_tests {
    use super::KvRegistry;

    /// Spin up an isolated `sled::Db` in a temp dir.
    ///
    /// Each call gets a unique path (pid + tag + timestamp), so concurrent
    /// tests don't collide on the lockfile. The store is opened as
    /// `temporary`, so sled deletes the directory after the db is
    /// dropped, instead of leaving one directory per test per run behind
    /// in the system temp dir.
    fn fresh_db(tag: &str) -> sled::Db {
        let dir = std::env::temp_dir().join(format!(
            "lex-kv-reg-{}-{}-{}",
            std::process::id(),
            tag,
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_nanos()
        ));
        sled::Config::new()
            .path(&dir)
            .temporary(true)
            .open()
            .expect("sled open")
    }

    #[test]
    fn insert_and_get_round_trip() {
        let mut r = KvRegistry::with_capacity(4);
        r.insert(1, fresh_db("a"));
        assert!(r.touch_get(1).is_some());
        assert!(r.touch_get(2).is_none());
    }

    #[test]
    fn cap_evicts_lru_on_overflow() {
        // cap=2: insert 1, 2; touch 1 (now MRU); insert 3 → 2 evicted.
        let mut r = KvRegistry::with_capacity(2);
        r.insert(1, fresh_db("c1"));
        r.insert(2, fresh_db("c2"));
        let _ = r.touch_get(1);
        r.insert(3, fresh_db("c3"));
        assert!(r.touch_get(1).is_some(), "1 was MRU, should survive");
        assert!(r.touch_get(2).is_none(), "2 was LRU, should be evicted");
        assert!(r.touch_get(3).is_some(), "3 just inserted, should survive");
        assert_eq!(r.len(), 2);
    }

    #[test]
    fn cap_with_no_touches_evicts_in_insertion_order() {
        // cap=2: insert 1, 2, 3 with no touches → 1 evicted (FIFO).
        let mut r = KvRegistry::with_capacity(2);
        r.insert(10, fresh_db("f1"));
        r.insert(20, fresh_db("f2"));
        r.insert(30, fresh_db("f3"));
        assert!(r.touch_get(10).is_none());
        assert!(r.touch_get(20).is_some());
        assert!(r.touch_get(30).is_some());
    }

    #[test]
    fn remove_drops_entry() {
        let mut r = KvRegistry::with_capacity(4);
        r.insert(1, fresh_db("r1"));
        r.remove(1);
        assert!(r.touch_get(1).is_none());
        assert_eq!(r.len(), 0);
    }

    #[test]
    fn remove_unknown_handle_is_noop() {
        let mut r = KvRegistry::with_capacity(4);
        r.insert(1, fresh_db("u1"));
        r.remove(999);
        assert!(r.touch_get(1).is_some());
    }

    #[test]
    fn many_inserts_stay_bounded_at_cap() {
        // Exhaust the cap to confirm the registry never grows past it,
        // even under sustained churn.
        let cap = 8;
        let mut r = KvRegistry::with_capacity(cap);
        for i in 0..(cap as u64 * 3) {
            r.insert(i, fresh_db(&format!("b{i}")));
            assert!(r.len() <= cap);
        }
        assert_eq!(r.len(), cap);
    }
}