Skip to main content

lex_runtime/
handler.rs

1//! Native effect handlers, dispatched at runtime through the VM's
2//! `EffectHandler` trait. The handler also re-checks the runtime policy
3//! per spec §7.4 (the static check is necessary but not sufficient: a fn
4//! declared `[fs_read("/data")]` that's allowed at startup still has to
5//! pass the path check at the point of dispatch).
6
7use lex_bytecode::vm::{EffectHandler, Vm};
8use lex_bytecode::{Program, Value};
9use std::path::PathBuf;
10use std::sync::atomic::{AtomicU64, Ordering};
11use std::sync::{Mutex, OnceLock};
12use std::sync::Arc;
13use std::time::{SystemTime, UNIX_EPOCH};
14
15use crate::builtins::{call_pure_builtin, is_pure_call};
16use crate::policy::Policy;
17
/// Destination for lines emitted by `io.print`.
///
/// Production runs use [`StdoutSink`]; tests swap in [`CapturedSink`] to
/// inspect exactly what a program printed.
pub trait IoSink: Send {
    /// Emit one line of output. `StdoutSink` appends a newline when
    /// printing; `CapturedSink` stores the text verbatim.
    fn print_line(&mut self, s: &str);
}

/// Sink that writes every line to the process's standard output.
pub struct StdoutSink;

impl IoSink for StdoutSink {
    fn print_line(&mut self, s: &str) {
        println!("{}", s);
    }
}

/// Sink that buffers lines in memory instead of printing them.
#[derive(Default)]
pub struct CapturedSink {
    /// Every line passed to `print_line`, in call order.
    pub lines: Vec<String>,
}

impl IoSink for CapturedSink {
    fn print_line(&mut self, s: &str) {
        self.lines.push(String::from(s));
    }
}
36
/// Live producers behind `agent.cloud_stream` handles (#305 slice 3).
///
/// Maps each opaque handle id to the iterator yielding that stream's
/// chunks. The `Send` bound on the iterators keeps the whole map `Send`.
pub type StreamRegistry = std::collections::HashMap<
    String,
    Box<dyn Iterator<Item = String> + Send>,
>;
41
42pub struct DefaultHandler {
43    policy: Policy,
44    pub sink: Box<dyn IoSink>,
45    /// Optional read root for `io.read` — when set, `io.read("p")` resolves
46    /// to `read_root.join(p)`. Lets tests run without touching the real fs.
47    pub read_root: Option<PathBuf>,
48    /// Per-run budget pool (#225). `Arc<AtomicU64>` so parallel
49    /// branches share one counter without locking. Initialized to
50    /// the policy ceiling at handler construction; each call to a
51    /// function with declared `[budget(N)]` deducts N atomically
52    /// via `note_call_budget`. Cloning the handler is intentional
53    /// for net.serve / chat handlers — they share the same pool.
54    pub budget_remaining: Arc<AtomicU64>,
55    /// The original ceiling that `budget_remaining` started at, kept
56    /// for diagnostics so a `BudgetExceeded` error can report
57    /// `(used, ceiling)` rather than just "exceeded by N".
58    pub budget_ceiling: Option<u64>,
59    /// Shared reference to the program, needed by `net.serve` so the
60    /// handler can spin up fresh VMs to dispatch incoming requests.
61    /// `None` if the handler was constructed without a program.
62    pub program: Option<Arc<Program>>,
63    /// Chat registry; populated by `net.serve_ws`'s per-message
64    /// dispatch so `chat.broadcast` / `chat.send` work from inside
65    /// a handler invocation.
66    pub chat_registry: Option<Arc<crate::ws::ChatRegistry>>,
67    /// LRU cache of `agent.call_mcp` clients keyed by the
68    /// command-line string (#197). Avoids spawn-per-call cost
69    /// when an agent invokes the same MCP server in tight loops.
70    /// Capped — when the cache is full, the least-recently-used
71    /// entry is dropped (its subprocess is reaped on Drop).
72    pub mcp_clients: crate::mcp_client::McpClientCache,
73    /// Stream registry for `agent.cloud_stream` / `stream.next` /
74    /// `stream.collect` (#305 slice 3). Keyed by an opaque handle
75    /// id; values are the producer iterators. Wrapped in
76    /// `Arc<Mutex<…>>` so par_map workers can share the same
77    /// stream pool (when slice-2's per-worker handler split chains
78    /// the registry through).
79    pub streams: Arc<std::sync::Mutex<StreamRegistry>>,
80    /// Monotonic counter for handing out fresh stream handle ids.
81    pub next_stream_id: Arc<std::sync::atomic::AtomicU64>,
82}
83
84impl DefaultHandler {
85    pub fn new(policy: Policy) -> Self {
86        // If the caller supplied a ceiling, the pool starts at that
87        // ceiling and counts down. No ceiling = `u64::MAX` so calls
88        // never refuse on budget grounds (existing behavior).
89        let ceiling = policy.budget;
90        let initial = ceiling.unwrap_or(u64::MAX);
91        Self {
92            policy,
93            sink: Box::new(StdoutSink),
94            read_root: None,
95            budget_remaining: Arc::new(AtomicU64::new(initial)),
96            budget_ceiling: ceiling,
97            program: None,
98            chat_registry: None,
99            mcp_clients: crate::mcp_client::McpClientCache::with_capacity(16),
100            streams: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
101            next_stream_id: Arc::new(std::sync::atomic::AtomicU64::new(0)),
102        }
103    }
104
105    pub fn with_program(mut self, program: Arc<Program>) -> Self {
106        self.program = Some(program); self
107    }
108
109    pub fn with_chat_registry(mut self, registry: Arc<crate::ws::ChatRegistry>) -> Self {
110        self.chat_registry = Some(registry); self
111    }
112
113    pub fn with_sink(mut self, sink: Box<dyn IoSink>) -> Self {
114        self.sink = sink; self
115    }
116
117    pub fn with_read_root(mut self, root: PathBuf) -> Self {
118        self.read_root = Some(root); self
119    }
120
121    fn ensure_kind_allowed(&self, kind: &str) -> Result<(), String> {
122        if self.policy.allow_effects.contains(kind) {
123            Ok(())
124        } else {
125            Err(format!("effect `{kind}` not in --allow-effects"))
126        }
127    }
128
129    fn resolve_read_path(&self, p: &str) -> PathBuf {
130        match &self.read_root {
131            Some(root) => root.join(p.trim_start_matches('/')),
132            None => PathBuf::from(p),
133        }
134    }
135
136    fn dispatch_log(&mut self, op: &str, args: Vec<Value>) -> Result<Value, String> {
137        match op {
138            "debug" | "info" | "warn" | "error" => {
139                let msg = expect_str(args.first())?;
140                let level = match op {
141                    "debug" => LogLevel::Debug,
142                    "info" => LogLevel::Info,
143                    "warn" => LogLevel::Warn,
144                    _ => LogLevel::Error,
145                };
146                emit_log(level, msg);
147                Ok(Value::Unit)
148            }
149            "set_level" => {
150                let s = expect_str(args.first())?;
151                match parse_log_level(s) {
152                    Some(l) => {
153                        log_state().lock().unwrap().level = l;
154                        Ok(ok(Value::Unit))
155                    }
156                    None => Ok(err(Value::Str(format!(
157                        "log.set_level: unknown level `{s}`; expected debug|info|warn|error")))),
158                }
159            }
160            "set_format" => {
161                let s = expect_str(args.first())?;
162                let fmt = match s {
163                    "text" => LogFormat::Text,
164                    "json" => LogFormat::Json,
165                    other => return Ok(err(Value::Str(format!(
166                        "log.set_format: unknown format `{other}`; expected text|json")))),
167                };
168                log_state().lock().unwrap().format = fmt;
169                Ok(ok(Value::Unit))
170            }
171            "set_sink" => {
172                let path = expect_str(args.first())?;
173                if path == "-" {
174                    log_state().lock().unwrap().sink = LogSink::Stderr;
175                    return Ok(ok(Value::Unit));
176                }
177                if let Err(e) = self.ensure_fs_write_path(path) {
178                    return Ok(err(Value::Str(e)));
179                }
180                match std::fs::OpenOptions::new()
181                    .create(true).append(true).open(path)
182                {
183                    Ok(f) => {
184                        log_state().lock().unwrap().sink = LogSink::File(std::sync::Arc::new(Mutex::new(f)));
185                        Ok(ok(Value::Unit))
186                    }
187                    Err(e) => Ok(err(Value::Str(format!("log.set_sink `{path}`: {e}")))),
188                }
189            }
190            other => Err(format!("unsupported log.{other}")),
191        }
192    }
193
194    fn dispatch_process(&mut self, op: &str, args: Vec<Value>) -> Result<Value, String> {
195        match op {
196            "spawn" => {
197                let cmd = expect_str(args.first())?.to_string();
198                let raw_args = match args.get(1) {
199                    Some(Value::List(items)) => items.clone(),
200                    _ => return Err("process.spawn: args must be List[Str]".into()),
201                };
202                let str_args: Result<Vec<String>, String> = raw_args.iter().map(|v| match v {
203                    Value::Str(s) => Ok(s.clone()),
204                    other => Err(format!("process.spawn: arg must be Str, got {other:?}")),
205                }).collect();
206                let str_args = str_args?;
207                let opts = match args.get(2) {
208                    Some(Value::Record(r)) => r.clone(),
209                    _ => return Err("process.spawn: missing or invalid opts record".into()),
210                };
211
212                // Allow-list check, mirroring the existing proc.spawn.
213                if !self.policy.allow_proc.is_empty() {
214                    let basename = std::path::Path::new(&cmd)
215                        .file_name()
216                        .and_then(|s| s.to_str())
217                        .unwrap_or(&cmd);
218                    if !self.policy.allow_proc.iter().any(|a| a == basename) {
219                        return Ok(err(Value::Str(format!(
220                            "process.spawn: `{cmd}` not in --allow-proc {:?}",
221                            self.policy.allow_proc
222                        ))));
223                    }
224                }
225
226                let mut command = std::process::Command::new(&cmd);
227                command.args(&str_args);
228                command.stdin(std::process::Stdio::piped());
229                command.stdout(std::process::Stdio::piped());
230                command.stderr(std::process::Stdio::piped());
231
232                if let Some(Value::Variant { name, args: vargs }) = opts.get("cwd") {
233                    if name == "Some" {
234                        if let Some(Value::Str(s)) = vargs.first() {
235                            command.current_dir(s);
236                        }
237                    }
238                }
239                if let Some(Value::Map(env)) = opts.get("env") {
240                    for (k, v) in env {
241                        if let (lex_bytecode::MapKey::Str(ks), Value::Str(vs)) = (k, v) {
242                            command.env(ks, vs);
243                        }
244                    }
245                }
246
247                let stdin_payload: Option<Vec<u8>> = match opts.get("stdin") {
248                    Some(Value::Variant { name, args: vargs }) if name == "Some" => {
249                        match vargs.first() {
250                            Some(Value::Bytes(b)) => Some(b.clone()),
251                            _ => None,
252                        }
253                    }
254                    _ => None,
255                };
256
257                let mut child = match command.spawn() {
258                    Ok(c) => c,
259                    Err(e) => return Ok(err(Value::Str(format!("process.spawn `{cmd}`: {e}")))),
260                };
261
262                if let Some(payload) = stdin_payload {
263                    if let Some(mut stdin) = child.stdin.take() {
264                        use std::io::Write;
265                        let _ = stdin.write_all(&payload);
266                        // Drop closes stdin; the child sees EOF.
267                    }
268                }
269
270                let stdout = child.stdout.take().map(std::io::BufReader::new);
271                let stderr = child.stderr.take().map(std::io::BufReader::new);
272                let handle = next_process_handle();
273                process_registry().lock().unwrap().insert(handle, ProcessState {
274                    child,
275                    stdout,
276                    stderr,
277                });
278                Ok(ok(Value::Int(handle as i64)))
279            }
280            "read_stdout_line" => Self::read_line_op(args, true),
281            "read_stderr_line" => Self::read_line_op(args, false),
282            "wait" => {
283                let h = expect_process_handle(args.first())?;
284                // Look up the per-handle Arc, then release the global
285                // lock before the (slow) wait so unrelated handles
286                // can dispatch concurrently.
287                let arc = process_registry().lock().unwrap()
288                    .touch_get(h)
289                    .ok_or_else(|| "process.wait: closed or unknown ProcessHandle".to_string())?;
290                let status = {
291                    let mut state = arc.lock().unwrap();
292                    state.child.wait().map_err(|e| format!("process.wait: {e}"))?
293                };
294                // Wait completion makes the handle terminal; drop it
295                // from the registry so the cap doesn't fill up with
296                // exited children.
297                process_registry().lock().unwrap().remove(h);
298                let mut rec = indexmap::IndexMap::new();
299                rec.insert("code".into(), Value::Int(status.code().unwrap_or(-1) as i64));
300                #[cfg(unix)]
301                {
302                    use std::os::unix::process::ExitStatusExt;
303                    rec.insert("signaled".into(), Value::Bool(status.signal().is_some()));
304                }
305                #[cfg(not(unix))]
306                {
307                    rec.insert("signaled".into(), Value::Bool(false));
308                }
309                Ok(Value::Record(rec))
310            }
311            "kill" => {
312                let h = expect_process_handle(args.first())?;
313                let _signal = expect_str(args.get(1))?;
314                let arc = process_registry().lock().unwrap()
315                    .touch_get(h)
316                    .ok_or_else(|| "process.kill: closed or unknown ProcessHandle".to_string())?;
317                let mut state = arc.lock().unwrap();
318                // Cross-platform: only `kill` (SIGKILL-equivalent on
319                // Windows). Signal-name dispatch is a v1.5 follow-up.
320                match state.child.kill() {
321                    Ok(_) => Ok(ok(Value::Unit)),
322                    Err(e) => Ok(err(Value::Str(format!("process.kill: {e}")))),
323                }
324            }
325            "run" => {
326                let cmd = expect_str(args.first())?.to_string();
327                let raw_args = match args.get(1) {
328                    Some(Value::List(items)) => items.clone(),
329                    _ => return Err("process.run: args must be List[Str]".into()),
330                };
331                let str_args: Result<Vec<String>, String> = raw_args.iter().map(|v| match v {
332                    Value::Str(s) => Ok(s.clone()),
333                    other => Err(format!("process.run: arg must be Str, got {other:?}")),
334                }).collect();
335                let str_args = str_args?;
336                if !self.policy.allow_proc.is_empty() {
337                    let basename = std::path::Path::new(&cmd)
338                        .file_name()
339                        .and_then(|s| s.to_str())
340                        .unwrap_or(&cmd);
341                    if !self.policy.allow_proc.iter().any(|a| a == basename) {
342                        return Ok(err(Value::Str(format!(
343                            "process.run: `{cmd}` not in --allow-proc {:?}",
344                            self.policy.allow_proc
345                        ))));
346                    }
347                }
348                match std::process::Command::new(&cmd).args(&str_args).output() {
349                    Ok(o) => {
350                        let mut rec = indexmap::IndexMap::new();
351                        rec.insert("stdout".into(), Value::Str(
352                            String::from_utf8_lossy(&o.stdout).to_string()));
353                        rec.insert("stderr".into(), Value::Str(
354                            String::from_utf8_lossy(&o.stderr).to_string()));
355                        rec.insert("exit_code".into(), Value::Int(
356                            o.status.code().unwrap_or(-1) as i64));
357                        Ok(ok(Value::Record(rec)))
358                    }
359                    Err(e) => Ok(err(Value::Str(format!("process.run `{cmd}`: {e}")))),
360                }
361            }
362            other => Err(format!("unsupported process.{other}")),
363        }
364    }
365
366    /// Read one line from the child's stdout (`is_stdout = true`) or
367    /// stderr. Returns `None` (Lex `Option`) at EOF; subsequent calls
368    /// keep returning `None`. Holds only the per-handle mutex during
369    /// the (potentially blocking) read, so reads on one handle don't
370    /// block reads/waits on a different handle.
371    fn read_line_op(args: Vec<Value>, is_stdout: bool) -> Result<Value, String> {
372        let h = expect_process_handle(args.first())?;
373        let arc = process_registry().lock().unwrap()
374            .touch_get(h)
375            .ok_or_else(|| format!(
376                "process.read_{}_line: closed or unknown ProcessHandle",
377                if is_stdout { "stdout" } else { "stderr" }))?;
378        let mut state = arc.lock().unwrap();
379        let reader_opt = if is_stdout {
380            state.stdout.as_mut().map(|r| -> &mut dyn std::io::BufRead { r })
381        } else {
382            state.stderr.as_mut().map(|r| -> &mut dyn std::io::BufRead { r })
383        };
384        let reader = match reader_opt {
385            Some(r) => r,
386            None => return Ok(none()),
387        };
388        let mut line = String::new();
389        match reader.read_line(&mut line) {
390            Ok(0) => Ok(none()),
391            Ok(_) => {
392                if line.ends_with('\n') { line.pop(); }
393                if line.ends_with('\r') { line.pop(); }
394                Ok(some(Value::Str(line)))
395            }
396            Err(e) => Err(format!("process.read_*_line: {e}")),
397        }
398    }
399
400    fn dispatch_fs(&mut self, op: &str, args: Vec<Value>) -> Result<Value, String> {
401        match op {
402            "exists" => {
403                let path = expect_str(args.first())?.to_string();
404                if let Err(e) = self.ensure_fs_walk_path(&path) {
405                    return Ok(err(Value::Str(e)));
406                }
407                Ok(Value::Bool(std::path::Path::new(&path).exists()))
408            }
409            "is_file" => {
410                let path = expect_str(args.first())?.to_string();
411                if let Err(e) = self.ensure_fs_walk_path(&path) {
412                    return Ok(err(Value::Str(e)));
413                }
414                Ok(Value::Bool(std::path::Path::new(&path).is_file()))
415            }
416            "is_dir" => {
417                let path = expect_str(args.first())?.to_string();
418                if let Err(e) = self.ensure_fs_walk_path(&path) {
419                    return Ok(err(Value::Str(e)));
420                }
421                Ok(Value::Bool(std::path::Path::new(&path).is_dir()))
422            }
423            "stat" => {
424                let path = expect_str(args.first())?.to_string();
425                if let Err(e) = self.ensure_fs_walk_path(&path) {
426                    return Ok(err(Value::Str(e)));
427                }
428                match std::fs::metadata(&path) {
429                    Ok(md) => {
430                        let mtime = md.modified()
431                            .ok()
432                            .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
433                            .map(|d| d.as_secs() as i64)
434                            .unwrap_or(0);
435                        let mut rec = indexmap::IndexMap::new();
436                        rec.insert("size".into(), Value::Int(md.len() as i64));
437                        rec.insert("mtime".into(), Value::Int(mtime));
438                        rec.insert("is_dir".into(), Value::Bool(md.is_dir()));
439                        rec.insert("is_file".into(), Value::Bool(md.is_file()));
440                        Ok(ok(Value::Record(rec)))
441                    }
442                    Err(e) => Ok(err(Value::Str(format!("fs.stat `{path}`: {e}")))),
443                }
444            }
445            "list_dir" => {
446                let path = expect_str(args.first())?.to_string();
447                if let Err(e) = self.ensure_fs_walk_path(&path) {
448                    return Ok(err(Value::Str(e)));
449                }
450                match std::fs::read_dir(&path) {
451                    Ok(rd) => {
452                        let mut entries: Vec<Value> = Vec::new();
453                        for ent in rd {
454                            match ent {
455                                Ok(e) => {
456                                    let p = e.path();
457                                    entries.push(Value::Str(p.to_string_lossy().into_owned()));
458                                }
459                                Err(e) => return Ok(err(Value::Str(format!("fs.list_dir: {e}")))),
460                            }
461                        }
462                        Ok(ok(Value::List(entries)))
463                    }
464                    Err(e) => Ok(err(Value::Str(format!("fs.list_dir `{path}`: {e}")))),
465                }
466            }
467            "walk" => {
468                let path = expect_str(args.first())?.to_string();
469                if let Err(e) = self.ensure_fs_walk_path(&path) {
470                    return Ok(err(Value::Str(e)));
471                }
472                let mut paths: Vec<Value> = Vec::new();
473                for ent in walkdir::WalkDir::new(&path) {
474                    match ent {
475                        Ok(e) => paths.push(Value::Str(
476                            e.path().to_string_lossy().into_owned())),
477                        Err(e) => return Ok(err(Value::Str(format!("fs.walk: {e}")))),
478                    }
479                }
480                Ok(ok(Value::List(paths)))
481            }
482            "glob" => {
483                let pattern = expect_str(args.first())?.to_string();
484                // Glob patterns can't be path-scoped at parse time
485                // (`**/*.rs` doesn't pin a directory); we filter the
486                // per-result paths after expansion against
487                // `--allow-fs-read`.
488                let entries = match glob::glob(&pattern) {
489                    Ok(e) => e,
490                    Err(e) => return Ok(err(Value::Str(format!("fs.glob: {e}")))),
491                };
492                let mut paths: Vec<Value> = Vec::new();
493                for ent in entries {
494                    match ent {
495                        Ok(p) => {
496                            let s = p.to_string_lossy().into_owned();
497                            if self.policy.allow_fs_read.is_empty()
498                                || self.policy.allow_fs_read.iter().any(|root| p.starts_with(root))
499                            {
500                                paths.push(Value::Str(s));
501                            }
502                        }
503                        Err(e) => return Ok(err(Value::Str(format!("fs.glob: {e}")))),
504                    }
505                }
506                Ok(ok(Value::List(paths)))
507            }
508            "mkdir_p" => {
509                let path = expect_str(args.first())?.to_string();
510                if let Err(e) = self.ensure_fs_write_path(&path) {
511                    return Ok(err(Value::Str(e)));
512                }
513                match std::fs::create_dir_all(&path) {
514                    Ok(_) => Ok(ok(Value::Unit)),
515                    Err(e) => Ok(err(Value::Str(format!("fs.mkdir_p `{path}`: {e}")))),
516                }
517            }
518            "remove" => {
519                let path = expect_str(args.first())?.to_string();
520                if let Err(e) = self.ensure_fs_write_path(&path) {
521                    return Ok(err(Value::Str(e)));
522                }
523                let p = std::path::Path::new(&path);
524                let result = if p.is_dir() {
525                    std::fs::remove_dir_all(p)
526                } else {
527                    std::fs::remove_file(p)
528                };
529                match result {
530                    Ok(_) => Ok(ok(Value::Unit)),
531                    Err(e) => Ok(err(Value::Str(format!("fs.remove `{path}`: {e}")))),
532                }
533            }
534            "copy" => {
535                let src = expect_str(args.first())?.to_string();
536                let dst = expect_str(args.get(1))?.to_string();
537                if let Err(e) = self.ensure_fs_walk_path(&src) {
538                    return Ok(err(Value::Str(e)));
539                }
540                if let Err(e) = self.ensure_fs_write_path(&dst) {
541                    return Ok(err(Value::Str(e)));
542                }
543                match std::fs::copy(&src, &dst) {
544                    Ok(_) => Ok(ok(Value::Unit)),
545                    Err(e) => Ok(err(Value::Str(format!("fs.copy {src} -> {dst}: {e}")))),
546                }
547            }
548            other => Err(format!("unsupported fs.{other}")),
549        }
550    }
551
552    /// Path scope for walk-style operations. `[fs_walk]` reuses the
553    /// `--allow-fs-read` allowlist — listing a directory is an
554    /// information disclosure on the same path tree as reading file
555    /// content, so the same scope applies. Empty allowlist = any path.
556    fn ensure_fs_walk_path(&self, path: &str) -> Result<(), String> {
557        if self.policy.allow_fs_read.is_empty() {
558            return Ok(());
559        }
560        let p = std::path::Path::new(path);
561        if self.policy.allow_fs_read.iter().any(|a| p.starts_with(a)) {
562            Ok(())
563        } else {
564            Err(format!("fs path `{path}` outside --allow-fs-read"))
565        }
566    }
567
568    /// Path scope for mutating operations. `[fs_write]` uses the
569    /// existing `--allow-fs-write` allowlist.
570    fn ensure_fs_write_path(&self, path: &str) -> Result<(), String> {
571        if self.policy.allow_fs_write.is_empty() {
572            return Ok(());
573        }
574        let p = std::path::Path::new(path);
575        if self.policy.allow_fs_write.iter().any(|a| p.starts_with(a)) {
576            Ok(())
577        } else {
578            Err(format!("fs path `{path}` outside --allow-fs-write"))
579        }
580    }
581
582    /// Enforce `--allow-net-host` against an outgoing URL. Empty
583    /// allowlist = any host. Non-empty = the URL's host must match
584    /// (substring; port-agnostic) at least one entry.
585    fn ensure_host_allowed(&self, url: &str) -> Result<(), String> {
586        if self.policy.allow_net_host.is_empty() { return Ok(()); }
587        let host = extract_host(url).unwrap_or("");
588        if self.policy.allow_net_host.iter().any(|h| host == h) {
589            Ok(())
590        } else {
591            Err(format!(
592                "net call to host `{host}` not in --allow-net-host {:?}",
593                self.policy.allow_net_host,
594            ))
595        }
596    }
597}
598
/// Extract the host from an `http://` or `https://` URL, dropping any
/// userinfo and port. Returns `None` for any other (or differently-cased)
/// scheme. Bracketed IPv6 literals keep their brackets, e.g.
/// `http://[::1]:8080/` gives `[::1]`.
///
/// Parsing rules:
/// - The authority ends at the first `/`, `?`, `#`, or `\`. WHATWG URL
///   parsing treats `\` like `/` in http(s) URLs.
/// - Userinfo ends at the last `@`.
///
/// Getting these boundaries right matters for `--allow-net-host`:
/// `http://allowed.com:x@evil.com/` connects to `evil.com`, and must be
/// reported as such rather than as `allowed.com`.
fn extract_host(url: &str) -> Option<&str> {
    let rest = url.strip_prefix("http://").or_else(|| url.strip_prefix("https://"))?;
    let authority_end = rest
        .find(|c: char| matches!(c, '/' | '?' | '#' | '\\'))
        .unwrap_or(rest.len());
    let authority = &rest[..authority_end];
    let host_port = match authority.rsplit_once('@') {
        Some((_userinfo, hp)) => hp,
        None => authority,
    };
    if host_port.starts_with('[') {
        // IPv6 literal: the port (if any) follows the closing bracket, and
        // the address itself contains colons.
        return Some(match host_port.find(']') {
            Some(i) => &host_port[..=i],
            None => host_port,
        });
    }
    Some(match host_port.rsplit_once(':') {
        Some((h, _port)) => h,
        None => host_port,
    })
}
610
611impl EffectHandler for DefaultHandler {
612    /// Per-call budget enforcement (#225). VM calls this before
613    /// invoking any function whose signature declares `[budget(N)]`.
614    /// The cost N is deducted atomically from the shared pool;
615    /// returning `Err` aborts the call before any frame is pushed.
616    fn note_call_budget(&mut self, cost: u64) -> Result<(), String> {
617        // Skip the work entirely when no ceiling is configured —
618        // the pool is `u64::MAX` and would never trip.
619        let Some(ceiling) = self.budget_ceiling else { return Ok(()); };
620        // Compare-and-swap: speculatively subtract; if we'd
621        // underflow, return BudgetExceeded without mutating.
622        // Use SeqCst because parallel branches may race here and
623        // the relative ordering of "used so far" vs. "this call's
624        // cost" needs to be deterministic across threads.
625        loop {
626            let cur = self.budget_remaining.load(Ordering::SeqCst);
627            if cost > cur {
628                let used = ceiling.saturating_sub(cur);
629                return Err(format!(
630                    "budget exceeded: requested {cost}, used so far {used}, ceiling {ceiling}"));
631            }
632            let next = cur - cost;
633            // Conservative accounting: if the CAS races and loses,
634            // re-read and try again. No refund-on-failure path.
635            if self.budget_remaining.compare_exchange(cur, next,
636                Ordering::SeqCst, Ordering::SeqCst).is_ok() {
637                return Ok(());
638            }
639        }
640    }
641
642    fn dispatch(&mut self, kind: &str, op: &str, args: Vec<Value>) -> Result<Value, String> {
643        // Pure stdlib builtins (str, list, json, ...) bypass the policy
644        // gate — they have no observable side effects and aren't tracked
645        // by the type system as effects.
646        if is_pure_call(kind, op) {
647            return call_pure_builtin(kind, op, args);
648        }
649        // `std.fs` ops use the fine-grained `[fs_walk]` and `[fs_write]`
650        // effect kinds (distinct from the module name `fs`); the
651        // policy check uses the per-op kind, not the module's.
652        if kind == "process" {
653            self.ensure_kind_allowed("proc")?;
654            return self.dispatch_process(op, args);
655        }
656        if kind == "log" {
657            // Emit ops are [log]; config ops are [io] (set_sink also
658            // [fs_write]). The dispatch picks the right kind per op.
659            let effect_kind = match op {
660                "debug" | "info" | "warn" | "error" => "log",
661                "set_level" | "set_format" => "io",
662                "set_sink" => {
663                    self.ensure_kind_allowed("io")?;
664                    self.ensure_kind_allowed("fs_write")?;
665                    return self.dispatch_log(op, args);
666                }
667                other => return Err(format!("unsupported log.{other}")),
668            };
669            self.ensure_kind_allowed(effect_kind)?;
670            return self.dispatch_log(op, args);
671        }
672        if kind == "fs" {
673            let effect_kind = match op {
674                "exists" | "is_file" | "is_dir" | "stat"
675                | "list_dir" | "walk" | "glob" => "fs_walk",
676                "mkdir_p" | "remove" => "fs_write",
677                "copy" => {
678                    self.ensure_kind_allowed("fs_walk")?;
679                    self.ensure_kind_allowed("fs_write")?;
680                    return self.dispatch_fs(op, args);
681                }
682                other => return Err(format!("unsupported fs.{other}")),
683            };
684            self.ensure_kind_allowed(effect_kind)?;
685            return self.dispatch_fs(op, args);
686        }
687        // datetime.now is the only effectful op in std.datetime;
688        // declared kind is `time`, matching the existing `time.now`.
689        // It yields Unix-epoch nanoseconds (`LEX_TEST_NOW`, given in
690        // seconds, is scaled by 1e9). The `crypto.random*` ops handled
691        // next gate on the fine-grained `random` kind, not the `crypto`
692        // module name, so `lex audit --effect random` flags each call.
693        if kind == "datetime" && op == "now" {
694            self.ensure_kind_allowed("time")?;
695            // LEX_TEST_NOW (Unix seconds) pins the clock for deterministic tests (#350).
696            if let Ok(s) = std::env::var("LEX_TEST_NOW") {
697                if let Ok(secs) = s.trim().parse::<i64>() {
698                    return Ok(Value::Int(secs.saturating_mul(1_000_000_000)));
699                }
700            }
701            let now = chrono::Utc::now();
702            let nanos = now.timestamp_nanos_opt().unwrap_or(i64::MAX);
703            return Ok(Value::Int(nanos));
704        }
705        if kind == "crypto" && op == "random" {
706            self.ensure_kind_allowed("random")?;
707            let n = expect_int(args.first())?;
708            if !(0..=1_048_576).contains(&n) {
709                return Err("crypto.random: n must be in 0..=1048576".into());
710            }
711            use rand::{rngs::SysRng, TryRng};
712            let mut buf = vec![0u8; n as usize];
713            SysRng.try_fill_bytes(&mut buf)
714                .map_err(|e| format!("crypto.random: OS RNG: {e}"))?;
715            return Ok(Value::Bytes(buf));
716        }
717        // crypto.random_str_hex(n) — N random bytes rendered as 2N
718        // lowercase hex chars (#382). The most common token-mint
719        // pattern (session ids, OAuth `state`, CSRF, request ids).
720        // Same `[random]` gate as `crypto.random`.
721        if kind == "crypto" && op == "random_str_hex" {
722            self.ensure_kind_allowed("random")?;
723            let n = expect_int(args.first())?;
724            if !(0..=1_048_576).contains(&n) {
725                return Err("crypto.random_str_hex: n must be in 0..=1048576".into());
726            }
727            use rand::{rngs::SysRng, TryRng};
728            let mut buf = vec![0u8; n as usize];
729            SysRng.try_fill_bytes(&mut buf)
730                .map_err(|e| format!("crypto.random_str_hex: OS RNG: {e}"))?;
731            return Ok(Value::Str(hex::encode(&buf)));
732        }
733        // `std.http` wire ops (send/get/post), handled further below
734        // after `agent` and `stream`, gate on the `net` effect kind, not
735        // the module name. This matches the declared signature
736        // (`http.get :: Str -> [net] ...`) and keeps `--allow-effects
737        // net` doing the obvious thing for both `net.*` and `http.*`.
738        // `std.agent` (#184): the four runtime effects added for
739        // agent-style programs (`llm_local`, `llm_cloud`, `a2a`,
740        // `mcp`). Every op first passes its declared-effect gate
741        // (`cloud_stream` shares `llm_cloud`). Only `send_a2a` is
742        // still a stub: it returns a sentinel `Ok` so traces record
743        // the call, leaving the wire format to downstream crates;
744        // the other ops dispatch to the handlers noted below.
745        if kind == "agent" {
746            let effect_kind = match op {
747                "local_complete" => "llm_local",
748                "cloud_complete" => "llm_cloud",
749                "cloud_stream"   => "llm_cloud",
750                "send_a2a"       => "a2a",
751                "call_mcp"       => "mcp",
752                other => return Err(format!("unsupported agent.{other}")),
753            };
754            self.ensure_kind_allowed(effect_kind)?;
755            // `call_mcp` runs through the LRU client cache
756            // (#197). `local_complete` / `cloud_complete` hit
757            // Ollama / OpenAI via env-var-driven configuration
758            // (#196); custom backends override at the
759            // EffectHandler layer rather than via a config file.
760            // `send_a2a` keeps its stub — that wire format
761            // lives in downstream `soft-a2a`.
762            return match op {
763                "call_mcp"       => Ok(self.dispatch_call_mcp(args)),
764                "local_complete" => Ok(dispatch_llm_local(args)),
765                "cloud_complete" => Ok(dispatch_llm_cloud(args)),
766                "cloud_stream"   => Ok(self.dispatch_cloud_stream(args)),
767                _ => Ok(ok(Value::Str(format!("<{effect_kind} stub>")))),
768            };
769        }
770        if kind == "stream" {
771            // #305 slice 3: consumer-side stream operations. Each
772            // op resolves the opaque handle in the parent handler's
773            // stream registry and pulls one or all items. The
774            // `stream` effect must be granted by policy; default
775            // policies for agent runs grant it alongside the
776            // producer effect (e.g. `llm_cloud`).
777            self.ensure_kind_allowed("stream")?;
778            return match op {
779                "next"    => Ok(self.dispatch_stream_next(args)),
780                "collect" => Ok(self.dispatch_stream_collect(args)),
781                other => Err(format!("unsupported stream.{other}")),
782            };
783        }
784        if kind == "http" && matches!(op, "send" | "get" | "post") {
785            self.ensure_kind_allowed("net")?;
786            return match op {
787                "send" => {
788                    let req = expect_record(args.first())?;
789                    Ok(http_send_record(self, req))
790                }
791                "get" => {
792                    let url = expect_str(args.first())?.to_string();
793                    self.ensure_host_allowed(&url)?;
794                    Ok(http_send_simple("GET", &url, None, "", None))
795                }
796                "post" => {
797                    let url = expect_str(args.first())?.to_string();
798                    let body = expect_bytes(args.get(1))?.clone();
799                    let content_type = expect_str(args.get(2))?.to_string();
800                    self.ensure_host_allowed(&url)?;
801                    Ok(http_send_simple("POST", &url, Some(body), &content_type, None))
802                }
803                _ => unreachable!(),
804            };
805        }
806        self.ensure_kind_allowed(kind)?;
807        match (kind, op) {
808            ("io", "print") => {
809                let line = expect_str(args.first())?;
810                self.sink.print_line(line);
811                Ok(Value::Unit)
812            }
813            ("io", "read") => {
814                let path = expect_str(args.first())?.to_string();
815                let resolved = self.resolve_read_path(&path);
816                // Honor the read-allowlist if any. The read_root-resolved
817                // path must sit under an allowed prefix AND contain no `..`:
818                // `Path::starts_with` is lexical, so `/data/../etc/passwd`
819                // would otherwise pass a `/data` allowlist (bench case #6:
820                // "[io] granted, body reads /etc/passwd").
821                if !self.policy.allow_fs_read.is_empty()
822                    && (resolved.components().any(|c| c == std::path::Component::ParentDir)
823                        || !self.policy.allow_fs_read.iter().any(|a| resolved.starts_with(a)))
824                {
825                    return Err(format!("read of `{path}` outside --allow-fs-read"));
826                }
827                match std::fs::read_to_string(&resolved) {
828                    Ok(s) => Ok(ok(Value::Str(s))),
829                    Err(e) => Ok(err(Value::Str(format!("{e}")))),
830                }
831            }
832            ("io", "write") => {
833                let path = expect_str(args.first())?.to_string();
834                let contents = expect_str(args.get(1))?.to_string();
835                // Honor write-allowlist if any; refuse `..` since `Path::starts_with` is lexical.
836                let p = std::path::Path::new(&path);
837                if !self.policy.allow_fs_write.is_empty()
838                    && (p.components().any(|c| c == std::path::Component::ParentDir)
839                        || !self.policy.allow_fs_write.iter().any(|a| p.starts_with(a))) {
840                    return Err(format!("write to `{path}` outside --allow-fs-write"));
841                }
842                match std::fs::write(&path, contents) {
843                    Ok(_) => Ok(ok(Value::Unit)),
844                    Err(e) => Ok(err(Value::Str(format!("{e}")))),
845                }
846            }
847            ("time", "now") => {
848                // LEX_TEST_NOW (Unix seconds) pins for deterministic tests.
849                if let Ok(s) = std::env::var("LEX_TEST_NOW") {
850                    if let Ok(secs) = s.trim().parse::<i64>() {
851                        return Ok(Value::Int(secs));
852                    }
853                }
854                let secs = SystemTime::now().duration_since(UNIX_EPOCH)
855                    .map_err(|e| format!("time: {e}"))?.as_secs();
856                Ok(Value::Int(secs as i64))
857            }
858            ("time", "now_ms") => {
859                // Unix epoch in milliseconds (#378). `LEX_TEST_NOW` is
860                // documented in seconds, so we lift it to ms by *1000
861                // to keep the pinning story uniform across `time.now`
862                // and `time.now_ms`.
863                if let Ok(s) = std::env::var("LEX_TEST_NOW") {
864                    if let Ok(secs) = s.trim().parse::<i64>() {
865                        return Ok(Value::Int(secs.saturating_mul(1000)));
866                    }
867                }
868                let ms = SystemTime::now().duration_since(UNIX_EPOCH)
869                    .map_err(|e| format!("time: {e}"))?.as_millis();
870                Ok(Value::Int(ms as i64))
871            }
872            ("time", "now_str") => {
873                // ISO-8601 / RFC 3339 in UTC (#378). Format mirrors
874                // `chrono::Utc::now().to_rfc3339()` already used
875                // elsewhere in the handler.
876                if let Ok(s) = std::env::var("LEX_TEST_NOW") {
877                    if let Ok(secs) = s.trim().parse::<i64>() {
878                        let dt = chrono::DateTime::<chrono::Utc>::from_timestamp(secs, 0)
879                            .unwrap_or_else(chrono::Utc::now);
880                        return Ok(Value::Str(dt.to_rfc3339()));
881                    }
882                }
883                Ok(Value::Str(chrono::Utc::now().to_rfc3339()))
884            }
885            ("time", "mono_ns") => {
886                // Monotonic clock relative to process start. Cached
887                // `Instant::now()` anchor so successive `mono_ns`
888                // calls return strictly non-decreasing values without
889                // depending on the wall clock. Not affected by
890                // `LEX_TEST_NOW` — pinning a monotonic clock would
891                // defeat its purpose; tests needing a fake monotonic
892                // clock should swap in their own `EffectHandler`.
893                static MONO_START: OnceLock<std::time::Instant> = OnceLock::new();
894                let start = MONO_START.get_or_init(std::time::Instant::now);
895                let dur = std::time::Instant::now().duration_since(*start);
896                Ok(Value::Int(dur.as_nanos() as i64))
897            }
898            ("time", "sleep_ms") => {
899                // Block the current thread for `n` ms (#226). Used
900                // by `flow.retry_with_backoff`'s exponential delay.
901                // Negative or zero is a no-op. Bounded at 60s in the
902                // runtime to avoid pathological agent-emitted loops
903                // wedging the host — anything legitimate beyond
904                // that should use process-level scheduling, not a
905                // blocking sleep.
906                let n = expect_int(args.first())?;
907                if n > 0 {
908                    let ms = (n as u64).min(60_000);
909                    std::thread::sleep(std::time::Duration::from_millis(ms));
910                }
911                Ok(Value::Unit)
912            }
913            ("rand", "int_in") => {
914                // Deterministic stub: midpoint of [lo, hi]. Summed in i128 so extreme bounds cannot overflow.
915                let lo = expect_int(args.first())?;
916                let hi = expect_int(args.get(1))?;
917                Ok(Value::Int(((i128::from(lo) + i128::from(hi)) / 2) as i64))
918            }
919            // `env.get` returns `Option[Str]` — `None` for unset vars.
920            // Per-var scoping (`[env(NAME)]`) arrives with #207's
921            // per-capability effect parameterization; today the flat
922            // `[env]` grants access to the entire process environment.
923            ("env", "get") => {
924                let name = expect_str(args.first())?;
925                Ok(match std::env::var(name) {
926                    Ok(v) => Value::Variant {
927                        name: "Some".into(),
928                        args: vec![Value::Str(v)],
929                    },
930                    Err(_) => Value::Variant { name: "None".into(), args: Vec::new() },
931                })
932            }
933            ("budget", _) => {
934                // Budget calls are nominally tracked here; budget itself is
935                // enforced statically in `policy::check_program`.
936                Ok(Value::Unit)
937            }
938            ("net", "get") => {
939                let url = expect_str(args.first())?.to_string();
940                self.ensure_host_allowed(&url)?;
941                Ok(http_request("GET", &url, None))
942            }
943            ("net", "post") => {
944                let url = expect_str(args.first())?.to_string();
945                let body = expect_str(args.get(1))?.to_string();
946                self.ensure_host_allowed(&url)?;
947                Ok(http_request("POST", &url, Some(&body)))
948            }
949            ("net", "serve") => {
950                let port = match args.first() {
951                    Some(Value::Int(n)) if (0..=65535).contains(n) => *n as u16,
952                    _ => return Err("net.serve(port, handler): port must be Int 0..=65535".into()),
953                };
954                let handler_name = expect_str(args.get(1))?.to_string();
955                let program = self.program.clone()
956                    .ok_or_else(|| "net.serve requires a Program reference; use DefaultHandler::with_program".to_string())?;
957                let policy = self.policy.clone();
958                serve_http(port, handler_name, program, policy, None)
959            }
960            ("net", "serve_fn") => {
961                let port = match args.first() {
962                    Some(Value::Int(n)) if (0..=65535).contains(n) => *n as u16,
963                    _ => return Err("net.serve_fn(port, handler): port must be Int 0..=65535".into()),
964                };
965                let closure = match args.into_iter().nth(1) {
966                    Some(c @ Value::Closure { .. }) => c,
967                    _ => return Err("net.serve_fn(port, handler): handler must be a closure".into()),
968                };
969                let program = self.program.clone()
970                    .ok_or_else(|| "net.serve_fn requires a Program reference; use DefaultHandler::with_program".to_string())?;
971                let policy = self.policy.clone();
972                serve_http_fn(port, closure, program, policy)
973            }
974            ("net", "serve_tls") => {
975                let port = match args.first() {
976                    Some(Value::Int(n)) if (0..=65535).contains(n) => *n as u16,
977                    _ => return Err("net.serve_tls(port, cert, key, handler): port must be Int 0..=65535".into()),
978                };
979                let cert_path = expect_str(args.get(1))?.to_string();
980                let key_path = expect_str(args.get(2))?.to_string();
981                let handler_name = expect_str(args.get(3))?.to_string();
982                let program = self.program.clone()
983                    .ok_or_else(|| "net.serve_tls requires a Program reference".to_string())?;
984                let policy = self.policy.clone();
985                let cert = std::fs::read(&cert_path)
986                    .map_err(|e| format!("net.serve_tls: read cert {cert_path}: {e}"))?;
987                let key = std::fs::read(&key_path)
988                    .map_err(|e| format!("net.serve_tls: read key {key_path}: {e}"))?;
989                serve_http(port, handler_name, program, policy, Some(TlsConfig { cert, key }))
990            }
991            ("net", "serve_ws") => {
992                let port = match args.first() {
993                    Some(Value::Int(n)) if (0..=65535).contains(n) => *n as u16,
994                    _ => return Err("net.serve_ws(port, on_message): port must be Int 0..=65535".into()),
995                };
996                let handler_name = expect_str(args.get(1))?.to_string();
997                let program = self.program.clone()
998                    .ok_or_else(|| "net.serve_ws requires a Program reference".to_string())?;
999                let policy = self.policy.clone();
1000                let registry = Arc::new(crate::ws::ChatRegistry::default());
1001                crate::ws::serve_ws(port, handler_name, program, policy, registry)
1002            }
1003            ("net", "serve_ws_fn") => {
1004                let port = match args.first() {
1005                    Some(Value::Int(n)) if (0..=65535).contains(n) => *n as u16,
1006                    _ => return Err("net.serve_ws_fn(port, subprotocol, handler): port must be Int 0..=65535".into()),
1007                };
1008                let subprotocol = expect_str(args.get(1))?.to_string();
1009                let closure = match args.into_iter().nth(2) {
1010                    Some(c @ Value::Closure { .. }) => c,
1011                    _ => return Err("net.serve_ws_fn(port, subprotocol, handler): handler must be a closure".into()),
1012                };
1013                let program = self.program.clone()
1014                    .ok_or_else(|| "net.serve_ws_fn requires a Program reference; use DefaultHandler::with_program".to_string())?;
1015                let policy = self.policy.clone();
1016                let registry = Arc::new(crate::ws::ChatRegistry::default());
1017                crate::ws::serve_ws_fn(port, subprotocol, closure, program, policy, registry)
1018            }
1019            ("net", "dial_ws") => {
1020                // dial_ws(url, subprotocol, on_open, on_message)
1021                let url = expect_str(args.first())?.to_string();
1022                let subprotocol = expect_str(args.get(1))?.to_string();
1023                let on_open = match args.get(2).cloned() {
1024                    Some(c @ Value::Closure { .. }) => c,
1025                    _ => return Err(
1026                        "net.dial_ws(url, subprotocol, on_open, on_message): on_open must be a closure".into(),
1027                    ),
1028                };
1029                let on_message = match args.into_iter().nth(3) {
1030                    Some(c @ Value::Closure { .. }) => c,
1031                    _ => return Err(
1032                        "net.dial_ws(url, subprotocol, on_open, on_message): on_message must be a closure".into(),
1033                    ),
1034                };
1035                let program = self.program.clone().ok_or_else(|| {
1036                    "net.dial_ws requires a Program reference; use DefaultHandler::with_program".to_string()
1037                })?;
1038                let policy = self.policy.clone();
1039                crate::ws::dial_ws(url, subprotocol, on_open, on_message, program, policy)
1040            }
1041            ("chat", "broadcast") => {
1042                let registry = self.chat_registry.as_ref()
1043                    .ok_or_else(|| "chat.broadcast called outside a net.serve_ws handler".to_string())?;
1044                let room = expect_str(args.first())?;
1045                let body = expect_str(args.get(1))?;
1046                crate::ws::chat_broadcast(registry, room, body);
1047                Ok(Value::Unit)
1048            }
1049            ("chat", "send") => {
1050                let registry = self.chat_registry.as_ref()
1051                    .ok_or_else(|| "chat.send called outside a net.serve_ws handler".to_string())?;
1052                let conn_id = match args.first() {
1053                    Some(Value::Int(n)) if *n >= 0 => *n as u64,
1054                    _ => return Err("chat.send: conn_id must be non-negative Int".into()),
1055                };
1056                let body = expect_str(args.get(1))?;
1057                Ok(Value::Bool(crate::ws::chat_send(registry, conn_id, body)))
1058            }
1059            ("kv", "open") => {
1060                let path = expect_str(args.first())?.to_string();
1061                // Honor write-allowlist: opening a Kv writes its backing
1062                // files at `path`, so the fs-write scoping applies here too.
1063                // `..` is refused because `Path::starts_with` is lexical.
1064                let p = std::path::Path::new(&path);
1065                if !self.policy.allow_fs_write.is_empty()
1066                    && (p.components().any(|c| c == std::path::Component::ParentDir)
1067                        || !self.policy.allow_fs_write.iter().any(|a| p.starts_with(a))) {
1068                    return Ok(err(Value::Str(format!(
1069                        "kv.open: `{path}` outside --allow-fs-write"))));
1070                }
1071                match sled::open(&path) {
1072                    Ok(db) => {
1073                        let handle = next_kv_handle();
1074                        kv_registry().lock().unwrap().insert(handle, db);
1075                        Ok(ok(Value::Int(handle as i64)))
1076                    }
1077                    Err(e) => Ok(err(Value::Str(format!("kv.open: {e}")))),
1078                }
1079            }
1080            ("kv", "close") => {
1081                let h = expect_kv_handle(args.first())?;
1082                kv_registry().lock().unwrap().remove(h);
1083                Ok(Value::Unit)
1084            }
1085            ("kv", "get") => {
1086                let h = expect_kv_handle(args.first())?;
1087                let key = expect_str(args.get(1))?;
1088                let mut reg = kv_registry().lock().unwrap();
1089                let db = reg.touch_get(h).ok_or_else(|| "kv.get: closed or unknown Kv handle".to_string())?;
1090                match db.get(key.as_bytes()) {
1091                    Ok(Some(ivec)) => Ok(some(Value::Bytes(ivec.to_vec()))),
1092                    Ok(None) => Ok(none()),
1093                    Err(e) => Err(format!("kv.get: {e}")),
1094                }
1095            }
1096            ("kv", "put") => {
1097                let h = expect_kv_handle(args.first())?;
1098                let key = expect_str(args.get(1))?.to_string();
1099                let val = expect_bytes(args.get(2))?.clone();
1100                let mut reg = kv_registry().lock().unwrap();
1101                let db = reg.touch_get(h).ok_or_else(|| "kv.put: closed or unknown Kv handle".to_string())?;
1102                match db.insert(key.as_bytes(), val) {
1103                    Ok(_) => Ok(ok(Value::Unit)),
1104                    Err(e) => Ok(err(Value::Str(format!("kv.put: {e}")))),
1105                }
1106            }
1107            ("kv", "delete") => {
1108                let h = expect_kv_handle(args.first())?;
1109                let key = expect_str(args.get(1))?;
1110                let mut reg = kv_registry().lock().unwrap();
1111                let db = reg.touch_get(h).ok_or_else(|| "kv.delete: closed or unknown Kv handle".to_string())?;
1112                match db.remove(key.as_bytes()) {
1113                    Ok(_) => Ok(ok(Value::Unit)),
1114                    Err(e) => Ok(err(Value::Str(format!("kv.delete: {e}")))),
1115                }
1116            }
1117            ("kv", "contains") => {
1118                let h = expect_kv_handle(args.first())?;
1119                let key = expect_str(args.get(1))?;
1120                let mut reg = kv_registry().lock().unwrap();
1121                let db = reg.touch_get(h).ok_or_else(|| "kv.contains: closed or unknown Kv handle".to_string())?;
1122                match db.contains_key(key.as_bytes()) {
1123                    Ok(present) => Ok(Value::Bool(present)),
1124                    Err(e) => Err(format!("kv.contains: {e}")),
1125                }
1126            }
1127            ("kv", "list_prefix") => {
1128                let h = expect_kv_handle(args.first())?;
1129                let prefix = expect_str(args.get(1))?;
1130                let mut reg = kv_registry().lock().unwrap();
1131                let db = reg.touch_get(h).ok_or_else(|| "kv.list_prefix: closed or unknown Kv handle".to_string())?;
1132                let mut keys: Vec<Value> = Vec::new();
1133                for kv in db.scan_prefix(prefix.as_bytes()) {
1134                    let (k, _) = kv.map_err(|e| format!("kv.list_prefix: {e}"))?;
1135                    let s = String::from_utf8_lossy(&k).to_string();
1136                    keys.push(Value::Str(s));
1137                }
1138                Ok(Value::List(keys))
1139            }
1140            ("sql", "open") => {
1141                let path = expect_str(args.first())?.to_string();
1142                if path.starts_with("postgres://") || path.starts_with("postgresql://") {
1143                    // Postgres: connect via sync driver; no fs-write policy applies.
1144                    match postgres::Client::connect(&path, postgres::NoTls) {
1145                        Ok(client) => {
1146                            let handle = next_sql_handle();
1147                            sql_registry().lock().unwrap().insert(handle, SqlConn::Postgres(client));
1148                            Ok(ok(Value::Int(handle as i64)))
1149                        }
1150                        Err(e) => Ok(err(pg_err_to_sql_error(e, "sql.open"))),
1151                    }
1152                } else {
1153                    // SQLite: same shape as `kv.open`; fs-write allowlist applies
1154                    // (in-memory paths are exempt; `..` is refused as a lexical escape).
1155                    if path != ":memory:" && !self.policy.allow_fs_write.is_empty() {
1156                        let p = std::path::Path::new(&path);
1157                        if p.components().any(|c| c == std::path::Component::ParentDir)
1158                            || !self.policy.allow_fs_write.iter().any(|a| p.starts_with(a)) {
1159                            return Ok(err(sql_error(
1160                                format!("sql.open: `{path}` outside --allow-fs-write"), None, None,
1161                            )));
1162                        }
1163                    }
1164                    match rusqlite::Connection::open(&path) {
1165                        Ok(conn) => {
1166                            let handle = next_sql_handle();
1167                            sql_registry().lock().unwrap().insert(handle, SqlConn::Sqlite(conn));
1168                            Ok(ok(Value::Int(handle as i64)))
1169                        }
1170                        Err(e) => Ok(err(sqlite_err_to_sql_error(e, "sql.open"))),
1171                    }
1172                }
1173            }
1174            ("sql", "close") => {
1175                let h = expect_sql_handle(args.first())?;
1176                sql_registry().lock().unwrap().remove(h);
1177                Ok(Value::Unit)
1178            }
1179            ("sql", "exec") => {
1180                let h = expect_sql_handle(args.first())?;
1181                let stmt = expect_str(args.get(1))?.to_string();
1182                let params = expect_sql_params(args.get(2))?;
1183                let arc = sql_registry().lock().unwrap()
1184                    .touch_get(h)
1185                    .ok_or_else(|| "sql.exec: closed or unknown Db handle".to_string())?;
1186                let mut conn = arc.lock().unwrap();
1187                match &mut *conn {
1188                    SqlConn::Sqlite(c) => {
1189                        let bound = sqlite_params(&params);
1190                        let bind: Vec<&dyn rusqlite::ToSql> =
1191                            bound.iter().map(|p| p as &dyn rusqlite::ToSql).collect();
1192                        match c.execute(&stmt, rusqlite::params_from_iter(bind.iter())) {
1193                            Ok(n)  => Ok(ok(Value::Int(n as i64))),
1194                            Err(e) => Ok(err(sqlite_err_to_sql_error(e, "sql.exec"))),
1195                        }
1196                    }
1197                    SqlConn::Postgres(c) => {
1198                        let pg = pg_param_refs(&params);
1199                        let refs: Vec<&(dyn postgres::types::ToSql + Sync)> =
1200                            pg.iter().map(|b| b.as_ref()).collect();
1201                        match c.execute(stmt.as_str(), &refs) {
1202                            Ok(n)  => Ok(ok(Value::Int(n as i64))),
1203                            Err(e) => Ok(err(pg_err_to_sql_error(e, "sql.exec"))),
1204                        }
1205                    }
1206                }
1207            }
1208            ("sql", "query") => {
1209                let h = expect_sql_handle(args.first())?;
1210                let stmt_str = expect_str(args.get(1))?.to_string();
1211                let params = expect_sql_params(args.get(2))?;
1212                let arc = sql_registry().lock().unwrap()
1213                    .touch_get(h)
1214                    .ok_or_else(|| "sql.query: closed or unknown Db handle".to_string())?;
1215                let mut conn = arc.lock().unwrap();
1216                Ok(match &mut *conn {
1217                    SqlConn::Sqlite(c)   => sql_run_query_sqlite(c, &stmt_str, &params),
1218                    SqlConn::Postgres(c) => sql_run_query_pg(c, &stmt_str, &params),
1219                })
1220            }
1221            // Streaming cursor (#379). Allocates an mpsc-backed cursor
1222            // handle, spawns a producer thread to ship rows one at a
1223            // time, and returns `__IterCursor(handle)` wrapped in `Ok`.
1224            // `iter.next` bytecode dispatches the variant tag and
1225            // effect-calls `sql.cursor_next` (below) to advance.
1226            ("sql", "query_iter") => {
1227                let h = expect_sql_handle(args.first())?;
1228                let stmt_str = expect_str(args.get(1))?.to_string();
1229                let params = expect_sql_params(args.get(2))?;
1230                let arc = sql_registry().lock().unwrap()
1231                    .touch_get(h)
1232                    .ok_or_else(|| "sql.query_iter: closed or unknown Db handle".to_string())?;
1233
1234                // Dispatch producer on the connection kind without
1235                // holding the SqlRegistry lock — the producer thread
1236                // owns its own clone of the connection Arc.
1237                let (sender, receiver) = std::sync::mpsc::sync_channel::<Result<Value, String>>(
1238                    CURSOR_CHANNEL_CAPACITY,
1239                );
1240                let cursor_h = next_cursor_handle();
1241                cursor_registry().lock().unwrap().insert(cursor_h, receiver);
1242
1243                let arc_for_thread = Arc::clone(&arc);
1244                // Decide which producer to spawn based on the
1245                // connection's variant. We can briefly peek at the
1246                // variant here without holding the lock for the
1247                // producer's lifetime — the producer locks again
1248                // inside its thread function.
1249                let is_sqlite = matches!(*arc.lock().unwrap(), SqlConn::Sqlite(_));
1250                std::thread::spawn(move || {
1251                    if is_sqlite {
1252                        sqlite_cursor_producer(arc_for_thread, stmt_str, params, sender);
1253                    } else {
1254                        pg_cursor_producer(arc_for_thread, stmt_str, params, sender);
1255                    }
1256                });
1257
1258                Ok(ok(Value::Variant {
1259                    name: "__IterCursor".into(),
1260                    args: vec![Value::Int(cursor_h as i64)],
1261                }))
1262            }
1263            // Pull one row from the producer; called from
1264            // `iter.next`'s `__IterCursor` dispatch branch. Returns
1265            // a Lex `Option[Row]`: `Some(row)` while the producer
1266            // has more, `None` once the channel closes (producer
1267            // done, errored, or cursor evicted from the registry).
1268            ("sql", "cursor_next") => {
1269                let h = match args.first() {
1270                    Some(Value::Int(n)) if *n >= 0 => *n as u64,
1271                    _ => return Err("sql.cursor_next: expected cursor handle (Int)".into()),
1272                };
1273                let rx_arc = match cursor_registry().lock().unwrap().touch_get(h) {
1274                    Some(a) => a,
1275                    None => return Ok(Value::Variant { name: "None".into(), args: vec![] }),
1276                };
1277                // Lock the receiver itself (separate from the global
1278                // registry lock) and block on `recv()`. The producer
1279                // is on a different thread, so this can sleep without
1280                // contention beyond the per-cursor mutex.
1281                let recv_result = {
1282                    let rx = match rx_arc.lock() {
1283                        Ok(g) => g,
1284                        Err(p) => p.into_inner(),
1285                    };
1286                    rx.recv()
1287                };
1288                match recv_result {
1289                    Ok(Ok(row)) => Ok(Value::Variant {
1290                        name: "Some".into(),
1291                        args: vec![row],
1292                    }),
1293                    Ok(Err(_)) | Err(_) => {
1294                        // Channel closed (producer done) or row error
1295                        // — drop the registry entry and signal None
1296                        // so callers stop polling.
1297                        cursor_registry().lock().unwrap().remove(h);
1298                        Ok(Value::Variant { name: "None".into(), args: vec![] })
1299                    }
1300                }
1301            }
1302            // Transactions: begin issues BEGIN SQL on the connection;
1303            // commit/rollback issue COMMIT/ROLLBACK. SqlTx reuses the
1304            // same Int handle as Db — the type system enforces correct
1305            // usage; the runtime treats both as the same registry key.
1306            ("sql", "begin") => {
1307                let h = expect_sql_handle(args.first())?;
1308                let arc = sql_registry().lock().unwrap()
1309                    .touch_get(h)
1310                    .ok_or_else(|| "sql.begin: closed or unknown Db handle".to_string())?;
1311                let mut conn = arc.lock().unwrap();
1312                match &mut *conn {
1313                    SqlConn::Sqlite(c) => match c.execute_batch("BEGIN") {
1314                        Ok(()) => Ok(ok(Value::Int(h as i64))),
1315                        Err(e) => Ok(err(sqlite_err_to_sql_error(e, "sql.begin"))),
1316                    },
1317                    SqlConn::Postgres(c) => match c.batch_execute("BEGIN") {
1318                        Ok(()) => Ok(ok(Value::Int(h as i64))),
1319                        Err(e) => Ok(err(pg_err_to_sql_error(e, "sql.begin"))),
1320                    },
1321                }
1322            }
1323            ("sql", "commit") => {
1324                let h = expect_sql_handle(args.first())?;
1325                let arc = sql_registry().lock().unwrap()
1326                    .touch_get(h)
1327                    .ok_or_else(|| "sql.commit: closed or unknown SqlTx handle".to_string())?;
1328                let mut conn = arc.lock().unwrap();
1329                match &mut *conn {
1330                    SqlConn::Sqlite(c) => match c.execute_batch("COMMIT") {
1331                        Ok(()) => Ok(ok(Value::Unit)),
1332                        Err(e) => Ok(err(sqlite_err_to_sql_error(e, "sql.commit"))),
1333                    },
1334                    SqlConn::Postgres(c) => match c.batch_execute("COMMIT") {
1335                        Ok(()) => Ok(ok(Value::Unit)),
1336                        Err(e) => Ok(err(pg_err_to_sql_error(e, "sql.commit"))),
1337                    },
1338                }
1339            }
1340            ("sql", "rollback") => {
1341                let h = expect_sql_handle(args.first())?;
1342                let arc = sql_registry().lock().unwrap()
1343                    .touch_get(h)
1344                    .ok_or_else(|| "sql.rollback: closed or unknown SqlTx handle".to_string())?;
1345                let mut conn = arc.lock().unwrap();
1346                match &mut *conn {
1347                    SqlConn::Sqlite(c) => match c.execute_batch("ROLLBACK") {
1348                        Ok(()) => Ok(ok(Value::Unit)),
1349                        Err(e) => Ok(err(sqlite_err_to_sql_error(e, "sql.rollback"))),
1350                    },
1351                    SqlConn::Postgres(c) => match c.batch_execute("ROLLBACK") {
1352                        Ok(()) => Ok(ok(Value::Unit)),
1353                        Err(e) => Ok(err(pg_err_to_sql_error(e, "sql.rollback"))),
1354                    },
1355                }
1356            }
1357            ("sql", "exec_tx") => {
1358                let h = expect_sql_handle(args.first())?;
1359                let stmt = expect_str(args.get(1))?.to_string();
1360                let params = expect_sql_params(args.get(2))?;
1361                let arc = sql_registry().lock().unwrap()
1362                    .touch_get(h)
1363                    .ok_or_else(|| "sql.exec_tx: closed or unknown SqlTx handle".to_string())?;
1364                let mut conn = arc.lock().unwrap();
1365                match &mut *conn {
1366                    SqlConn::Sqlite(c) => {
1367                        let bound = sqlite_params(&params);
1368                        let bind: Vec<&dyn rusqlite::ToSql> =
1369                            bound.iter().map(|p| p as &dyn rusqlite::ToSql).collect();
1370                        match c.execute(&stmt, rusqlite::params_from_iter(bind.iter())) {
1371                            Ok(n)  => Ok(ok(Value::Int(n as i64))),
1372                            Err(e) => Ok(err(sqlite_err_to_sql_error(e, "sql.exec_tx"))),
1373                        }
1374                    }
1375                    SqlConn::Postgres(c) => {
1376                        let pg = pg_param_refs(&params);
1377                        let refs: Vec<&(dyn postgres::types::ToSql + Sync)> =
1378                            pg.iter().map(|b| b.as_ref()).collect();
1379                        match c.execute(stmt.as_str(), &refs) {
1380                            Ok(n)  => Ok(ok(Value::Int(n as i64))),
1381                            Err(e) => Ok(err(pg_err_to_sql_error(e, "sql.exec_tx"))),
1382                        }
1383                    }
1384                }
1385            }
1386            ("sql", "query_tx") => {
1387                let h = expect_sql_handle(args.first())?;
1388                let stmt_str = expect_str(args.get(1))?.to_string();
1389                let params = expect_sql_params(args.get(2))?;
1390                let arc = sql_registry().lock().unwrap()
1391                    .touch_get(h)
1392                    .ok_or_else(|| "sql.query_tx: closed or unknown SqlTx handle".to_string())?;
1393                let mut conn = arc.lock().unwrap();
1394                Ok(match &mut *conn {
1395                    SqlConn::Sqlite(c)   => sql_run_query_sqlite(c, &stmt_str, &params),
1396                    SqlConn::Postgres(c) => sql_run_query_pg(c, &stmt_str, &params),
1397                })
1398            }
1399            ("sql", "get_str") => Ok(sql_get_col(&args, |v| match v {
1400                Value::Str(s) => Some(Value::Str(s.clone())),
1401                Value::Int(n) => Some(Value::Str(n.to_string())),
1402                _ => None,
1403            })?),
1404            ("sql", "get_int") => Ok(sql_get_col(&args, |v| match v {
1405                Value::Int(n) => Some(Value::Int(*n)),
1406                Value::Float(f) => Some(Value::Int(*f as i64)),
1407                _ => None,
1408            })?),
1409            ("sql", "get_float") => Ok(sql_get_col(&args, |v| match v {
1410                Value::Float(f) => Some(Value::Float(*f)),
1411                Value::Int(n)   => Some(Value::Float(*n as f64)),
1412                _ => None,
1413            })?),
1414            ("sql", "get_bool") => Ok(sql_get_col(&args, |v| match v {
1415                Value::Bool(b)  => Some(Value::Bool(*b)),
1416                Value::Int(n)   => Some(Value::Bool(*n != 0)),
1417                _ => None,
1418            })?),
1419            ("proc", "spawn") => {
1420                // The escape hatch effect. Spawns a child process,
1421                // collects its stdout/stderr, returns a structured
1422                // record. Allow-list is the binary basename: anything
1423                // outside `--allow-proc` is rejected pre-spawn.
1424                //
1425                // What this does NOT validate (per SECURITY.md):
1426                // - per-arg content (a script-like CLI invoked via
1427                //   --eval=... can run anything)
1428                // - environment variables (inherited from the parent)
1429                // - working directory (the parent's)
1430                //
1431                // For untrusted input, layer with OS-level
1432                // sandboxing — gVisor / nsjail / a container.
1433                let cmd = expect_str(args.first())?.to_string();
1434                let raw_args = match args.get(1) {
1435                    Some(Value::List(items)) => items,
1436                    Some(other) => return Err(format!(
1437                        "proc.spawn: args must be List[Str], got {other:?}")),
1438                    None => return Err("proc.spawn: missing args list".into()),
1439                };
1440                let str_args: Vec<String> = raw_args.iter().map(|v| match v {
1441                    Value::Str(s) => Ok(s.clone()),
1442                    other => Err(format!("proc.spawn: arg must be Str, got {other:?}")),
1443                }).collect::<Result<Vec<_>, _>>()?;
1444
1445                // Allow-list check: empty list = any binary (escape
1446                // hatch); non-empty = basename of cmd must match an
1447                // entry exactly.
1448                if !self.policy.allow_proc.is_empty() {
1449                    let basename = std::path::Path::new(&cmd)
1450                        .file_name()
1451                        .and_then(|s| s.to_str())
1452                        .unwrap_or(&cmd);
1453                    if !self.policy.allow_proc.iter().any(|a| a == basename) {
1454                        return Ok(err(Value::Str(format!(
1455                            "proc.spawn: `{cmd}` not in --allow-proc {:?}",
1456                            self.policy.allow_proc
1457                        ))));
1458                    }
1459                }
1460
1461                // Hard caps: the spec doesn't pin numbers, but
1462                // unbounded argv is a DoS vector.
1463                if str_args.len() > 1024 {
1464                    return Ok(err(Value::Str(
1465                        "proc.spawn: arg-count exceeds 1024".into())));
1466                }
1467                if str_args.iter().any(|a| a.len() > 65_536) {
1468                    return Ok(err(Value::Str(
1469                        "proc.spawn: per-arg length exceeds 64 KiB".into())));
1470                }
1471
1472                let output = std::process::Command::new(&cmd)
1473                    .args(&str_args)
1474                    .output();
1475                match output {
1476                    Ok(o) => {
1477                        let mut rec = indexmap::IndexMap::new();
1478                        rec.insert("stdout".into(), Value::Str(
1479                            String::from_utf8_lossy(&o.stdout).to_string()));
1480                        rec.insert("stderr".into(), Value::Str(
1481                            String::from_utf8_lossy(&o.stderr).to_string()));
1482                        rec.insert("exit_code".into(), Value::Int(
1483                            o.status.code().unwrap_or(-1) as i64));
1484                        Ok(ok(Value::Record(rec)))
1485                    }
1486                    Err(e) => Ok(err(Value::Str(format!("spawn `{cmd}`: {e}")))),
1487                }
1488            }
1489            other => Err(format!("unsupported effect {}.{}", other.0, other.1)),
1490        }
1491    }
1492
1493    /// `list.par_map` worker-handler factory (#305 slice 2).
1494    ///
1495    /// Builds a fresh `DefaultHandler` per worker that shares the
1496    /// budget pool with the parent (`Arc<AtomicU64>`) so a parallel
1497    /// batch can't escape the run-wide budget ceiling. Other state
1498    /// is intentionally split per-worker:
1499    ///
1500    /// - `sink`: a `StdoutSink` per worker. Tests that capture
1501    ///   output via a `SharedSink` wrapped in `Arc<Mutex<…>>` see
1502    ///   each worker as a fresh handler. Print interleaving on
1503    ///   stdout is acceptable; tests that need ordered capture run
1504    ///   workloads serially anyway.
1505    /// - `mcp_clients`: a fresh per-worker LRU cache. The parent's
1506    ///   subprocess handles can't be shared across threads without
1507    ///   mutex-serialising every MCP call, which would defeat the
1508    ///   parallelism. Cache hit rate is sub-optimal across the
1509    ///   first call per worker; warmed caches still amortise within
1510    ///   a worker.
1511    /// - `chat_registry`: cloned `Arc<ChatRegistry>` so all workers
1512    ///   route into the same chat dispatch layer.
1513    /// - `program`: cloned `Arc<Program>` so `net.serve` (if a
1514    ///   worker invokes it) sees the same compiled program.
1515    fn spawn_for_worker(&self) -> Option<Box<dyn lex_bytecode::vm::EffectHandler + Send>> {
1516        let mut fresh = DefaultHandler::new(self.policy.clone());
1517        // Share the budget pool atomically — slice 2's correctness
1518        // contract: parallel work counts against the same ceiling.
1519        fresh.budget_remaining = std::sync::Arc::clone(&self.budget_remaining);
1520        fresh.budget_ceiling = self.budget_ceiling;
1521        fresh.read_root = self.read_root.clone();
1522        fresh.program = self.program.clone();
1523        fresh.chat_registry = self.chat_registry.clone();
1524        // #305 slice 3: share the stream registry across workers so
1525        // a stream produced on one thread (or the parent) is
1526        // consumable on any other. The registry is already
1527        // `Arc<Mutex<…>>` so concurrent access is safe.
1528        fresh.streams = std::sync::Arc::clone(&self.streams);
1529        fresh.next_stream_id = std::sync::Arc::clone(&self.next_stream_id);
1530        Some(Box::new(fresh))
1531    }
1532}
1533
1534/// Blocks the calling thread, accepts incoming HTTP requests on
1535/// `127.0.0.1:port`, and dispatches each through the named Lex
1536/// stage. Each request gets a fresh `Vm`; the program and policy
1537/// are shared.
1538///
1539/// Handler signature in Lex (by convention):
1540///   fn <name>(req :: Record { method :: Str, path :: Str, body :: Str })
1541///        -> Record { status :: Int, body :: Str }
1542/// PEM-encoded certificate + private key, both as raw bytes.
1543pub struct TlsConfig {
1544    pub cert: Vec<u8>,
1545    pub key: Vec<u8>,
1546}
1547
1548fn serve_http(
1549    port: u16,
1550    handler_name: String,
1551    program: Arc<Program>,
1552    policy: Policy,
1553    tls: Option<TlsConfig>,
1554) -> Result<Value, String> {
1555    match tls {
1556        None => serve_http_plain(port, handler_name, program, policy),
1557        Some(cfg) => serve_http_tls_legacy(port, handler_name, program, policy, cfg),
1558    }
1559}
1560
1561/// Hyper 1.x + Tokio multi-thread HTTP/1.1 server for `net.serve`.
1562/// Each connection is accepted in an async task; the synchronous Lex VM
1563/// call runs inside `spawn_blocking` so it doesn't block the executor.
1564fn serve_http_plain(
1565    port: u16,
1566    handler_name: String,
1567    program: Arc<Program>,
1568    policy: Policy,
1569) -> Result<Value, String> {
1570    use http_body_util::BodyExt as _;
1571    use hyper::server::conn::http1;
1572    use hyper::service::service_fn;
1573    use hyper_util::rt::TokioIo;
1574    use tokio::net::TcpListener as TokioTcpListener;
1575
1576    let rt = tokio::runtime::Builder::new_multi_thread()
1577        .enable_all()
1578        .build()
1579        .map_err(|e| format!("net.serve: tokio runtime: {e}"))?;
1580    rt.block_on(async move {
1581        let listener = TokioTcpListener::bind(("0.0.0.0", port))
1582            .await
1583            .map_err(|e| format!("net.serve bind {port}: {e}"))?;
1584        eprintln!("net.serve: listening on http://0.0.0.0:{port}");
1585        loop {
1586            let (stream, _) = listener
1587                .accept()
1588                .await
1589                .map_err(|e| format!("net.serve accept: {e}"))?;
1590            let io = TokioIo::new(stream);
1591            let program = Arc::clone(&program);
1592            let policy = policy.clone();
1593            let handler_name = handler_name.clone();
1594            tokio::spawn(async move {
1595                let program2 = Arc::clone(&program);
1596                let policy2 = policy.clone();
1597                let handler_name2 = handler_name.clone();
1598                let svc = service_fn(move |req: hyper::Request<hyper::body::Incoming>| {
1599                    let program = Arc::clone(&program2);
1600                    let policy = policy2.clone();
1601                    let handler_name = handler_name2.clone();
1602                    async move {
1603                        let (parts, body) = req.into_parts();
1604                        let body_bytes = body
1605                            .collect()
1606                            .await
1607                            .map(|c| c.to_bytes())
1608                            .unwrap_or_default();
1609                        let result = tokio::task::spawn_blocking(move || {
1610                            let lex_req = build_request_value_parts(&parts, &body_bytes);
1611                            let handler = DefaultHandler::new(policy)
1612                                .with_program(Arc::clone(&program));
1613                            let mut vm = Vm::with_handler(&program, Box::new(handler));
1614                            vm.call(&handler_name, vec![lex_req])
1615                        })
1616                        .await;
1617                        Ok::<_, std::convert::Infallible>(match result {
1618                            Ok(Ok(resp)) => build_hyper_response(&resp),
1619                            Ok(Err(e)) => error_response(500, &format!("internal error: {e}")),
1620                            Err(e) => error_response(500, &format!("task panicked: {e}")),
1621                        })
1622                    }
1623                });
1624                if let Err(e) = http1::Builder::new().serve_connection(io, svc).await {
1625                    eprintln!("net.serve: connection error: {e}");
1626                }
1627            });
1628        }
1629    })
1630}
1631
1632/// TLS path: still uses tiny_http pending a tokio-rustls migration.
1633fn serve_http_tls_legacy(
1634    port: u16,
1635    handler_name: String,
1636    program: Arc<Program>,
1637    policy: Policy,
1638    cfg: TlsConfig,
1639) -> Result<Value, String> {
1640    let ssl = tiny_http::SslConfig {
1641        certificate: cfg.cert,
1642        private_key: cfg.key,
1643    };
1644    let server = tiny_http::Server::https(("0.0.0.0", port), ssl)
1645        .map_err(|e| format!("net.serve_tls bind {port}: {e}"))?;
1646    eprintln!("net.serve: listening on https://0.0.0.0:{port}");
1647    for req in server.incoming_requests() {
1648        let program = Arc::clone(&program);
1649        let policy = policy.clone();
1650        let handler_name = handler_name.clone();
1651        std::thread::spawn(move || handle_request_tls(req, program, policy, handler_name));
1652    }
1653    Ok(Value::Unit)
1654}
1655
1656fn handle_request_tls(
1657    mut req: tiny_http::Request,
1658    program: Arc<Program>,
1659    policy: Policy,
1660    handler_name: String,
1661) {
1662    let lex_req = build_request_value_tiny(&mut req);
1663    let handler = DefaultHandler::new(policy).with_program(Arc::clone(&program));
1664    let mut vm = Vm::with_handler(&program, Box::new(handler));
1665    match vm.call(&handler_name, vec![lex_req]) {
1666        Ok(resp) => {
1667            let (status, body, headers) = unpack_response(&resp);
1668            respond_with_body_tls(req, status, body, headers);
1669        }
1670        Err(e) => {
1671            let response = tiny_http::Response::from_string(format!("internal error: {e}"))
1672                .with_status_code(500);
1673            let _ = req.respond(response);
1674        }
1675    }
1676}
1677
1678/// Hyper 1.x + Tokio multi-thread HTTP/1.1 server for `net.serve_fn`.
1679fn serve_http_fn(
1680    port: u16,
1681    closure: Value,
1682    program: Arc<Program>,
1683    policy: Policy,
1684) -> Result<Value, String> {
1685    use http_body_util::BodyExt as _;
1686    use hyper::server::conn::http1;
1687    use hyper::service::service_fn;
1688    use hyper_util::rt::TokioIo;
1689    use tokio::net::TcpListener as TokioTcpListener;
1690
1691    let rt = tokio::runtime::Builder::new_multi_thread()
1692        .enable_all()
1693        .build()
1694        .map_err(|e| format!("net.serve_fn: tokio runtime: {e}"))?;
1695    rt.block_on(async move {
1696        let listener = TokioTcpListener::bind(("0.0.0.0", port))
1697            .await
1698            .map_err(|e| format!("net.serve_fn bind {port}: {e}"))?;
1699        eprintln!("net.serve_fn: listening on http://0.0.0.0:{port}");
1700        loop {
1701            let (stream, _) = listener
1702                .accept()
1703                .await
1704                .map_err(|e| format!("net.serve_fn accept: {e}"))?;
1705            let io = TokioIo::new(stream);
1706            let program = Arc::clone(&program);
1707            let policy = policy.clone();
1708            let closure = closure.clone();
1709            tokio::spawn(async move {
1710                let program2 = Arc::clone(&program);
1711                let policy2 = policy.clone();
1712                let closure2 = closure.clone();
1713                let svc = service_fn(move |req: hyper::Request<hyper::body::Incoming>| {
1714                    let program = Arc::clone(&program2);
1715                    let policy = policy2.clone();
1716                    let closure = closure2.clone();
1717                    async move {
1718                        let (parts, body) = req.into_parts();
1719                        let body_bytes = body
1720                            .collect()
1721                            .await
1722                            .map(|c| c.to_bytes())
1723                            .unwrap_or_default();
1724                        let result = tokio::task::spawn_blocking(move || {
1725                            let lex_req = build_request_value_parts(&parts, &body_bytes);
1726                            let handler = DefaultHandler::new(policy)
1727                                .with_program(Arc::clone(&program));
1728                            let mut vm = Vm::with_handler(&program, Box::new(handler));
1729                            vm.invoke_closure_value(closure, vec![lex_req])
1730                        })
1731                        .await;
1732                        Ok::<_, std::convert::Infallible>(match result {
1733                            Ok(Ok(resp)) => build_hyper_response(&resp),
1734                            Ok(Err(e)) => error_response(500, &format!("internal error: {e}")),
1735                            Err(e) => error_response(500, &format!("task panicked: {e}")),
1736                        })
1737                    }
1738                });
1739                if let Err(e) = http1::Builder::new().serve_connection(io, svc).await {
1740                    eprintln!("net.serve_fn: connection error: {e}");
1741                }
1742            });
1743        }
1744    })
1745}
1746
1747/// Build a Lex request record from hyper request parts and pre-collected body bytes.
1748fn build_request_value_parts(
1749    parts: &hyper::http::request::Parts,
1750    body: &bytes::Bytes,
1751) -> Value {
1752    let method = parts.method.as_str().to_string();
1753    let uri = parts.uri.to_string();
1754    let (path, query) = match uri.split_once('?') {
1755        Some((p, q)) => (p.to_string(), q.to_string()),
1756        None => (uri, String::new()),
1757    };
1758    let mut headers_map = std::collections::BTreeMap::new();
1759    for (name, val) in &parts.headers {
1760        if let Ok(v) = val.to_str() {
1761            headers_map.insert(
1762                lex_bytecode::MapKey::Str(name.as_str().to_ascii_lowercase()),
1763                Value::Str(v.to_string()),
1764            );
1765        }
1766    }
1767    let body_str = String::from_utf8_lossy(body).into_owned();
1768    let mut rec = indexmap::IndexMap::new();
1769    rec.insert("method".into(), Value::Str(method));
1770    rec.insert("path".into(), Value::Str(path));
1771    rec.insert("query".into(), Value::Str(query));
1772    rec.insert("body".into(), Value::Str(body_str));
1773    rec.insert("headers".into(), Value::Map(headers_map));
1774    Value::Record(rec)
1775}
1776
1777/// Build a Lex request record from a tiny_http request (used by the TLS path).
1778fn build_request_value_tiny(req: &mut tiny_http::Request) -> Value {
1779    let method = format!("{:?}", req.method()).to_uppercase();
1780    let url = req.url().to_string();
1781    let (path, query) = match url.split_once('?') {
1782        Some((p, q)) => (p.to_string(), q.to_string()),
1783        None => (url, String::new()),
1784    };
1785    let mut headers_map = std::collections::BTreeMap::new();
1786    for h in req.headers() {
1787        headers_map.insert(
1788            lex_bytecode::MapKey::Str(h.field.as_str().as_str().to_ascii_lowercase()),
1789            Value::Str(h.value.as_str().to_string()),
1790        );
1791    }
1792    let mut body = String::new();
1793    let _ = req.as_reader().read_to_string(&mut body);
1794    let mut rec = indexmap::IndexMap::new();
1795    rec.insert("method".into(), Value::Str(method));
1796    rec.insert("path".into(), Value::Str(path));
1797    rec.insert("query".into(), Value::Str(query));
1798    rec.insert("body".into(), Value::Str(body));
1799    rec.insert("headers".into(), Value::Map(headers_map));
1800    Value::Record(rec)
1801}
1802
1803fn unpack_response(v: &Value) -> (u16, ResponseBodyOut, Vec<(String, String)>) {
1804    if let Value::Record(rec) = v {
1805        let status = rec.get("status").and_then(|s| match s {
1806            Value::Int(n) => Some(*n as u16),
1807            _ => None,
1808        }).unwrap_or(200);
1809        let body = match rec.get("body") {
1810            // Tagged ResponseBody (#375): BodyStr | BodyStream | BodyBytes.
1811            Some(Value::Variant { name, args }) => match (name.as_str(), args.as_slice()) {
1812                ("BodyStr",    [Value::Str(s)])             => ResponseBodyOut::Str(s.clone()),
1813                ("BodyStream", [iter_v])                    => ResponseBodyOut::TextChunks(drain_iter_str(iter_v)),
1814                ("BodyBytes",  [iter_v])                    => ResponseBodyOut::BytesChunks(drain_iter_bytes(iter_v)),
1815                _ => ResponseBodyOut::Str(String::new()),
1816            },
1817            // Escape hatch for handlers that don't use the nominal
1818            // `Response` alias and just return a structural record with
1819            // `body :: Str` (the pre-#375 contract). Lets internal
1820            // test handlers and one-liners keep working without
1821            // wrapping in `BodyStr(...)`.
1822            Some(Value::Str(s)) => ResponseBodyOut::Str(s.clone()),
1823            _ => ResponseBodyOut::Str(String::new()),
1824        };
1825        let headers: Vec<(String, String)> = if let Some(Value::Map(hmap)) = rec.get("headers") {
1826            hmap.iter().filter_map(|(k, v)| {
1827                if let (lex_bytecode::MapKey::Str(name), Value::Str(val)) = (k, v) {
1828                    Some((name.clone(), val.clone()))
1829                } else {
1830                    None
1831                }
1832            }).collect()
1833        } else {
1834            vec![]
1835        };
1836        return (status, body, headers);
1837    }
1838    (
1839        500,
1840        ResponseBodyOut::Str(format!("handler returned non-record: {v:?}")),
1841        vec![],
1842    )
1843}
1844
1845type HyperRespBody =
1846    http_body_util::combinators::BoxBody<bytes::Bytes, std::convert::Infallible>;
1847
1848/// Build a hyper response from the Lex value returned by a handler closure.
1849/// Streaming bodies (`BodyStream`, `BodyBytes`) use `ChunkedBody` which has no
1850/// known `size_hint`, so hyper emits `Transfer-Encoding: chunked` on the wire.
1851/// Plain string bodies use `Full<Bytes>` which carries `Content-Length`.
1852fn build_hyper_response(v: &Value) -> hyper::Response<HyperRespBody> {
1853    use http_body_util::BodyExt as _;
1854    let (status, body, headers) = unpack_response(v);
1855    let boxed_body: HyperRespBody = match body {
1856        ResponseBodyOut::Str(s) => {
1857            http_body_util::Full::new(bytes::Bytes::from(s.into_bytes())).boxed()
1858        }
1859        ResponseBodyOut::TextChunks(chunks) | ResponseBodyOut::BytesChunks(chunks) => {
1860            HyperChunkedBody::from(chunks).boxed()
1861        }
1862    };
1863    let mut builder = hyper::Response::builder().status(status);
1864    for (name, val) in headers {
1865        builder = builder.header(name, val);
1866    }
1867    builder
1868        .body(boxed_body)
1869        .unwrap_or_else(|_| error_response(500, "response build error"))
1870}
1871
1872fn error_response(status: u16, msg: &str) -> hyper::Response<HyperRespBody> {
1873    use http_body_util::BodyExt as _;
1874    hyper::Response::builder()
1875        .status(status)
1876        .body(
1877            http_body_util::Full::new(bytes::Bytes::from(msg.to_owned()))
1878                .boxed(),
1879        )
1880        .unwrap_or_else(|_| {
1881            use http_body_util::BodyExt as _;
1882            hyper::Response::new(http_body_util::Empty::new().map_err(|e| match e {}).boxed())
1883        })
1884}
1885
/// Response body that replays a fixed list of byte chunks, one HTTP data
/// frame per chunk. It never reports an exact size, so hyper frames the
/// response with `Transfer-Encoding: chunked` rather than `Content-Length`.
struct HyperChunkedBody {
    /// Chunks still to be emitted, front first; never contains an empty chunk.
    chunks: std::collections::VecDeque<Vec<u8>>,
}

impl From<Vec<Vec<u8>>> for HyperChunkedBody {
    /// Queue `raw` in order, discarding zero-length chunks so every frame
    /// yielded later carries at least one byte.
    fn from(raw: Vec<Vec<u8>>) -> Self {
        let mut queue = std::collections::VecDeque::with_capacity(raw.len());
        for piece in raw {
            if !piece.is_empty() {
                queue.push_back(piece);
            }
        }
        HyperChunkedBody { chunks: queue }
    }
}
1899
1900impl hyper::body::Body for HyperChunkedBody {
1901    type Data = bytes::Bytes;
1902    type Error = std::convert::Infallible;
1903
1904    fn poll_frame(
1905        mut self: std::pin::Pin<&mut Self>,
1906        _cx: &mut std::task::Context<'_>,
1907    ) -> std::task::Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
1908        match self.chunks.pop_front() {
1909            Some(chunk) => std::task::Poll::Ready(Some(Ok(hyper::body::Frame::data(
1910                bytes::Bytes::from(chunk),
1911            )))),
1912            None => std::task::Poll::Ready(None),
1913        }
1914    }
1915}
1916
1917/// Send `body` back on a TLS `tiny_http` request. Used only by the
1918/// `net.serve_tls` path which still runs on tiny_http pending a
1919/// tokio-rustls migration.
1920fn respond_with_body_tls(
1921    req: tiny_http::Request,
1922    status: u16,
1923    body: ResponseBodyOut,
1924    headers: Vec<(String, String)>,
1925) {
1926    let tiny_headers: Vec<tiny_http::Header> = headers
1927        .into_iter()
1928        .filter_map(|(name, val)| format!("{name}: {val}").parse::<tiny_http::Header>().ok())
1929        .collect();
1930    match body {
1931        ResponseBodyOut::Str(s) => {
1932            let mut response = tiny_http::Response::from_string(s).with_status_code(status);
1933            for h in tiny_headers {
1934                response.add_header(h);
1935            }
1936            let _ = req.respond(response);
1937        }
1938        ResponseBodyOut::TextChunks(chunks) | ResponseBodyOut::BytesChunks(chunks) => {
1939            let reader = ChunkReader::new(chunks);
1940            let response = tiny_http::Response::new(
1941                tiny_http::StatusCode(status),
1942                tiny_headers,
1943                reader,
1944                None,
1945                None,
1946            );
1947            let _ = req.respond(response);
1948        }
1949    }
1950}
1951
/// Decoded `Response.body` (#375). Each variant takes a different route to
/// the wire depending on the server path:
///
/// * hyper (`build_hyper_response`): `Str` becomes a `Full<Bytes>` body that
///   carries `Content-Length`; the chunk variants become a
///   `HyperChunkedBody`, sent with `Transfer-Encoding: chunked`.
/// * tiny_http TLS (`respond_with_body_tls`): `Str` goes through
///   `Response::from_string`; the chunk variants go through `Response::new`
///   with a `ChunkReader` and no declared length, i.e. chunked encoding.
enum ResponseBodyOut {
    /// The whole body as a single string.
    Str(String),
    /// Pre-drained text chunks. v1 ships eager-iter only; lazy producers
    /// (#376 follow-up) will replace this with a Read adapter that pulls
    /// chunks on demand from the VM.
    TextChunks(Vec<Vec<u8>>),
    /// Pre-drained binary chunks. Each inner `Vec<u8>` is one Lex
    /// `List[Int]` collapsed down to a byte vector.
    BytesChunks(Vec<Vec<u8>>),
}
1966
1967/// Walk a Lex `Iter[Str]` (eager (List, Int) representation) and produce
1968/// a chunk list. The chunks are byte vectors so the chunked-Read adapter
1969/// is uniform across text and binary streams.
1970///
1971/// Iter[T] representation shifted in #376: from `Tuple([list, idx])` to
1972/// `Variant("__IterEager", [list, idx])` for the eager form. Lazy iters
1973/// produced by `iter.unfold` (`Variant("__IterLazy", [seed, step])`) and
1974/// cursor-backed iters (`Variant("__IterCursor", [handle])` from #379)
1975/// are not drained eagerly here — the v1 streaming path covers only the
1976/// eager form. Lazy/cursor producers will be wired through the
1977/// `ChunkReader` in a follow-up so each `read()` calls `iter.next` via
1978/// the VM, preserving wall-clock chunk boundaries on the wire.
1979fn drain_iter_str(v: &Value) -> Vec<Vec<u8>> {
1980    match v {
1981        Value::Variant { name, args }
1982            if name == "__IterEager" && args.len() == 2 =>
1983        {
1984            if let (Value::List(items), Value::Int(idx)) = (&args[0], &args[1]) {
1985                items.iter().skip(*idx as usize).filter_map(|item| {
1986                    if let Value::Str(s) = item { Some(s.as_bytes().to_vec()) } else { None }
1987                }).collect()
1988            } else {
1989                Vec::new()
1990            }
1991        }
1992        _ => Vec::new(),
1993    }
1994}
1995
1996/// Walk a Lex `Iter[List[Int]]` and produce a chunk list. Each `List[Int]`
1997/// element is collapsed by truncating each Int to u8 (0..=255). See
1998/// `drain_iter_str` for the lazy/cursor-iter limitation.
1999fn drain_iter_bytes(v: &Value) -> Vec<Vec<u8>> {
2000    match v {
2001        Value::Variant { name, args }
2002            if name == "__IterEager" && args.len() == 2 =>
2003        {
2004            if let (Value::List(items), Value::Int(idx)) = (&args[0], &args[1]) {
2005                items.iter().skip(*idx as usize).filter_map(|item| {
2006                    if let Value::List(ints) = item {
2007                        Some(ints.iter().filter_map(|i| match i {
2008                            Value::Int(n) => Some((*n & 0xff) as u8),
2009                            _ => None,
2010                        }).collect::<Vec<u8>>())
2011                    } else {
2012                        None
2013                    }
2014                }).collect()
2015            } else {
2016                Vec::new()
2017            }
2018        }
2019        _ => Vec::new(),
2020    }
2021}
2022
/// Byte source for `tiny_http` chunked responses. Each `read()` returns
/// bytes from at most one Lex chunk, so every Lex chunk becomes its own
/// HTTP chunk on the wire. If the caller's buffer is shorter than the
/// current chunk, the rest of that chunk is served on subsequent reads.
struct ChunkReader {
    /// Chunks not yet fully consumed; `new` never stores an empty one.
    chunks: std::collections::VecDeque<Vec<u8>>,
    /// Offset in the front chunk of the first byte not yet returned.
    cursor: usize,
}

impl ChunkReader {
    fn new(chunks: Vec<Vec<u8>>) -> Self {
        let queue: std::collections::VecDeque<Vec<u8>> =
            chunks.into_iter().filter(|c| !c.is_empty()).collect();
        ChunkReader { chunks: queue, cursor: 0 }
    }
}

impl std::io::Read for ChunkReader {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        while let Some(front) = self.chunks.front() {
            let start = self.cursor;
            if start >= front.len() {
                // Front chunk already exhausted (defensive; empties are filtered in `new`).
                self.chunks.pop_front();
                self.cursor = 0;
                continue;
            }
            let take = buf.len().min(front.len() - start);
            let end = start + take;
            buf[..take].copy_from_slice(&front[start..end]);
            if end == front.len() {
                self.chunks.pop_front();
                self.cursor = 0;
            } else {
                self.cursor = end;
            }
            return Ok(take);
        }
        Ok(0)
    }
}
2065
2066/// HTTP/1.1 client backed by `ureq` + `rustls`. Accepts both
2067/// `http://` and `https://` URLs. Returns `Result[Str, Str]` as a
2068/// Lex `Value::Variant`. The earlier hand-rolled HTTP/1.0 client
2069/// was plain-TCP only — most public APIs are HTTPS, so the demo
2070/// could fetch `example.com` but not `wttr.in` or `api.github.com`.
2071fn http_request(method: &str, url: &str, body: Option<&str>) -> Value {
2072    use std::time::Duration;
2073    // ureq 3 puts 4xx/5xx behind `Error::StatusCode(code)` and consumes
2074    // the response, so the body would be lost. Disabling
2075    // `http_status_as_error` lets us check the status manually and
2076    // surface `Err("status 404: <body>")` like the old code did.
2077    let agent: ureq::Agent = ureq::Agent::config_builder()
2078        .timeout_connect(Some(Duration::from_secs(10)))
2079        .timeout_recv_body(Some(Duration::from_secs(30)))
2080        .timeout_send_body(Some(Duration::from_secs(10)))
2081        .http_status_as_error(false)
2082        .build()
2083        .into();
2084    let resp = match (method, body) {
2085        ("GET", _) => agent.get(url).call(),
2086        ("POST", Some(b)) => agent.post(url).send(b),
2087        ("POST", None) => agent.post(url).send(""),
2088        (m, _) => return err_value(format!("unsupported method: {m}")),
2089    };
2090    match resp {
2091        Ok(mut r) => {
2092            let status = r.status().as_u16();
2093            let body = r.body_mut().read_to_string().unwrap_or_default();
2094            if (200..300).contains(&status) {
2095                Value::Variant { name: "Ok".into(), args: vec![Value::Str(body)] }
2096            } else {
2097                err_value(format!("status {status}: {body}"))
2098            }
2099        }
2100        Err(e) => err_value(format!("transport: {e}")),
2101    }
2102}
2103
2104/// Build a ureq agent for `std.http.{send,get,post}` with the given
2105/// timeout (None → use the same defaults as the legacy `net.{get,post}`
2106/// path). Separate from `http_request` so the rich `http.send` flow
2107/// can supply per-request overrides.
2108fn http_agent(timeout_ms: Option<u64>) -> ureq::Agent {
2109    use std::time::Duration;
2110    let mut b = ureq::Agent::config_builder()
2111        .timeout_connect(Some(Duration::from_secs(10)))
2112        .timeout_recv_body(Some(Duration::from_secs(30)))
2113        .timeout_send_body(Some(Duration::from_secs(10)))
2114        .http_status_as_error(false);
2115    if let Some(ms) = timeout_ms {
2116        let d = Duration::from_millis(ms);
2117        b = b.timeout_global(Some(d));
2118    }
2119    b.build().into()
2120}
2121
2122/// Map ureq's transport error to the structured `HttpError` variant
2123/// std.http exposes to user code. Anything not specifically a
2124/// timeout / TLS error funnels into `NetworkError`.
2125fn http_error_value(e: ureq::Error) -> Value {
2126    let (ctor, payload): (&str, Option<String>) = match &e {
2127        ureq::Error::Timeout(_) => ("TimeoutError", None),
2128        ureq::Error::Tls(s) => ("TlsError", Some((*s).into())),
2129        ureq::Error::Pem(p) => ("TlsError", Some(format!("{p}"))),
2130        ureq::Error::Rustls(r) => ("TlsError", Some(format!("{r}"))),
2131        _ => ("NetworkError", Some(format!("{e}"))),
2132    };
2133    let args = match payload { Some(s) => vec![Value::Str(s)], None => vec![] };
2134    let inner = Value::Variant { name: ctor.into(), args };
2135    Value::Variant { name: "Err".into(), args: vec![inner] }
2136}
2137
2138fn http_decode_err(msg: String) -> Value {
2139    let inner = Value::Variant {
2140        name: "DecodeError".into(),
2141        args: vec![Value::Str(msg)],
2142    };
2143    Value::Variant { name: "Err".into(), args: vec![inner] }
2144}
2145
2146/// Run a request and pack the ureq response into the
2147/// `{ status, headers, body }` Lex record (or the structured
2148/// `HttpError` on failure). `headers_extra` pairs are appended to the
2149/// outgoing request after `content_type` is applied.
2150fn http_send_simple(
2151    method: &str,
2152    url: &str,
2153    body: Option<Vec<u8>>,
2154    content_type: &str,
2155    timeout_ms: Option<u64>,
2156) -> Value {
2157    http_send_full(method, url, body, content_type, &[], timeout_ms)
2158}
2159
2160fn http_send_full(
2161    method: &str,
2162    url: &str,
2163    body: Option<Vec<u8>>,
2164    content_type: &str,
2165    headers: &[(String, String)],
2166    timeout_ms: Option<u64>,
2167) -> Value {
2168    let agent = http_agent(timeout_ms);
2169    let resp = match method {
2170        "GET" => {
2171            let mut req = agent.get(url);
2172            if !content_type.is_empty() { req = req.header("content-type", content_type); }
2173            for (k, v) in headers { req = req.header(k.as_str(), v.as_str()); }
2174            req.call()
2175        }
2176        "POST" => {
2177            let body = body.unwrap_or_default();
2178            let mut req = agent.post(url);
2179            if !content_type.is_empty() { req = req.header("content-type", content_type); }
2180            for (k, v) in headers { req = req.header(k.as_str(), v.as_str()); }
2181            req.send(&body[..])
2182        }
2183        m => {
2184            // Other methods (PUT, DELETE, PATCH, ...) fall through
2185            // here in v1.5; for now surface a structured DecodeError
2186            // so the caller can match it.
2187            return http_decode_err(format!("unsupported method: {m}"));
2188        }
2189    };
2190    match resp {
2191        Ok(mut r) => {
2192            let status = r.status().as_u16() as i64;
2193            let headers_map = collect_response_headers(r.headers());
2194            let body_bytes = match r.body_mut().with_config().limit(10 * 1024 * 1024).read_to_vec() {
2195                Ok(b) => b,
2196                Err(e) => return http_decode_err(format!("body read: {e}")),
2197            };
2198            let mut rec = indexmap::IndexMap::new();
2199            rec.insert("status".into(), Value::Int(status));
2200            rec.insert("headers".into(), Value::Map(headers_map));
2201            rec.insert("body".into(), Value::Bytes(body_bytes));
2202            Value::Variant { name: "Ok".into(), args: vec![Value::Record(rec)] }
2203        }
2204        Err(e) => http_error_value(e),
2205    }
2206}
2207
2208fn collect_response_headers(
2209    headers: &ureq::http::HeaderMap,
2210) -> std::collections::BTreeMap<lex_bytecode::MapKey, Value> {
2211    let mut out = std::collections::BTreeMap::new();
2212    for (name, value) in headers.iter() {
2213        let v = value.to_str().unwrap_or("").to_string();
2214        out.insert(lex_bytecode::MapKey::Str(name.as_str().to_string()), Value::Str(v));
2215    }
2216    out
2217}
2218
2219/// Pull the standard `HttpRequest` shape out of a `Value::Record`
2220/// and dispatch through `http_send_full`. The handler verifies
2221/// `--allow-net-host` for the URL before sending.
2222fn http_send_record(handler: &DefaultHandler, req: &indexmap::IndexMap<String, Value>) -> Value {
2223    let method = match req.get("method") {
2224        Some(Value::Str(s)) => s.clone(),
2225        _ => return http_decode_err("HttpRequest.method must be Str".into()),
2226    };
2227    let url = match req.get("url") {
2228        Some(Value::Str(s)) => s.clone(),
2229        _ => return http_decode_err("HttpRequest.url must be Str".into()),
2230    };
2231    if let Err(e) = handler.ensure_host_allowed(&url) {
2232        return http_decode_err(e);
2233    }
2234    let body = match req.get("body") {
2235        Some(Value::Variant { name, args }) if name == "None" => None,
2236        Some(Value::Variant { name, args }) if name == "Some" => match args.as_slice() {
2237            [Value::Bytes(b)] => Some(b.clone()),
2238            _ => return http_decode_err("HttpRequest.body Some payload must be Bytes".into()),
2239        },
2240        _ => return http_decode_err("HttpRequest.body must be Option[Bytes]".into()),
2241    };
2242    let timeout_ms = match req.get("timeout_ms") {
2243        Some(Value::Variant { name, .. }) if name == "None" => None,
2244        Some(Value::Variant { name, args }) if name == "Some" => match args.as_slice() {
2245            [Value::Int(n)] if *n >= 0 => Some(*n as u64),
2246            _ => return http_decode_err(
2247                "HttpRequest.timeout_ms Some payload must be a non-negative Int".into()),
2248        },
2249        _ => return http_decode_err("HttpRequest.timeout_ms must be Option[Int]".into()),
2250    };
2251    let headers: Vec<(String, String)> = match req.get("headers") {
2252        Some(Value::Map(m)) => m.iter().filter_map(|(k, v)| {
2253            let kk = match k { lex_bytecode::MapKey::Str(s) => s.clone(), _ => return None };
2254            let vv = match v { Value::Str(s) => s.clone(), _ => return None };
2255            Some((kk, vv))
2256        }).collect(),
2257        _ => return http_decode_err("HttpRequest.headers must be Map[Str, Str]".into()),
2258    };
2259    http_send_full(&method, &url, body, "", &headers, timeout_ms)
2260}
2261
2262fn expect_record(v: Option<&Value>) -> Result<&indexmap::IndexMap<String, Value>, String> {
2263    match v {
2264        Some(Value::Record(r)) => Ok(r),
2265        Some(other) => Err(format!("expected Record, got {other:?}")),
2266        None => Err("missing Record argument".into()),
2267    }
2268}
2269
2270fn err_value(msg: String) -> Value {
2271    Value::Variant { name: "Err".into(), args: vec![Value::Str(msg)] }
2272}
2273
2274fn expect_str(v: Option<&Value>) -> Result<&str, String> {
2275    match v {
2276        Some(Value::Str(s)) => Ok(s),
2277        Some(other) => Err(format!("expected Str arg, got {other:?}")),
2278        None => Err("missing argument".into()),
2279    }
2280}
2281
2282fn expect_int(v: Option<&Value>) -> Result<i64, String> {
2283    match v {
2284        Some(Value::Int(n)) => Ok(*n),
2285        Some(other) => Err(format!("expected Int arg, got {other:?}")),
2286        None => Err("missing argument".into()),
2287    }
2288}
2289
2290fn ok(v: Value) -> Value {
2291    Value::Variant { name: "Ok".into(), args: vec![v] }
2292}
2293fn err(v: Value) -> Value {
2294    Value::Variant { name: "Err".into(), args: vec![v] }
2295}
2296
2297/// Build a `SqlError = { message, code, detail }` Lex record (#380).
2298/// `code` and `detail` are `None` by default; the driver-specific
2299/// converters below populate them with real values.
2300fn sql_error(message: impl Into<String>, code: Option<String>, detail: Option<String>) -> Value {
2301    let some = |s: String| Value::Variant { name: "Some".into(), args: vec![Value::Str(s)] };
2302    let none = || Value::Variant { name: "None".into(), args: vec![] };
2303    let mut rec = indexmap::IndexMap::new();
2304    rec.insert("message".into(), Value::Str(message.into()));
2305    rec.insert("code".into(), match code {
2306        Some(c) => some(c),
2307        None => none(),
2308    });
2309    rec.insert("detail".into(), match detail {
2310        Some(d) => some(d),
2311        None => none(),
2312    });
2313    Value::Record(rec)
2314}
2315
2316/// Convert a rusqlite error into a `SqlError`. The `code` is the
2317/// symbolic extended-result-code name (`SQLITE_BUSY`,
2318/// `SQLITE_CONSTRAINT_UNIQUE`, …) when present — this is what
2319/// callers want for dialect-aware retry / conflict handling.
2320///
2321/// rusqlite has two main error shapes that carry a numeric code:
2322/// `SqliteFailure` (driver-side runtime errors — constraints, busy,
2323/// IO) and `SqlInputError` (statement-preparation failures —
2324/// syntax, unknown table). Both are unpacked the same way.
2325fn sqlite_err_to_sql_error(e: rusqlite::Error, op: &str) -> Value {
2326    let message = format!("{op}: {e}");
2327    match &e {
2328        rusqlite::Error::SqliteFailure(ffi, detail_opt) => {
2329            sql_error(
2330                message,
2331                Some(sqlite_extended_code_name(ffi.extended_code)),
2332                detail_opt.clone(),
2333            )
2334        }
2335        rusqlite::Error::SqlInputError { error, msg, .. } => {
2336            sql_error(
2337                message,
2338                Some(sqlite_extended_code_name(error.extended_code)),
2339                Some(msg.clone()),
2340            )
2341        }
2342        _ => sql_error(message, None, None),
2343    }
2344}
2345
2346/// Map a SQLite extended result code (numeric) to its symbolic name.
2347/// We only cover the codes a Lex caller is likely to dispatch on
2348/// (constraint kinds, busy/locked, read-only, IO); anything else
2349/// falls back to a generic `SQLITE_ERROR_<n>` stringification so the
2350/// numeric code is still recoverable.
2351fn sqlite_extended_code_name(code: i32) -> String {
2352    use rusqlite::ffi::*;
2353    let s = match code {
2354        SQLITE_BUSY => "SQLITE_BUSY",
2355        SQLITE_LOCKED => "SQLITE_LOCKED",
2356        SQLITE_READONLY => "SQLITE_READONLY",
2357        SQLITE_IOERR => "SQLITE_IOERR",
2358        SQLITE_CORRUPT => "SQLITE_CORRUPT",
2359        SQLITE_NOTFOUND => "SQLITE_NOTFOUND",
2360        SQLITE_FULL => "SQLITE_FULL",
2361        SQLITE_CANTOPEN => "SQLITE_CANTOPEN",
2362        SQLITE_PROTOCOL => "SQLITE_PROTOCOL",
2363        SQLITE_SCHEMA => "SQLITE_SCHEMA",
2364        SQLITE_TOOBIG => "SQLITE_TOOBIG",
2365        SQLITE_CONSTRAINT => "SQLITE_CONSTRAINT",
2366        SQLITE_CONSTRAINT_CHECK => "SQLITE_CONSTRAINT_CHECK",
2367        SQLITE_CONSTRAINT_FOREIGNKEY => "SQLITE_CONSTRAINT_FOREIGNKEY",
2368        SQLITE_CONSTRAINT_NOTNULL => "SQLITE_CONSTRAINT_NOTNULL",
2369        SQLITE_CONSTRAINT_PRIMARYKEY => "SQLITE_CONSTRAINT_PRIMARYKEY",
2370        SQLITE_CONSTRAINT_TRIGGER => "SQLITE_CONSTRAINT_TRIGGER",
2371        SQLITE_CONSTRAINT_UNIQUE => "SQLITE_CONSTRAINT_UNIQUE",
2372        SQLITE_CONSTRAINT_VTAB => "SQLITE_CONSTRAINT_VTAB",
2373        SQLITE_CONSTRAINT_ROWID => "SQLITE_CONSTRAINT_ROWID",
2374        SQLITE_MISMATCH => "SQLITE_MISMATCH",
2375        SQLITE_RANGE => "SQLITE_RANGE",
2376        SQLITE_NOTADB => "SQLITE_NOTADB",
2377        SQLITE_AUTH => "SQLITE_AUTH",
2378        _ => return format!("SQLITE_ERROR_{code}"),
2379    };
2380    s.to_string()
2381}
2382
2383/// Convert a postgres error into a `SqlError`. The `code` is the
2384/// 5-character SQLSTATE (`23505`, `40P01`, …); `detail` is the
2385/// driver's optional detail message when present.
2386fn pg_err_to_sql_error(e: postgres::Error, op: &str) -> Value {
2387    let message = format!("{op}: {e}");
2388    let code = e.as_db_error().map(|db| db.code().code().to_string());
2389    let detail = e.as_db_error().and_then(|db| db.detail().map(|s| s.to_string()));
2390    sql_error(message, code, detail)
2391}
2392
2393impl DefaultHandler {
2394    /// Implementation of `agent.call_mcp(server, tool, args_json)`.
2395    /// Goes through the LRU client cache (#197): the named server
2396    /// is spawned on first use and reused on subsequent calls.
2397    /// On failure the offending client is dropped so the next
2398    /// call respawns rather than silently failing forever.
2399    fn dispatch_call_mcp(&mut self, args: Vec<Value>) -> Value {
2400        let server = match args.first() {
2401            Some(Value::Str(s)) => s.clone(),
2402            _ => return err(Value::Str(
2403                "agent.call_mcp(server, tool, args_json): server must be Str".into())),
2404        };
2405        let tool = match args.get(1) {
2406            Some(Value::Str(s)) => s.clone(),
2407            _ => return err(Value::Str(
2408                "agent.call_mcp(server, tool, args_json): tool must be Str".into())),
2409        };
2410        let args_json = match args.get(2) {
2411            Some(Value::Str(s)) => s.clone(),
2412            _ => return err(Value::Str(
2413                "agent.call_mcp(server, tool, args_json): args_json must be Str".into())),
2414        };
2415        let parsed: serde_json::Value = match serde_json::from_str(&args_json) {
2416            Ok(v) => v,
2417            Err(e) => return err(Value::Str(format!(
2418                "agent.call_mcp: args_json is not valid JSON: {e}"))),
2419        };
2420        match self.mcp_clients.call(&server, &tool, parsed) {
2421            Ok(result) => ok(Value::Str(
2422                serde_json::to_string(&result).unwrap_or_else(|_| "null".into()))),
2423            Err(e) => err(Value::Str(e)),
2424        }
2425    }
2426
2427    /// Implementation of `agent.cloud_stream(prompt) -> Result[Stream[Str], Str]`
2428    /// (#305 slice 3). The fixture path (`LEX_LLM_STREAM_FIXTURE`)
2429    /// splits the env-var value on `|` and yields each segment as
2430    /// one chunk; it's the load-bearing test hook. Live HTTP
2431    /// chunked-response support is deferred to a follow-up slice.
2432    fn dispatch_cloud_stream(&mut self, args: Vec<Value>) -> Value {
2433        let _prompt = match args.first() {
2434            Some(Value::Str(s)) => s.clone(),
2435            _ => return err(Value::Str(
2436                "agent.cloud_stream(prompt): prompt must be Str".into())),
2437        };
2438        let chunks: Vec<String> = match std::env::var("LEX_LLM_STREAM_FIXTURE") {
2439            Ok(v) => v.split('|').map(|s| s.to_string()).collect(),
2440            Err(_) => return err(Value::Str(
2441                "agent.cloud_stream: live streaming not yet implemented; \
2442                 set LEX_LLM_STREAM_FIXTURE='chunk1|chunk2|…' for tests".into())),
2443        };
2444        let handle = self.register_stream(chunks.into_iter());
2445        ok(stream_handle_value(handle))
2446    }
2447
2448    /// Implementation of `stream.next(s) -> Option[T]` (#305 slice 3).
2449    /// Returns `Some(chunk)` for each producer yield and `None` once
2450    /// the producer is exhausted. Unknown handle ids return `None`
2451    /// rather than erroring so streams can be safely consumed past
2452    /// the end (matches the semantics of `Iterator::next`).
2453    fn dispatch_stream_next(&mut self, args: Vec<Value>) -> Value {
2454        let handle = match args.first().and_then(stream_handle_id) {
2455            Some(h) => h,
2456            None => return Value::Variant { name: "None".into(), args: vec![] },
2457        };
2458        let mut streams = match self.streams.lock() {
2459            Ok(g) => g,
2460            Err(_) => return Value::Variant { name: "None".into(), args: vec![] },
2461        };
2462        match streams.get_mut(&handle).and_then(|it| it.next()) {
2463            Some(chunk) => some(Value::Str(chunk)),
2464            None => {
2465                streams.remove(&handle);
2466                Value::Variant { name: "None".into(), args: vec![] }
2467            }
2468        }
2469    }
2470
2471    /// Implementation of `stream.collect(s) -> List[T]` (#305 slice 3).
2472    /// Drains the producer eagerly. Unknown handles drain to an
2473    /// empty list so the contract is `collect ∘ collect = []`
2474    /// (idempotent on a closed stream).
2475    fn dispatch_stream_collect(&mut self, args: Vec<Value>) -> Value {
2476        let handle = match args.first().and_then(stream_handle_id) {
2477            Some(h) => h,
2478            None => return Value::List(Vec::new()),
2479        };
2480        let mut iter = {
2481            let mut streams = match self.streams.lock() {
2482                Ok(g) => g,
2483                Err(_) => return Value::List(Vec::new()),
2484            };
2485            match streams.remove(&handle) {
2486                Some(it) => it,
2487                None => return Value::List(Vec::new()),
2488            }
2489        };
2490        let mut out: Vec<Value> = Vec::new();
2491        for chunk in iter.by_ref() {
2492            out.push(Value::Str(chunk));
2493        }
2494        Value::List(out)
2495    }
2496
2497    /// Register a producer iterator and return its handle id. The
2498    /// handle is monotonic-counter-based so two streams created in
2499    /// quick succession get distinct ids.
2500    fn register_stream<I>(&self, iter: I) -> String
2501    where
2502        I: Iterator<Item = String> + Send + 'static,
2503    {
2504        let id = self
2505            .next_stream_id
2506            .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
2507        let handle = format!("stream_{id}");
2508        if let Ok(mut streams) = self.streams.lock() {
2509            streams.insert(handle.clone(), Box::new(iter));
2510        }
2511        handle
2512    }
2513}
2514
2515/// Build the runtime representation of a `Stream[T]` value:
2516/// `Variant("__StreamHandle", [Str(handle_id)])`. The opaque tag is
2517/// prefixed with `__` so it can't collide with a user-declared
2518/// variant.
2519fn stream_handle_value(handle: String) -> Value {
2520    Value::Variant {
2521        name: "__StreamHandle".into(),
2522        args: vec![Value::Str(handle)],
2523    }
2524}
2525
2526/// Inverse of [`stream_handle_value`] — extract the handle id from
2527/// a Stream value, or `None` if the input doesn't have the
2528/// expected shape.
2529fn stream_handle_id(v: &Value) -> Option<String> {
2530    match v {
2531        Value::Variant { name, args } if name == "__StreamHandle" => match args.first() {
2532            Some(Value::Str(h)) => Some(h.clone()),
2533            _ => None,
2534        },
2535        _ => None,
2536    }
2537}
2538
2539/// Implementation of `agent.local_complete(prompt)` (#196).
2540/// Hits Ollama (or any compatible HTTP service via `OLLAMA_HOST`)
2541/// and returns the completion text. Override at the
2542/// `EffectHandler` layer if you need a different transport.
2543fn dispatch_llm_local(args: Vec<Value>) -> Value {
2544    let prompt = match args.first() {
2545        Some(Value::Str(s)) => s.clone(),
2546        _ => return err(Value::Str(
2547            "agent.local_complete(prompt): prompt must be Str".into())),
2548    };
2549    match crate::llm::local_complete(&prompt) {
2550        Ok(text) => ok(Value::Str(text)),
2551        Err(e) => err(Value::Str(e)),
2552    }
2553}
2554
2555/// Implementation of `agent.cloud_complete(prompt)` (#196).
2556/// Hits OpenAI's chat-completions API (or any compatible
2557/// service via `OPENAI_BASE_URL`) and returns the assistant
2558/// message. Requires `OPENAI_API_KEY`. Override at the
2559/// `EffectHandler` layer for custom auth, batching, or other
2560/// providers.
2561fn dispatch_llm_cloud(args: Vec<Value>) -> Value {
2562    let prompt = match args.first() {
2563        Some(Value::Str(s)) => s.clone(),
2564        _ => return err(Value::Str(
2565            "agent.cloud_complete(prompt): prompt must be Str".into())),
2566    };
2567    match crate::llm::cloud_complete(&prompt) {
2568        Ok(text) => ok(Value::Str(text)),
2569        Err(e) => err(Value::Str(e)),
2570    }
2571}
2572
2573fn some(v: Value) -> Value {
2574    Value::Variant { name: "Some".into(), args: vec![v] }
2575}
2576fn none() -> Value {
2577    Value::Variant { name: "None".into(), args: vec![] }
2578}
2579
2580fn expect_bytes(v: Option<&Value>) -> Result<&Vec<u8>, String> {
2581    match v {
2582        Some(Value::Bytes(b)) => Ok(b),
2583        Some(other) => Err(format!("expected Bytes arg, got {other:?}")),
2584        None => Err("missing argument".into()),
2585    }
2586}
2587
2588fn expect_kv_handle(v: Option<&Value>) -> Result<u64, String> {
2589    match v {
2590        Some(Value::Int(n)) if *n >= 0 => Ok(*n as u64),
2591        Some(other) => Err(format!("expected Kv handle (Int), got {other:?}")),
2592        None => Err("missing Kv argument".into()),
2593    }
2594}
2595
2596fn expect_sql_handle(v: Option<&Value>) -> Result<u64, String> {
2597    match v {
2598        Some(Value::Int(n)) if *n >= 0 => Ok(*n as u64),
2599        Some(other) => Err(format!("expected Db handle (Int), got {other:?}")),
2600        None => Err("missing Db argument".into()),
2601    }
2602}
2603
2604#[allow(dead_code)]
2605fn expect_str_list(v: Option<&Value>) -> Result<Vec<String>, String> {
2606    match v {
2607        Some(Value::List(items)) => items.iter().map(|x| match x {
2608            Value::Str(s) => Ok(s.clone()),
2609            other => Err(format!("expected List[Str] element, got {other:?}")),
2610        }).collect(),
2611        Some(other) => Err(format!("expected List[Str], got {other:?}")),
2612        None => Err("missing List[Str] argument".into()),
2613    }
2614}
2615
2616/// Convert a `List[SqlParam]` value to driver-neutral `SqlParamValue`s.
2617/// SqlParam = PStr(Str) | PInt(Int) | PFloat(Float) | PBool(Bool) | PNull
2618fn expect_sql_params(v: Option<&Value>) -> Result<Vec<SqlParamValue>, String> {
2619    let items = match v {
2620        Some(Value::List(xs)) => xs,
2621        Some(other) => return Err(format!("expected List[SqlParam], got {other:?}")),
2622        None => return Err("missing params argument".into()),
2623    };
2624    items.iter().map(|item| {
2625        match item {
2626            Value::Variant { name, args } => match name.as_str() {
2627                "PStr"   => match args.first() {
2628                    Some(Value::Str(s)) => Ok(SqlParamValue::Text(s.clone())),
2629                    _ => Err("PStr requires a Str argument".into()),
2630                },
2631                "PInt"   => match args.first() {
2632                    Some(Value::Int(n)) => Ok(SqlParamValue::Integer(*n)),
2633                    _ => Err("PInt requires an Int argument".into()),
2634                },
2635                "PFloat" => match args.first() {
2636                    Some(Value::Float(f)) => Ok(SqlParamValue::Real(*f)),
2637                    _ => Err("PFloat requires a Float argument".into()),
2638                },
2639                "PBool"  => match args.first() {
2640                    Some(Value::Bool(b)) => Ok(SqlParamValue::Bool(*b)),
2641                    _ => Err("PBool requires a Bool argument".into()),
2642                },
2643                "PNull"  => Ok(SqlParamValue::Null),
2644                other    => Err(format!("unknown SqlParam constructor `{other}`")),
2645            },
2646            // Backward-compat: bare strings are accepted as PStr.
2647            Value::Str(s) => Ok(SqlParamValue::Text(s.clone())),
2648            other => Err(format!("expected SqlParam variant, got {other:?}")),
2649        }
2650    }).collect()
2651}
2652
2653/// Convert `SqlParamValue`s to rusqlite-typed values for SQLite binding.
2654fn sqlite_params(params: &[SqlParamValue]) -> Vec<rusqlite::types::Value> {
2655    params.iter().map(|p| match p {
2656        SqlParamValue::Text(s)    => rusqlite::types::Value::Text(s.clone()),
2657        SqlParamValue::Integer(n) => rusqlite::types::Value::Integer(*n),
2658        SqlParamValue::Real(f)    => rusqlite::types::Value::Real(*f),
2659        SqlParamValue::Bool(b)    => rusqlite::types::Value::Integer(*b as i64),
2660        SqlParamValue::Null       => rusqlite::types::Value::Null,
2661    }).collect()
2662}
2663
2664/// Box `SqlParamValue`s as `dyn ToSql + Sync` for Postgres binding.
2665fn pg_param_refs(params: &[SqlParamValue]) -> Vec<Box<dyn postgres::types::ToSql + Sync>> {
2666    params.iter().map(|p| -> Box<dyn postgres::types::ToSql + Sync> {
2667        match p {
2668            SqlParamValue::Text(s)    => Box::new(s.clone()),
2669            SqlParamValue::Integer(n) => Box::new(*n),
2670            SqlParamValue::Real(f)    => Box::new(*f),
2671            SqlParamValue::Bool(b)    => Box::new(*b),
2672            SqlParamValue::Null       => Box::new(Option::<String>::None),
2673        }
2674    }).collect()
2675}
2676
2677/// Run a statement on SQLite and pack rows into `Value::List(Value::Record(...))`.
2678fn sql_run_query_sqlite(
2679    conn: &rusqlite::Connection,
2680    stmt_str: &str,
2681    params: &[SqlParamValue],
2682) -> Value {
2683    let mut stmt = match conn.prepare(stmt_str) {
2684        Ok(s)  => s,
2685        Err(e) => return err(sqlite_err_to_sql_error(e, "sql.query")),
2686    };
2687    let column_count = stmt.column_count();
2688    let column_names: Vec<String> = (0..column_count)
2689        .map(|i| stmt.column_name(i).unwrap_or("").to_string())
2690        .collect();
2691    let bound = sqlite_params(params);
2692    let bind: Vec<&dyn rusqlite::ToSql> = bound.iter()
2693        .map(|p| p as &dyn rusqlite::ToSql)
2694        .collect();
2695    let mut rows = match stmt.query(rusqlite::params_from_iter(bind.iter())) {
2696        Ok(r)  => r,
2697        Err(e) => return err(sqlite_err_to_sql_error(e, "sql.query")),
2698    };
2699    let mut out: Vec<Value> = Vec::new();
2700    loop {
2701        let row = match rows.next() {
2702            Ok(Some(r)) => r,
2703            Ok(None)    => break,
2704            Err(e)      => return err(sqlite_err_to_sql_error(e, "sql.query")),
2705        };
2706        let mut rec = indexmap::IndexMap::new();
2707        for (i, name) in column_names.iter().enumerate() {
2708            let cell = match row.get_ref(i) {
2709                Ok(c)  => sql_value_ref_to_lex(c),
2710                Err(e) => return err(sqlite_err_to_sql_error(e, &format!("sql.query: column {i}"))),
2711            };
2712            rec.insert(name.clone(), cell);
2713        }
2714        out.push(Value::Record(rec));
2715    }
2716    ok(Value::List(out))
2717}
2718
2719/// Run a statement on Postgres and pack rows into `Value::List(Value::Record(...))`.
2720fn sql_run_query_pg(
2721    client: &mut postgres::Client,
2722    stmt_str: &str,
2723    params: &[SqlParamValue],
2724) -> Value {
2725    let pg = pg_param_refs(params);
2726    let refs: Vec<&(dyn postgres::types::ToSql + Sync)> =
2727        pg.iter().map(|b| b.as_ref()).collect();
2728    let rows = match client.query(stmt_str, &refs) {
2729        Ok(r)  => r,
2730        Err(e) => return err(pg_err_to_sql_error(e, "sql.query")),
2731    };
2732    let out: Vec<Value> = rows.iter().map(|row| {
2733        Value::Record(pg_row_to_lex_record(row))
2734    }).collect();
2735    ok(Value::List(out))
2736}
2737
2738/// Convert a Postgres row to a Lex record, mapping column types to Lex values.
2739fn pg_row_to_lex_record(row: &postgres::Row) -> indexmap::IndexMap<String, Value> {
2740    use postgres::types::Type;
2741    let mut rec = indexmap::IndexMap::new();
2742    for (i, col) in row.columns().iter().enumerate() {
2743        let ty = col.type_();
2744        let val = if *ty == Type::INT2 || *ty == Type::INT4 || *ty == Type::INT8 {
2745            row.get::<_, Option<i64>>(i).map(Value::Int).unwrap_or(Value::Unit)
2746        } else if *ty == Type::FLOAT4 || *ty == Type::FLOAT8 {
2747            row.get::<_, Option<f64>>(i).map(Value::Float).unwrap_or(Value::Unit)
2748        } else if *ty == Type::BOOL {
2749            row.get::<_, Option<bool>>(i).map(Value::Bool).unwrap_or(Value::Unit)
2750        } else if *ty == Type::BYTEA {
2751            row.get::<_, Option<Vec<u8>>>(i).map(Value::Bytes).unwrap_or(Value::Unit)
2752        } else {
2753            row.get::<_, Option<String>>(i).map(Value::Str).unwrap_or(Value::Unit)
2754        };
2755        rec.insert(col.name().to_string(), val);
2756    }
2757    rec
2758}
2759
2760/// Extract a column value from a row record by name, returning `Option[X]`.
2761fn sql_get_col<F>(args: &[Value], convert: F) -> Result<Value, String>
2762where
2763    F: Fn(&Value) -> Option<Value>,
2764{
2765    let row = args.first().ok_or("sql.get_*: missing row argument")?;
2766    let col = match args.get(1) {
2767        Some(Value::Str(s)) => s.as_str(),
2768        Some(other) => return Err(format!("sql.get_*: column name must be Str, got {other:?}")),
2769        None => return Err("sql.get_*: missing column name argument".into()),
2770    };
2771    let cell = match row {
2772        Value::Record(rec) => rec.get(col).cloned(),
2773        other => return Err(format!("sql.get_*: row must be a Record, got {other:?}")),
2774    };
2775    Ok(match cell.and_then(|v| convert(&v)) {
2776        Some(v) => Value::Variant { name: "Some".into(), args: vec![v] },
2777        None    => Value::Variant { name: "None".into(), args: vec![] },
2778    })
2779}
2780
2781fn sql_value_ref_to_lex(v: rusqlite::types::ValueRef<'_>) -> Value {
2782    use rusqlite::types::ValueRef;
2783    match v {
2784        ValueRef::Null       => Value::Unit,
2785        ValueRef::Integer(n) => Value::Int(n),
2786        ValueRef::Real(f)    => Value::Float(f),
2787        ValueRef::Text(s)    => Value::Str(String::from_utf8_lossy(s).into_owned()),
2788        ValueRef::Blob(b)    => Value::Bytes(b.to_vec()),
2789    }
2790}
2791
2792// -- log state (process-wide; configurable via log.set_*) --
2793
/// Severity of a log record.
///
/// Variants are declared from least to most severe, so the derived
/// ordering gives `Debug < Info < Warn < Error`. `emit_log` drops records
/// below the configured level.
#[derive(Clone, Copy, PartialEq, PartialOrd)]
enum LogLevel {
    Debug,
    Info,
    Warn,
    Error,
}
2796
/// How each log record is rendered.
#[derive(Clone, Copy, PartialEq)]
enum LogFormat {
    /// `[<ts>] <level>: <msg>`
    Text,
    /// One JSON object per line with `ts`, `level` and `msg` keys.
    Json,
}
2799
/// Destination for rendered log lines.
///
/// Cloning is cheap: the `File` variant shares one handle through an
/// `Arc<Mutex<_>>`.
#[derive(Clone)]
enum LogSink {
    /// The process's standard error (the initial sink).
    Stderr,
    /// A shared, mutex-guarded file handle.
    File(Arc<Mutex<std::fs::File>>),
}
2805
2806struct LogState {
2807    level: LogLevel,
2808    format: LogFormat,
2809    sink: LogSink,
2810}
2811
2812fn log_state() -> &'static Mutex<LogState> {
2813    static STATE: OnceLock<Mutex<LogState>> = OnceLock::new();
2814    STATE.get_or_init(|| Mutex::new(LogState {
2815        level: LogLevel::Info,
2816        format: LogFormat::Text,
2817        sink: LogSink::Stderr,
2818    }))
2819}
2820
2821fn parse_log_level(s: &str) -> Option<LogLevel> {
2822    match s {
2823        "debug" => Some(LogLevel::Debug),
2824        "info" => Some(LogLevel::Info),
2825        "warn" => Some(LogLevel::Warn),
2826        "error" => Some(LogLevel::Error),
2827        _ => None,
2828    }
2829}
2830
2831fn level_label(l: LogLevel) -> &'static str {
2832    match l {
2833        LogLevel::Debug => "debug",
2834        LogLevel::Info => "info",
2835        LogLevel::Warn => "warn",
2836        LogLevel::Error => "error",
2837    }
2838}
2839
2840fn emit_log(level: LogLevel, msg: &str) {
2841    let state = log_state().lock().unwrap();
2842    if level < state.level {
2843        return;
2844    }
2845    let ts = chrono::Utc::now().to_rfc3339();
2846    let line = match state.format {
2847        LogFormat::Text => format!("[{}] {}: {}\n", ts, level_label(level), msg),
2848        LogFormat::Json => {
2849            // Hand-rolled JSON to avoid pulling serde_json into the
2850            // hot path; msg gets minimal escaping (the four common
2851            // cases that break a JSON line).
2852            let escaped = msg
2853                .replace('\\', "\\\\")
2854                .replace('"',  "\\\"")
2855                .replace('\n', "\\n")
2856                .replace('\r', "\\r");
2857            format!(
2858                "{{\"ts\":\"{ts}\",\"level\":\"{}\",\"msg\":\"{escaped}\"}}\n",
2859                level_label(level),
2860            )
2861        }
2862    };
2863    let sink = state.sink.clone();
2864    drop(state);
2865    match sink {
2866        LogSink::Stderr => {
2867            use std::io::Write;
2868            let _ = std::io::stderr().write_all(line.as_bytes());
2869        }
2870        LogSink::File(f) => {
2871            use std::io::Write;
2872            if let Ok(mut g) = f.lock() {
2873                let _ = g.write_all(line.as_bytes());
2874            }
2875        }
2876    }
2877}
2878
/// Everything the runtime keeps for one `process.spawn` handle.
pub(crate) struct ProcessState {
    /// The spawned child process.
    child: std::process::Child,
    /// Buffered reader over the child's stdout, if one is attached.
    /// NOTE(review): presumably `None` when stdout was not piped.
    stdout: Option<std::io::BufReader<std::process::ChildStdout>>,
    /// Buffered reader over the child's stderr, if one is attached.
    stderr: Option<std::io::BufReader<std::process::ChildStderr>>,
}
2884
2885/// Process-wide registry of live `process.spawn` handles. Capped at
2886/// [`MAX_PROCESS_HANDLES`] to bound long-running programs that spawn
2887/// many short-lived children: on each `spawn` past the cap, the
2888/// least-recently-used entry is dropped (which `Drop`s its
2889/// `ProcessState`, leaving the child orphaned but the registry
2890/// bounded). `process.wait` also drops the entry on completion since
2891/// the handle becomes terminal once the child exits.
2892///
2893/// Each entry is wrapped in `Arc<Mutex<ProcessState>>` so the global
2894/// lookup mutex is held only briefly during dispatch — once we have
2895/// the per-handle `Arc`, the global lock is released and the slow
2896/// op (`wait`, `read_*_line`) only contends on its own handle's
2897/// mutex. Reads on different handles no longer block each other.
2898fn process_registry() -> &'static Mutex<ProcessRegistry> {
2899    static REGISTRY: OnceLock<Mutex<ProcessRegistry>> = OnceLock::new();
2900    REGISTRY.get_or_init(|| Mutex::new(ProcessRegistry::with_capacity(MAX_PROCESS_HANDLES)))
2901}
2902
2903const MAX_PROCESS_HANDLES: usize = 256;
2904
2905type SharedProcessState = Arc<Mutex<ProcessState>>;
2906
2907pub(crate) struct ProcessRegistry {
2908    entries: indexmap::IndexMap<u64, SharedProcessState>,
2909    cap: usize,
2910}
2911
2912impl ProcessRegistry {
2913    pub(crate) fn with_capacity(cap: usize) -> Self {
2914        Self { entries: indexmap::IndexMap::new(), cap }
2915    }
2916
2917    /// Insert a freshly-spawned child. If at cap, evict the LRU entry
2918    /// first; the dropped `ProcessState`'s child stays alive (orphaned)
2919    /// but its file descriptors are released.
2920    pub(crate) fn insert(&mut self, handle: u64, state: ProcessState) {
2921        if self.entries.len() >= self.cap {
2922            self.entries.shift_remove_index(0);
2923        }
2924        self.entries.insert(handle, Arc::new(Mutex::new(state)));
2925    }
2926
2927    /// Look up a handle, marking it most-recently-used on hit. Returns
2928    /// a clone of the shared `Arc` — callers should release the global
2929    /// registry lock before locking the per-handle mutex.
2930    pub(crate) fn touch_get(&mut self, handle: u64) -> Option<SharedProcessState> {
2931        let idx = self.entries.get_index_of(&handle)?;
2932        self.entries.move_index(idx, self.entries.len() - 1);
2933        self.entries.get(&handle).cloned()
2934    }
2935
2936    /// Drop the registry entry. The underlying `Arc` may outlive the
2937    /// removal if another op still holds it; that's intentional — the
2938    /// in-flight op finishes against the existing `ProcessState`, and
2939    /// only fresh lookups start failing.
2940    pub(crate) fn remove(&mut self, handle: u64) {
2941        self.entries.shift_remove(&handle);
2942    }
2943
2944    #[cfg(test)]
2945    pub(crate) fn len(&self) -> usize { self.entries.len() }
2946}
2947
/// Allocate a fresh process handle.
///
/// Handles start at 1 and increase with each call, so they are not reused
/// within a run (barring `u64` wrap-around).
fn next_process_handle() -> u64 {
    static NEXT: AtomicU64 = AtomicU64::new(1);
    NEXT.fetch_add(1, Ordering::SeqCst)
}
2952
#[cfg(all(test, unix))]
mod process_registry_tests {
    use super::{ProcessRegistry, ProcessState};
    use std::process::{Command, Stdio};
    use std::sync::Arc;

    /// A throwaway registry payload. `true` exits immediately; the tests
    /// only need a valid `std::process::Child`, never its output.
    fn fresh_state() -> ProcessState {
        let child = Command::new("true")
            .stdout(Stdio::null())
            .stderr(Stdio::null())
            .spawn()
            .expect("spawn `true`");
        ProcessState { child, stdout: None, stderr: None }
    }

    /// Registry of capacity `cap` pre-loaded with `handles`, in order.
    fn registry_with(cap: usize, handles: &[u64]) -> ProcessRegistry {
        let mut reg = ProcessRegistry::with_capacity(cap);
        for &h in handles {
            reg.insert(h, fresh_state());
        }
        reg
    }

    #[test]
    fn insert_and_get_round_trip() {
        let mut reg = registry_with(4, &[1]);
        assert!(reg.touch_get(1).is_some());
        assert!(reg.touch_get(2).is_none());
    }

    #[test]
    fn touch_get_returns_distinct_arcs_for_distinct_handles() {
        let mut reg = registry_with(4, &[1, 2]);
        let first = reg.touch_get(1).unwrap();
        let second = reg.touch_get(2).unwrap();
        // Pointer inequality: each handle owns its own shared state.
        assert!(!Arc::ptr_eq(&first, &second));
    }

    #[test]
    fn cap_evicts_lru_on_overflow() {
        let mut reg = registry_with(2, &[1, 2]);
        // Touching 1 leaves 2 as the least recently used.
        let _ = reg.touch_get(1);
        reg.insert(3, fresh_state());
        assert!(reg.touch_get(1).is_some(), "1 was MRU, should survive");
        assert!(reg.touch_get(2).is_none(), "2 was LRU, should be evicted");
        assert!(reg.touch_get(3).is_some(), "3 just inserted, should survive");
        assert_eq!(reg.len(), 2);
    }

    #[test]
    fn cap_with_no_touches_evicts_in_insertion_order() {
        let mut reg = registry_with(2, &[10, 20, 30]);
        assert!(reg.touch_get(10).is_none());
        assert!(reg.touch_get(20).is_some());
        assert!(reg.touch_get(30).is_some());
    }

    #[test]
    fn remove_drops_entry() {
        let mut reg = registry_with(4, &[1]);
        reg.remove(1);
        assert!(reg.touch_get(1).is_none());
        assert_eq!(reg.len(), 0);
    }

    #[test]
    fn many_inserts_stay_bounded_at_cap() {
        let cap = 8;
        let mut reg = ProcessRegistry::with_capacity(cap);
        for handle in 0..(cap as u64 * 3) {
            reg.insert(handle, fresh_state());
            assert!(reg.len() <= cap);
        }
        assert_eq!(reg.len(), cap);
    }

    #[test]
    fn outstanding_arc_outlives_remove() {
        // An op already holding the per-handle Arc must keep working after
        // another op removes the entry (the wait-completes-then-removes
        // pattern).
        let mut reg = registry_with(4, &[1]);
        let held = reg.touch_get(1).expect("entry exists");
        reg.remove(1);
        // The registry forgot the handle, but the Arc is still usable.
        assert!(reg.touch_get(1).is_none());
        let _state = held.lock().unwrap();
    }
}
3046
3047fn expect_process_handle(v: Option<&Value>) -> Result<u64, String> {
3048    match v {
3049        Some(Value::Int(n)) if *n >= 0 => Ok(*n as u64),
3050        Some(other) => Err(format!("expected ProcessHandle (Int), got {other:?}")),
3051        None => Err("missing ProcessHandle argument".into()),
3052    }
3053}
3054
3055/// Process-wide registry of open `Kv` handles. Each `kv.open` allocates
3056/// a new u64 handle via [`next_kv_handle`] and stores the `sled::Db`
3057/// here; subsequent ops fetch by handle. `kv.close` removes the entry.
3058///
3059/// Capped at [`MAX_KV_HANDLES`] to prevent leaks from long-running
3060/// programs that open many short-lived stores without calling
3061/// `kv.close`. On insert at cap, the least-recently-used entry is
3062/// dropped (closing its `sled::Db`); subsequent ops on the evicted
3063/// handle return the standard "closed or unknown Kv handle" error.
3064/// Any access (`get`, `put`, `delete`, `contains`, `list_prefix`)
3065/// touches the LRU order.
3066fn kv_registry() -> &'static Mutex<KvRegistry> {
3067    static REGISTRY: OnceLock<Mutex<KvRegistry>> = OnceLock::new();
3068    REGISTRY.get_or_init(|| Mutex::new(KvRegistry::with_capacity(MAX_KV_HANDLES)))
3069}
3070
/// Upper bound on simultaneously open `kv.open` handles.
///
/// Past this cap, each new open evicts the least-recently-used handle.
/// The bound stops pathological "open and forget" programs without
/// affecting programs that deliberately keep one or two long-lived
/// stores open.
const MAX_KV_HANDLES: usize = 256;
3077
3078/// LRU-bounded set of open `sled::Db` instances keyed by `u64` handle.
3079/// Built on `IndexMap` for O(1) insert / remove / lookup with
3080/// insertion-order traversal — touching an entry just shift-moves it
3081/// to the back, evictions pop from the front.
3082pub(crate) struct KvRegistry {
3083    entries: indexmap::IndexMap<u64, sled::Db>,
3084    cap: usize,
3085}
3086
3087impl KvRegistry {
3088    pub(crate) fn with_capacity(cap: usize) -> Self {
3089        Self { entries: indexmap::IndexMap::new(), cap }
3090    }
3091
3092    /// Insert a freshly-opened db. If we're already at cap, evict the
3093    /// LRU entry first; the dropped `sled::Db` closes its files.
3094    pub(crate) fn insert(&mut self, handle: u64, db: sled::Db) {
3095        if self.entries.len() >= self.cap {
3096            self.entries.shift_remove_index(0);
3097        }
3098        self.entries.insert(handle, db);
3099    }
3100
3101    /// Look up a handle, marking it most-recently-used on hit.
3102    pub(crate) fn touch_get(&mut self, handle: u64) -> Option<&sled::Db> {
3103        let idx = self.entries.get_index_of(&handle)?;
3104        self.entries.move_index(idx, self.entries.len() - 1);
3105        self.entries.get(&handle)
3106    }
3107
3108    /// Explicit `kv.close`: drop the handle if present.
3109    pub(crate) fn remove(&mut self, handle: u64) {
3110        self.entries.shift_remove(&handle);
3111    }
3112
3113    #[cfg(test)]
3114    pub(crate) fn len(&self) -> usize { self.entries.len() }
3115}
3116
/// Allocate a fresh `Kv` handle.
///
/// Handles start at 1 and increase with each call.
fn next_kv_handle() -> u64 {
    static NEXT: AtomicU64 = AtomicU64::new(1);
    NEXT.fetch_add(1, Ordering::SeqCst)
}
3121
3122/// Process-wide registry of open `Db` handles. Same shape as the kv
3123/// and process registries: per-handle `Arc<Mutex<…>>` so dispatch
3124/// only briefly holds the global lock and ops on different
3125/// connections don't serialize. LRU-bounded at
3126/// [`MAX_SQL_HANDLES`] to avoid leaks from long-running programs
3127/// that open many short-lived databases.
3128fn sql_registry() -> &'static Mutex<SqlRegistry> {
3129    static REGISTRY: OnceLock<Mutex<SqlRegistry>> = OnceLock::new();
3130    REGISTRY.get_or_init(|| Mutex::new(SqlRegistry::with_capacity(MAX_SQL_HANDLES)))
3131}
3132
3133const MAX_SQL_HANDLES: usize = 256;
3134
3135// ── Streaming cursors (#379) ─────────────────────────────────────────
3136//
3137// `sql.query_iter[T]` opens a *server-side* cursor and returns an
3138// `Iter[T]` backed by a producer thread streaming rows through a
3139// bounded mpsc channel. The bytecode `iter.next` op dispatches on the
3140// `__IterCursor(handle)` variant tag and effect-calls
3141// `sql.cursor_next(handle)` to pull one row at a time.
3142//
3143// Producer-thread semantics: while the cursor is live, the producer
3144// holds the underlying SQL connection's `Arc<Mutex<SqlConn>>` lock.
3145// Other ops on the same Db handle block until the cursor is drained
3146// or evicted. This matches every server-side cursor protocol
3147// (sqlite's `sqlite3_step`, Postgres `DECLARE/FETCH`) — neither
3148// driver supports concurrent statements on a single connection.
3149//
3150// Channel capacity: 64 rows. Producer blocks at 64-row backlog,
3151// keeping resident memory bounded regardless of result-set size.
3152// Consumer disconnect (Receiver dropped) causes the next send to
3153// fail, the producer exits, drops the prepared statement, and
3154// releases the SqlConn lock — so closing a cursor is just "stop
3155// calling next and let the receiver go out of scope."
3156
/// Rows buffered between a cursor's producer thread and its consumer; the
/// producer blocks once this many are pending.
const CURSOR_CHANNEL_CAPACITY: usize = 64;
/// Upper bound on live streaming cursors before LRU eviction.
const MAX_CURSOR_HANDLES: usize = 256;
3159
3160type CursorReceiver = std::sync::mpsc::Receiver<Result<Value, String>>;
3161
3162pub(crate) struct CursorRegistry {
3163    /// Each cursor's receiver lives behind its own Mutex so multiple
3164    /// `sql.cursor_next` calls on the same cursor serialize correctly.
3165    /// The outer `Arc` lets the global registry lock be released
3166    /// before blocking on `recv()`.
3167    entries: indexmap::IndexMap<u64, Arc<Mutex<CursorReceiver>>>,
3168    cap: usize,
3169}
3170
3171impl CursorRegistry {
3172    pub(crate) fn with_capacity(cap: usize) -> Self {
3173        Self { entries: indexmap::IndexMap::new(), cap }
3174    }
3175
3176    pub(crate) fn insert(&mut self, handle: u64, rx: CursorReceiver) {
3177        if self.entries.len() >= self.cap {
3178            self.entries.shift_remove_index(0);
3179        }
3180        self.entries.insert(handle, Arc::new(Mutex::new(rx)));
3181    }
3182
3183    pub(crate) fn touch_get(&mut self, handle: u64) -> Option<Arc<Mutex<CursorReceiver>>> {
3184        let idx = self.entries.get_index_of(&handle)?;
3185        self.entries.move_index(idx, self.entries.len() - 1);
3186        self.entries.get(&handle).cloned()
3187    }
3188
3189    pub(crate) fn remove(&mut self, handle: u64) {
3190        self.entries.shift_remove(&handle);
3191    }
3192}
3193
3194fn cursor_registry() -> &'static Mutex<CursorRegistry> {
3195    static REGISTRY: OnceLock<Mutex<CursorRegistry>> = OnceLock::new();
3196    REGISTRY.get_or_init(|| Mutex::new(CursorRegistry::with_capacity(MAX_CURSOR_HANDLES)))
3197}
3198
/// Allocate a fresh cursor handle.
///
/// Handles start at 1 and increase with each call. The Postgres producer
/// also uses them as unique cursor-name suffixes.
fn next_cursor_handle() -> u64 {
    static NEXT: AtomicU64 = AtomicU64::new(1);
    NEXT.fetch_add(1, Ordering::SeqCst)
}
3203
3204/// SQLite cursor producer: locks the conn, prepares the statement,
3205/// walks rows, ships each to the consumer through `sender`. Exits on
3206/// row exhaustion, consumer disconnect, or first error. The lock is
3207/// released when the thread function returns (statement dropped first
3208/// to satisfy rusqlite's borrow).
3209fn sqlite_cursor_producer(
3210    conn_arc: Arc<Mutex<SqlConn>>,
3211    stmt_str: String,
3212    params: Vec<SqlParamValue>,
3213    sender: std::sync::mpsc::SyncSender<Result<Value, String>>,
3214) {
3215    let mut conn_guard = match conn_arc.lock() {
3216        Ok(g) => g,
3217        Err(p) => p.into_inner(),
3218    };
3219    let SqlConn::Sqlite(c) = &mut *conn_guard else {
3220        let _ = sender.send(Err("sqlite_cursor_producer called on non-sqlite conn".into()));
3221        return;
3222    };
3223    let mut stmt = match c.prepare(&stmt_str) {
3224        Ok(s) => s,
3225        Err(e) => { let _ = sender.send(Err(format!("prepare: {e}"))); return; }
3226    };
3227    let column_count = stmt.column_count();
3228    let column_names: Vec<String> = (0..column_count)
3229        .map(|i| stmt.column_name(i).unwrap_or("").to_string())
3230        .collect();
3231    let bound = sqlite_params(&params);
3232    let bind: Vec<&dyn rusqlite::ToSql> =
3233        bound.iter().map(|p| p as &dyn rusqlite::ToSql).collect();
3234    let mut rows = match stmt.query(rusqlite::params_from_iter(bind.iter())) {
3235        Ok(r) => r,
3236        Err(e) => { let _ = sender.send(Err(format!("query: {e}"))); return; }
3237    };
3238    loop {
3239        match rows.next() {
3240            Ok(None) => break,
3241            Err(e) => {
3242                let _ = sender.send(Err(format!("row: {e}")));
3243                break;
3244            }
3245            Ok(Some(row)) => {
3246                let mut rec = indexmap::IndexMap::new();
3247                for (i, name) in column_names.iter().enumerate() {
3248                    let val = match row.get_ref(i) {
3249                        Ok(vr) => sql_value_ref_to_lex(vr),
3250                        Err(_) => Value::Unit,
3251                    };
3252                    rec.insert(name.clone(), val);
3253                }
3254                if sender.send(Ok(Value::Record(rec))).is_err() {
3255                    break;
3256                }
3257            }
3258        }
3259    }
3260}
3261
/// Postgres cursor producer, run on its own thread.
///
/// Opens a transaction, declares a uniquely-named server-side cursor over
/// `stmt_str` (with `params` bound to its placeholders), and fetches rows
/// in batches of 64. Each row is sent through `sender` as `Ok(Record)`.
/// Errors from begin/declare/fetch are sent once as `Err(msg)`.
///
/// On normal exit, or when the consumer disconnects, the cursor is closed
/// and the transaction committed. On an early return after begin, `tx` is
/// dropped uncommitted (the `postgres` crate rolls back on drop).
///
/// The connection lock is held until the function returns, so other ops
/// on the same `Db` handle block while the cursor is live.
fn pg_cursor_producer(
    conn_arc: Arc<Mutex<SqlConn>>,
    stmt_str: String,
    params: Vec<SqlParamValue>,
    sender: std::sync::mpsc::SyncSender<Result<Value, String>>,
) {
    // Recover from a poisoned lock: the connection itself is still usable.
    let mut conn_guard = match conn_arc.lock() {
        Ok(g) => g,
        Err(p) => p.into_inner(),
    };
    let SqlConn::Postgres(c) = &mut *conn_guard else {
        let _ = sender.send(Err("pg_cursor_producer called on non-postgres conn".into()));
        return;
    };
    let pg = pg_param_refs(&params);
    let refs: Vec<&(dyn postgres::types::ToSql + Sync)> =
        pg.iter().map(|b| b.as_ref()).collect();
    // A server-side cursor only lives inside a transaction block.
    let mut tx = match c.transaction() {
        Ok(t) => t,
        Err(e) => { let _ = sender.send(Err(format!("begin: {e}"))); return; }
    };
    // Name the cursor with a fresh cursor-handle number so names never
    // repeat within this process. NOTE(review): Postgres cursor names are
    // session-local, so distinct connections likely cannot collide anyway.
    let cur_name = format!("__lex_cur_{}", next_cursor_handle());
    // `stmt_str` is embedded verbatim; `refs` bind to its `$n` placeholders.
    if let Err(e) = tx.execute(
        &format!("DECLARE \"{cur_name}\" NO SCROLL CURSOR FOR {stmt_str}"),
        &refs,
    ) {
        let _ = sender.send(Err(format!("declare: {e}")));
        return;
    }
    // Batch size 64 matches CURSOR_CHANNEL_CAPACITY numerically, but the
    // two values are not tied together in code.
    let fetch_sql = format!("FETCH 64 FROM \"{cur_name}\"");
    'outer: loop {
        let batch = match tx.query(&fetch_sql, &[]) {
            Ok(r) => r,
            Err(e) => { let _ = sender.send(Err(format!("fetch: {e}"))); break; }
        };
        // An empty batch means the cursor is exhausted.
        if batch.is_empty() {
            break;
        }
        for row in batch.iter() {
            let rec = pg_row_to_lex_record(row);
            // A failed send means the consumer dropped its receiver: stop fetching.
            if sender.send(Ok(Value::Record(rec))).is_err() {
                break 'outer;
            }
        }
    }
    // Best-effort cleanup. NOTE(review): after a fetch error the transaction
    // is likely aborted server-side, in which case both calls fail and are ignored.
    let _ = tx.execute(&format!("CLOSE \"{cur_name}\""), &[]);
    let _ = tx.commit();
}
3315
/// One bound SQL parameter, independent of the backing driver.
///
/// Built from Lex `SqlParam` values and lowered to SQLite or Postgres
/// bindings at execution time.
#[derive(Debug, Clone)]
enum SqlParamValue {
    /// `PStr` (or a bare `Str`).
    Text(String),
    /// `PInt`.
    Integer(i64),
    /// `PFloat`.
    Real(f64),
    /// `PBool`.
    Bool(bool),
    /// `PNull`.
    Null,
}
3325
3326/// Abstraction over a SQLite connection or a Postgres client.
3327pub(crate) enum SqlConn {
3328    Sqlite(rusqlite::Connection),
3329    Postgres(postgres::Client),
3330}
3331
3332type SharedConn = Arc<Mutex<SqlConn>>;
3333
3334pub(crate) struct SqlRegistry {
3335    entries: indexmap::IndexMap<u64, SharedConn>,
3336    cap: usize,
3337}
3338
3339impl SqlRegistry {
3340    pub(crate) fn with_capacity(cap: usize) -> Self {
3341        Self { entries: indexmap::IndexMap::new(), cap }
3342    }
3343
3344    pub(crate) fn insert(&mut self, handle: u64, conn: SqlConn) {
3345        if self.entries.len() >= self.cap {
3346            self.entries.shift_remove_index(0);
3347        }
3348        self.entries.insert(handle, Arc::new(Mutex::new(conn)));
3349    }
3350
3351    /// Look up a handle, marking it MRU on hit. Returns a clone of
3352    /// the shared `Arc` so callers release the global registry
3353    /// lock before locking the per-handle mutex.
3354    pub(crate) fn touch_get(&mut self, handle: u64) -> Option<SharedConn> {
3355        let idx = self.entries.get_index_of(&handle)?;
3356        self.entries.move_index(idx, self.entries.len() - 1);
3357        self.entries.get(&handle).cloned()
3358    }
3359
3360    pub(crate) fn remove(&mut self, handle: u64) {
3361        self.entries.shift_remove(&handle);
3362    }
3363
3364    #[cfg(test)]
3365    pub(crate) fn len(&self) -> usize { self.entries.len() }
3366}
3367
/// Allocate a fresh `Db` handle.
///
/// Handles start at 1 and increase with each call.
fn next_sql_handle() -> u64 {
    static NEXT: AtomicU64 = AtomicU64::new(1);
    NEXT.fetch_add(1, Ordering::SeqCst)
}
3372
#[cfg(test)]
mod sql_registry_tests {
    use super::{SqlConn, SqlRegistry};

    /// An isolated in-memory SQLite connection to use as registry payload.
    fn fresh() -> SqlConn {
        let conn = rusqlite::Connection::open_in_memory().expect("open in-memory sqlite");
        SqlConn::Sqlite(conn)
    }

    /// Registry of capacity `cap` pre-loaded with `handles`, in order.
    fn registry_with(cap: usize, handles: &[u64]) -> SqlRegistry {
        let mut reg = SqlRegistry::with_capacity(cap);
        for &h in handles {
            reg.insert(h, fresh());
        }
        reg
    }

    #[test]
    fn insert_and_get_round_trip() {
        let mut reg = registry_with(4, &[1]);
        assert!(reg.touch_get(1).is_some());
        assert!(reg.touch_get(2).is_none());
    }

    #[test]
    fn cap_evicts_lru_on_overflow() {
        let mut reg = registry_with(2, &[1, 2]);
        // Touching 1 leaves 2 as the least recently used.
        let _ = reg.touch_get(1);
        reg.insert(3, fresh());
        assert!(reg.touch_get(1).is_some(), "1 was MRU, should survive");
        assert!(reg.touch_get(2).is_none(), "2 was LRU, should be evicted");
        assert!(reg.touch_get(3).is_some(), "3 just inserted");
        assert_eq!(reg.len(), 2);
    }

    #[test]
    fn remove_drops_entry() {
        let mut reg = registry_with(4, &[1]);
        reg.remove(1);
        assert!(reg.touch_get(1).is_none());
        assert_eq!(reg.len(), 0);
    }

    #[test]
    fn many_inserts_stay_bounded_at_cap() {
        let cap = 8;
        let mut reg = SqlRegistry::with_capacity(cap);
        for handle in 0..(cap as u64 * 3) {
            reg.insert(handle, fresh());
            assert!(reg.len() <= cap);
        }
        assert_eq!(reg.len(), cap);
    }
}
3422
#[cfg(test)]
mod kv_registry_tests {
    use super::KvRegistry;

    /// Spin up an isolated `sled::Db` in a temp dir. Each call gets a
    /// unique path so concurrent tests don't collide on the lockfile.
    /// The database is opened as `temporary`, so sled deletes the directory
    /// when the last handle is dropped. Otherwise every test run would leave
    /// stray directories behind under the system temp dir.
    fn fresh_db(tag: &str) -> sled::Db {
        let dir = std::env::temp_dir().join(format!(
            "lex-kv-reg-{}-{}-{}",
            std::process::id(),
            tag,
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .expect("system clock is before the UNIX epoch")
                .as_nanos()
        ));
        sled::Config::new()
            .path(&dir)
            .temporary(true)
            .open()
            .expect("sled open")
    }

    #[test]
    fn insert_and_get_round_trip() {
        let mut r = KvRegistry::with_capacity(4);
        r.insert(1, fresh_db("a"));
        assert!(r.touch_get(1).is_some());
        assert!(r.touch_get(2).is_none());
    }

    #[test]
    fn cap_evicts_lru_on_overflow() {
        // cap=2: insert 1, 2; touch 1 (now MRU); insert 3 → 2 evicted.
        let mut r = KvRegistry::with_capacity(2);
        r.insert(1, fresh_db("c1"));
        r.insert(2, fresh_db("c2"));
        let _ = r.touch_get(1);
        r.insert(3, fresh_db("c3"));
        assert!(r.touch_get(1).is_some(), "1 was MRU, should survive");
        assert!(r.touch_get(2).is_none(), "2 was LRU, should be evicted");
        assert!(r.touch_get(3).is_some(), "3 just inserted, should survive");
        assert_eq!(r.len(), 2);
    }

    #[test]
    fn cap_with_no_touches_evicts_in_insertion_order() {
        // cap=2: insert 1, 2, 3 with no touches → 1 evicted (FIFO).
        let mut r = KvRegistry::with_capacity(2);
        r.insert(10, fresh_db("f1"));
        r.insert(20, fresh_db("f2"));
        r.insert(30, fresh_db("f3"));
        assert!(r.touch_get(10).is_none());
        assert!(r.touch_get(20).is_some());
        assert!(r.touch_get(30).is_some());
    }

    #[test]
    fn remove_drops_entry() {
        let mut r = KvRegistry::with_capacity(4);
        r.insert(1, fresh_db("r1"));
        r.remove(1);
        assert!(r.touch_get(1).is_none());
        assert_eq!(r.len(), 0);
    }

    #[test]
    fn remove_unknown_handle_is_noop() {
        let mut r = KvRegistry::with_capacity(4);
        r.insert(1, fresh_db("u1"));
        r.remove(999);
        assert!(r.touch_get(1).is_some());
    }

    #[test]
    fn many_inserts_stay_bounded_at_cap() {
        // Exhaust the cap to confirm the registry never grows past it,
        // even under sustained churn.
        let cap = 8;
        let mut r = KvRegistry::with_capacity(cap);
        for i in 0..(cap as u64 * 3) {
            r.insert(i, fresh_db(&format!("b{i}")));
            assert!(r.len() <= cap);
        }
        assert_eq!(r.len(), cap);
    }
}