1use lex_bytecode::vm::{EffectHandler, Vm};
8use lex_bytecode::{Program, Value};
9use std::path::PathBuf;
10use std::sync::atomic::{AtomicU64, Ordering};
11use std::sync::{Mutex, OnceLock};
12use std::sync::Arc;
13use std::time::{SystemTime, UNIX_EPOCH};
14
15use crate::builtins::try_pure_builtin;
16use crate::policy::Policy;
17
18pub trait IoSink: Send {
21 fn print_line(&mut self, s: &str);
22}
23
24pub struct StdoutSink;
25impl IoSink for StdoutSink {
26 fn print_line(&mut self, s: &str) {
27 println!("{s}");
28 }
29}
30
31#[derive(Default)]
32pub struct CapturedSink { pub lines: Vec<String> }
33impl IoSink for CapturedSink {
34 fn print_line(&mut self, s: &str) { self.lines.push(s.to_string()); }
35}
36
37pub type StreamRegistry =
40 std::collections::HashMap<String, Box<dyn Iterator<Item = String> + Send>>;
41
42pub struct DefaultHandler {
43 policy: Policy,
44 pub sink: Box<dyn IoSink>,
45 pub read_root: Option<PathBuf>,
48 pub budget_remaining: Arc<AtomicU64>,
55 pub budget_ceiling: Option<u64>,
59 pub program: Option<Arc<Program>>,
63 pub chat_registry: Option<Arc<crate::ws::ChatRegistry>>,
67 pub mcp_clients: crate::mcp_client::McpClientCache,
73 pub streams: Arc<std::sync::Mutex<StreamRegistry>>,
80 pub next_stream_id: Arc<std::sync::atomic::AtomicU64>,
82}
83
84impl DefaultHandler {
85 pub fn new(policy: Policy) -> Self {
86 let ceiling = policy.budget;
90 let initial = ceiling.unwrap_or(u64::MAX);
91 Self {
92 policy,
93 sink: Box::new(StdoutSink),
94 read_root: None,
95 budget_remaining: Arc::new(AtomicU64::new(initial)),
96 budget_ceiling: ceiling,
97 program: None,
98 chat_registry: None,
99 mcp_clients: crate::mcp_client::McpClientCache::with_capacity(16),
100 streams: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
101 next_stream_id: Arc::new(std::sync::atomic::AtomicU64::new(0)),
102 }
103 }
104
105 pub fn with_program(mut self, program: Arc<Program>) -> Self {
106 self.program = Some(program); self
107 }
108
109 pub fn with_chat_registry(mut self, registry: Arc<crate::ws::ChatRegistry>) -> Self {
110 self.chat_registry = Some(registry); self
111 }
112
113 pub fn with_sink(mut self, sink: Box<dyn IoSink>) -> Self {
114 self.sink = sink; self
115 }
116
117 pub fn with_read_root(mut self, root: PathBuf) -> Self {
118 self.read_root = Some(root); self
119 }
120
121 fn ensure_kind_allowed(&self, kind: &str) -> Result<(), String> {
122 if self.policy.allow_effects.contains(kind) {
123 Ok(())
124 } else {
125 Err(format!("effect `{kind}` not in --allow-effects"))
126 }
127 }
128
129 fn resolve_read_path(&self, p: &str) -> PathBuf {
130 match &self.read_root {
131 Some(root) => root.join(p.trim_start_matches('/')),
132 None => PathBuf::from(p),
133 }
134 }
135
136 fn dispatch_log(&mut self, op: &str, args: Vec<Value>) -> Result<Value, String> {
137 match op {
138 "debug" | "info" | "warn" | "error" => {
139 let msg = expect_str(args.first())?;
140 let level = match op {
141 "debug" => LogLevel::Debug,
142 "info" => LogLevel::Info,
143 "warn" => LogLevel::Warn,
144 _ => LogLevel::Error,
145 };
146 emit_log(level, msg);
147 Ok(Value::Unit)
148 }
149 "set_level" => {
150 let s = expect_str(args.first())?;
151 match parse_log_level(s) {
152 Some(l) => {
153 log_state().lock().unwrap().level = l;
154 Ok(ok(Value::Unit))
155 }
156 None => Ok(err(Value::Str(format!(
157 "log.set_level: unknown level `{s}`; expected debug|info|warn|error")))),
158 }
159 }
160 "set_format" => {
161 let s = expect_str(args.first())?;
162 let fmt = match s {
163 "text" => LogFormat::Text,
164 "json" => LogFormat::Json,
165 other => return Ok(err(Value::Str(format!(
166 "log.set_format: unknown format `{other}`; expected text|json")))),
167 };
168 log_state().lock().unwrap().format = fmt;
169 Ok(ok(Value::Unit))
170 }
171 "set_sink" => {
172 let path = expect_str(args.first())?;
173 if path == "-" {
174 log_state().lock().unwrap().sink = LogSink::Stderr;
175 return Ok(ok(Value::Unit));
176 }
177 if let Err(e) = self.ensure_fs_write_path(path) {
178 return Ok(err(Value::Str(e)));
179 }
180 match std::fs::OpenOptions::new()
181 .create(true).append(true).open(path)
182 {
183 Ok(f) => {
184 log_state().lock().unwrap().sink = LogSink::File(std::sync::Arc::new(Mutex::new(f)));
185 Ok(ok(Value::Unit))
186 }
187 Err(e) => Ok(err(Value::Str(format!("log.set_sink `{path}`: {e}")))),
188 }
189 }
190 other => Err(format!("unsupported log.{other}")),
191 }
192 }
193
194 fn dispatch_process(&mut self, op: &str, args: Vec<Value>) -> Result<Value, String> {
195 match op {
196 "spawn" => {
197 let cmd = expect_str(args.first())?.to_string();
198 let raw_args = match args.get(1) {
199 Some(Value::List(items)) => items.clone(),
200 _ => return Err("process.spawn: args must be List[Str]".into()),
201 };
202 let str_args: Result<Vec<String>, String> = raw_args.iter().map(|v| match v {
203 Value::Str(s) => Ok(s.clone()),
204 other => Err(format!("process.spawn: arg must be Str, got {other:?}")),
205 }).collect();
206 let str_args = str_args?;
207 let opts = match args.get(2) {
208 Some(Value::Record(r)) => r.clone(),
209 _ => return Err("process.spawn: missing or invalid opts record".into()),
210 };
211
212 if !self.policy.allow_proc.is_empty() {
214 let basename = std::path::Path::new(&cmd)
215 .file_name()
216 .and_then(|s| s.to_str())
217 .unwrap_or(&cmd);
218 if !self.policy.allow_proc.iter().any(|a| a == basename) {
219 return Ok(err(Value::Str(format!(
220 "process.spawn: `{cmd}` not in --allow-proc {:?}",
221 self.policy.allow_proc
222 ))));
223 }
224 }
225
226 let mut command = std::process::Command::new(&cmd);
227 command.args(&str_args);
228 command.stdin(std::process::Stdio::piped());
229 command.stdout(std::process::Stdio::piped());
230 command.stderr(std::process::Stdio::piped());
231
232 if let Some(Value::Variant { name, args: vargs }) = opts.get("cwd") {
233 if name == "Some" {
234 if let Some(Value::Str(s)) = vargs.first() {
235 command.current_dir(s);
236 }
237 }
238 }
239 if let Some(Value::Map(env)) = opts.get("env") {
240 for (k, v) in env {
241 if let (lex_bytecode::MapKey::Str(ks), Value::Str(vs)) = (k, v) {
242 command.env(ks, vs);
243 }
244 }
245 }
246
247 let stdin_payload: Option<Vec<u8>> = match opts.get("stdin") {
248 Some(Value::Variant { name, args: vargs }) if name == "Some" => {
249 match vargs.first() {
250 Some(Value::Bytes(b)) => Some(b.clone()),
251 _ => None,
252 }
253 }
254 _ => None,
255 };
256
257 let mut child = match command.spawn() {
258 Ok(c) => c,
259 Err(e) => return Ok(err(Value::Str(format!("process.spawn `{cmd}`: {e}")))),
260 };
261
262 if let Some(payload) = stdin_payload {
263 if let Some(mut stdin) = child.stdin.take() {
264 use std::io::Write;
265 let _ = stdin.write_all(&payload);
266 }
268 }
269
270 let stdout = child.stdout.take().map(std::io::BufReader::new);
271 let stderr = child.stderr.take().map(std::io::BufReader::new);
272 let handle = next_process_handle();
273 process_registry().lock().unwrap().insert(handle, ProcessState {
274 child,
275 stdout,
276 stderr,
277 });
278 Ok(ok(Value::Int(handle as i64)))
279 }
280 "read_stdout_line" => Self::read_line_op(args, true),
281 "read_stderr_line" => Self::read_line_op(args, false),
282 "wait" => {
283 let h = expect_process_handle(args.first())?;
284 let arc = process_registry().lock().unwrap()
288 .touch_get(h)
289 .ok_or_else(|| "process.wait: closed or unknown ProcessHandle".to_string())?;
290 let status = {
291 let mut state = arc.lock().unwrap();
292 state.child.wait().map_err(|e| format!("process.wait: {e}"))?
293 };
294 process_registry().lock().unwrap().remove(h);
298 let mut rec = indexmap::IndexMap::new();
299 rec.insert("code".into(), Value::Int(status.code().unwrap_or(-1) as i64));
300 #[cfg(unix)]
301 {
302 use std::os::unix::process::ExitStatusExt;
303 rec.insert("signaled".into(), Value::Bool(status.signal().is_some()));
304 }
305 #[cfg(not(unix))]
306 {
307 rec.insert("signaled".into(), Value::Bool(false));
308 }
309 Ok(Value::Record(rec))
310 }
311 "kill" => {
312 let h = expect_process_handle(args.first())?;
313 let _signal = expect_str(args.get(1))?;
314 let arc = process_registry().lock().unwrap()
315 .touch_get(h)
316 .ok_or_else(|| "process.kill: closed or unknown ProcessHandle".to_string())?;
317 let mut state = arc.lock().unwrap();
318 match state.child.kill() {
321 Ok(_) => Ok(ok(Value::Unit)),
322 Err(e) => Ok(err(Value::Str(format!("process.kill: {e}")))),
323 }
324 }
325 "run" => {
326 let cmd = expect_str(args.first())?.to_string();
327 let raw_args = match args.get(1) {
328 Some(Value::List(items)) => items.clone(),
329 _ => return Err("process.run: args must be List[Str]".into()),
330 };
331 let str_args: Result<Vec<String>, String> = raw_args.iter().map(|v| match v {
332 Value::Str(s) => Ok(s.clone()),
333 other => Err(format!("process.run: arg must be Str, got {other:?}")),
334 }).collect();
335 let str_args = str_args?;
336 if !self.policy.allow_proc.is_empty() {
337 let basename = std::path::Path::new(&cmd)
338 .file_name()
339 .and_then(|s| s.to_str())
340 .unwrap_or(&cmd);
341 if !self.policy.allow_proc.iter().any(|a| a == basename) {
342 return Ok(err(Value::Str(format!(
343 "process.run: `{cmd}` not in --allow-proc {:?}",
344 self.policy.allow_proc
345 ))));
346 }
347 }
348 match std::process::Command::new(&cmd).args(&str_args).output() {
349 Ok(o) => {
350 let mut rec = indexmap::IndexMap::new();
351 rec.insert("stdout".into(), Value::Str(
352 String::from_utf8_lossy(&o.stdout).to_string()));
353 rec.insert("stderr".into(), Value::Str(
354 String::from_utf8_lossy(&o.stderr).to_string()));
355 rec.insert("exit_code".into(), Value::Int(
356 o.status.code().unwrap_or(-1) as i64));
357 Ok(ok(Value::Record(rec)))
358 }
359 Err(e) => Ok(err(Value::Str(format!("process.run `{cmd}`: {e}")))),
360 }
361 }
362 other => Err(format!("unsupported process.{other}")),
363 }
364 }
365
366 fn read_line_op(args: Vec<Value>, is_stdout: bool) -> Result<Value, String> {
372 let h = expect_process_handle(args.first())?;
373 let arc = process_registry().lock().unwrap()
374 .touch_get(h)
375 .ok_or_else(|| format!(
376 "process.read_{}_line: closed or unknown ProcessHandle",
377 if is_stdout { "stdout" } else { "stderr" }))?;
378 let mut state = arc.lock().unwrap();
379 let reader_opt = if is_stdout {
380 state.stdout.as_mut().map(|r| -> &mut dyn std::io::BufRead { r })
381 } else {
382 state.stderr.as_mut().map(|r| -> &mut dyn std::io::BufRead { r })
383 };
384 let reader = match reader_opt {
385 Some(r) => r,
386 None => return Ok(none()),
387 };
388 let mut line = String::new();
389 match reader.read_line(&mut line) {
390 Ok(0) => Ok(none()),
391 Ok(_) => {
392 if line.ends_with('\n') { line.pop(); }
393 if line.ends_with('\r') { line.pop(); }
394 Ok(some(Value::Str(line)))
395 }
396 Err(e) => Err(format!("process.read_*_line: {e}")),
397 }
398 }
399
400 fn dispatch_fs(&mut self, op: &str, args: Vec<Value>) -> Result<Value, String> {
401 match op {
402 "exists" => {
403 let path = expect_str(args.first())?.to_string();
404 if let Err(e) = self.ensure_fs_walk_path(&path) {
405 return Ok(err(Value::Str(e)));
406 }
407 Ok(Value::Bool(std::path::Path::new(&path).exists()))
408 }
409 "is_file" => {
410 let path = expect_str(args.first())?.to_string();
411 if let Err(e) = self.ensure_fs_walk_path(&path) {
412 return Ok(err(Value::Str(e)));
413 }
414 Ok(Value::Bool(std::path::Path::new(&path).is_file()))
415 }
416 "is_dir" => {
417 let path = expect_str(args.first())?.to_string();
418 if let Err(e) = self.ensure_fs_walk_path(&path) {
419 return Ok(err(Value::Str(e)));
420 }
421 Ok(Value::Bool(std::path::Path::new(&path).is_dir()))
422 }
423 "stat" => {
424 let path = expect_str(args.first())?.to_string();
425 if let Err(e) = self.ensure_fs_walk_path(&path) {
426 return Ok(err(Value::Str(e)));
427 }
428 match std::fs::metadata(&path) {
429 Ok(md) => {
430 let mtime = md.modified()
431 .ok()
432 .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
433 .map(|d| d.as_secs() as i64)
434 .unwrap_or(0);
435 let mut rec = indexmap::IndexMap::new();
436 rec.insert("size".into(), Value::Int(md.len() as i64));
437 rec.insert("mtime".into(), Value::Int(mtime));
438 rec.insert("is_dir".into(), Value::Bool(md.is_dir()));
439 rec.insert("is_file".into(), Value::Bool(md.is_file()));
440 Ok(ok(Value::Record(rec)))
441 }
442 Err(e) => Ok(err(Value::Str(format!("fs.stat `{path}`: {e}")))),
443 }
444 }
445 "list_dir" => {
446 let path = expect_str(args.first())?.to_string();
447 if let Err(e) = self.ensure_fs_walk_path(&path) {
448 return Ok(err(Value::Str(e)));
449 }
450 match std::fs::read_dir(&path) {
451 Ok(rd) => {
452 let mut entries: Vec<Value> = Vec::new();
453 for ent in rd {
454 match ent {
455 Ok(e) => {
456 let p = e.path();
457 entries.push(Value::Str(p.to_string_lossy().into_owned()));
458 }
459 Err(e) => return Ok(err(Value::Str(format!("fs.list_dir: {e}")))),
460 }
461 }
462 Ok(ok(Value::List(entries)))
463 }
464 Err(e) => Ok(err(Value::Str(format!("fs.list_dir `{path}`: {e}")))),
465 }
466 }
467 "walk" => {
468 let path = expect_str(args.first())?.to_string();
469 if let Err(e) = self.ensure_fs_walk_path(&path) {
470 return Ok(err(Value::Str(e)));
471 }
472 let mut paths: Vec<Value> = Vec::new();
473 for ent in walkdir::WalkDir::new(&path) {
474 match ent {
475 Ok(e) => paths.push(Value::Str(
476 e.path().to_string_lossy().into_owned())),
477 Err(e) => return Ok(err(Value::Str(format!("fs.walk: {e}")))),
478 }
479 }
480 Ok(ok(Value::List(paths)))
481 }
482 "glob" => {
483 let pattern = expect_str(args.first())?.to_string();
484 let entries = match glob::glob(&pattern) {
489 Ok(e) => e,
490 Err(e) => return Ok(err(Value::Str(format!("fs.glob: {e}")))),
491 };
492 let mut paths: Vec<Value> = Vec::new();
493 for ent in entries {
494 match ent {
495 Ok(p) => {
496 let s = p.to_string_lossy().into_owned();
497 if self.policy.allow_fs_read.is_empty()
498 || self.policy.allow_fs_read.iter().any(|root| p.starts_with(root))
499 {
500 paths.push(Value::Str(s));
501 }
502 }
503 Err(e) => return Ok(err(Value::Str(format!("fs.glob: {e}")))),
504 }
505 }
506 Ok(ok(Value::List(paths)))
507 }
508 "mkdir_p" => {
509 let path = expect_str(args.first())?.to_string();
510 if let Err(e) = self.ensure_fs_write_path(&path) {
511 return Ok(err(Value::Str(e)));
512 }
513 match std::fs::create_dir_all(&path) {
514 Ok(_) => Ok(ok(Value::Unit)),
515 Err(e) => Ok(err(Value::Str(format!("fs.mkdir_p `{path}`: {e}")))),
516 }
517 }
518 "remove" => {
519 let path = expect_str(args.first())?.to_string();
520 if let Err(e) = self.ensure_fs_write_path(&path) {
521 return Ok(err(Value::Str(e)));
522 }
523 let p = std::path::Path::new(&path);
524 let result = if p.is_dir() {
525 std::fs::remove_dir_all(p)
526 } else {
527 std::fs::remove_file(p)
528 };
529 match result {
530 Ok(_) => Ok(ok(Value::Unit)),
531 Err(e) => Ok(err(Value::Str(format!("fs.remove `{path}`: {e}")))),
532 }
533 }
534 "copy" => {
535 let src = expect_str(args.first())?.to_string();
536 let dst = expect_str(args.get(1))?.to_string();
537 if let Err(e) = self.ensure_fs_walk_path(&src) {
538 return Ok(err(Value::Str(e)));
539 }
540 if let Err(e) = self.ensure_fs_write_path(&dst) {
541 return Ok(err(Value::Str(e)));
542 }
543 match std::fs::copy(&src, &dst) {
544 Ok(_) => Ok(ok(Value::Unit)),
545 Err(e) => Ok(err(Value::Str(format!("fs.copy {src} -> {dst}: {e}")))),
546 }
547 }
548 other => Err(format!("unsupported fs.{other}")),
549 }
550 }
551
552 fn ensure_fs_walk_path(&self, path: &str) -> Result<(), String> {
557 if self.policy.allow_fs_read.is_empty() {
558 return Ok(());
559 }
560 let p = std::path::Path::new(path);
561 if self.policy.allow_fs_read.iter().any(|a| p.starts_with(a)) {
562 Ok(())
563 } else {
564 Err(format!("fs path `{path}` outside --allow-fs-read"))
565 }
566 }
567
568 fn ensure_fs_write_path(&self, path: &str) -> Result<(), String> {
571 if self.policy.allow_fs_write.is_empty() {
572 return Ok(());
573 }
574 let p = std::path::Path::new(path);
575 if self.policy.allow_fs_write.iter().any(|a| p.starts_with(a)) {
576 Ok(())
577 } else {
578 Err(format!("fs path `{path}` outside --allow-fs-write"))
579 }
580 }
581
582 fn ensure_host_allowed(&self, url: &str) -> Result<(), String> {
586 if self.policy.allow_net_host.is_empty() { return Ok(()); }
587 let host = extract_host(url).unwrap_or("");
588 if self.policy.allow_net_host.iter().any(|h| host == h) {
589 Ok(())
590 } else {
591 Err(format!(
592 "net call to host `{host}` not in --allow-net-host {:?}",
593 self.policy.allow_net_host,
594 ))
595 }
596 }
597}
598
599fn extract_host(url: &str) -> Option<&str> {
600 let rest = url.strip_prefix("http://").or_else(|| url.strip_prefix("https://"))?;
601 let host_port = match rest.find('/') {
602 Some(i) => &rest[..i],
603 None => rest,
604 };
605 Some(match host_port.rsplit_once(':') {
606 Some((h, _)) => h,
607 None => host_port,
608 })
609}
610
611impl EffectHandler for DefaultHandler {
612 fn note_call_budget(&mut self, cost: u64) -> Result<(), String> {
617 let Some(ceiling) = self.budget_ceiling else { return Ok(()); };
620 loop {
626 let cur = self.budget_remaining.load(Ordering::SeqCst);
627 if cost > cur {
628 let used = ceiling.saturating_sub(cur);
629 return Err(format!(
630 "budget exceeded: requested {cost}, used so far {used}, ceiling {ceiling}"));
631 }
632 let next = cur - cost;
633 if self.budget_remaining.compare_exchange(cur, next,
636 Ordering::SeqCst, Ordering::SeqCst).is_ok() {
637 return Ok(());
638 }
639 }
640 }
641
642 fn dispatch(&mut self, kind: &str, op: &str, args: Vec<Value>) -> Result<Value, String> {
643 if let Some(r) = try_pure_builtin(kind, op, &args) {
647 return r;
648 }
649 if kind == "process" {
653 self.ensure_kind_allowed("proc")?;
654 return self.dispatch_process(op, args);
655 }
656 if kind == "log" {
657 let effect_kind = match op {
660 "debug" | "info" | "warn" | "error" => "log",
661 "set_level" | "set_format" => "io",
662 "set_sink" => {
663 self.ensure_kind_allowed("io")?;
664 self.ensure_kind_allowed("fs_write")?;
665 return self.dispatch_log(op, args);
666 }
667 other => return Err(format!("unsupported log.{other}")),
668 };
669 self.ensure_kind_allowed(effect_kind)?;
670 return self.dispatch_log(op, args);
671 }
672 if kind == "fs" {
673 let effect_kind = match op {
674 "exists" | "is_file" | "is_dir" | "stat"
675 | "list_dir" | "walk" | "glob" => "fs_walk",
676 "mkdir_p" | "remove" => "fs_write",
677 "copy" => {
678 self.ensure_kind_allowed("fs_walk")?;
679 self.ensure_kind_allowed("fs_write")?;
680 return self.dispatch_fs(op, args);
681 }
682 other => return Err(format!("unsupported fs.{other}")),
683 };
684 self.ensure_kind_allowed(effect_kind)?;
685 return self.dispatch_fs(op, args);
686 }
687 if kind == "datetime" && op == "now" {
694 self.ensure_kind_allowed("time")?;
695 if let Ok(s) = std::env::var("LEX_TEST_NOW") {
697 if let Ok(secs) = s.trim().parse::<i64>() {
698 return Ok(Value::Int(secs.saturating_mul(1_000_000_000)));
699 }
700 }
701 let now = chrono::Utc::now();
702 let nanos = now.timestamp_nanos_opt().unwrap_or(i64::MAX);
703 return Ok(Value::Int(nanos));
704 }
705 if kind == "crypto" && op == "random" {
706 self.ensure_kind_allowed("random")?;
707 let n = expect_int(args.first())?;
708 if !(0..=1_048_576).contains(&n) {
709 return Err("crypto.random: n must be in 0..=1048576".into());
710 }
711 use rand::{rngs::SysRng, TryRng};
712 let mut buf = vec![0u8; n as usize];
713 SysRng.try_fill_bytes(&mut buf)
714 .map_err(|e| format!("crypto.random: OS RNG: {e}"))?;
715 return Ok(Value::Bytes(buf));
716 }
717 if kind == "crypto" && op == "random_str_hex" {
722 self.ensure_kind_allowed("random")?;
723 let n = expect_int(args.first())?;
724 if !(0..=1_048_576).contains(&n) {
725 return Err("crypto.random_str_hex: n must be in 0..=1048576".into());
726 }
727 use rand::{rngs::SysRng, TryRng};
728 let mut buf = vec![0u8; n as usize];
729 SysRng.try_fill_bytes(&mut buf)
730 .map_err(|e| format!("crypto.random_str_hex: OS RNG: {e}"))?;
731 return Ok(Value::Str(hex::encode(&buf)));
732 }
733 if kind == "agent" {
746 let effect_kind = match op {
747 "local_complete" => "llm_local",
748 "cloud_complete" => "llm_cloud",
749 "cloud_stream" => "llm_cloud",
750 "send_a2a" => "a2a",
751 "call_mcp" => "mcp",
752 other => return Err(format!("unsupported agent.{other}")),
753 };
754 self.ensure_kind_allowed(effect_kind)?;
755 return match op {
763 "call_mcp" => Ok(self.dispatch_call_mcp(args)),
764 "local_complete" => Ok(dispatch_llm_local(args)),
765 "cloud_complete" => Ok(dispatch_llm_cloud(args)),
766 "cloud_stream" => Ok(self.dispatch_cloud_stream(args)),
767 _ => Ok(ok(Value::Str(format!("<{effect_kind} stub>")))),
768 };
769 }
770 if kind == "stream" {
771 self.ensure_kind_allowed("stream")?;
778 return match op {
779 "next" => Ok(self.dispatch_stream_next(args)),
780 "collect" => Ok(self.dispatch_stream_collect(args)),
781 other => Err(format!("unsupported stream.{other}")),
782 };
783 }
784 if kind == "http" && matches!(op, "send" | "get" | "post") {
785 self.ensure_kind_allowed("net")?;
786 return match op {
787 "send" => {
788 let req = expect_record(args.first())?;
789 Ok(http_send_record(self, req))
790 }
791 "get" => {
792 let url = expect_str(args.first())?.to_string();
793 self.ensure_host_allowed(&url)?;
794 Ok(http_send_simple("GET", &url, None, "", None))
795 }
796 "post" => {
797 let url = expect_str(args.first())?.to_string();
798 let body = expect_bytes(args.get(1))?.clone();
799 let content_type = expect_str(args.get(2))?.to_string();
800 self.ensure_host_allowed(&url)?;
801 Ok(http_send_simple("POST", &url, Some(body), &content_type, None))
802 }
803 _ => unreachable!(),
804 };
805 }
806 self.ensure_kind_allowed(kind)?;
807 match (kind, op) {
808 ("io", "print") => {
809 let line = expect_str(args.first())?;
810 self.sink.print_line(line);
811 Ok(Value::Unit)
812 }
813 ("io", "read") => {
814 let path = expect_str(args.first())?.to_string();
815 let resolved = self.resolve_read_path(&path);
816 if !self.policy.allow_fs_read.is_empty()
823 && !self.policy.allow_fs_read.iter().any(|a| resolved.starts_with(a))
824 {
825 return Err(format!("read of `{path}` outside --allow-fs-read"));
826 }
827 match std::fs::read_to_string(&resolved) {
828 Ok(s) => Ok(ok(Value::Str(s))),
829 Err(e) => Ok(err(Value::Str(format!("{e}")))),
830 }
831 }
832 ("io", "write") => {
833 let path = expect_str(args.first())?.to_string();
834 let contents = expect_str(args.get(1))?.to_string();
835 if !self.policy.allow_fs_write.is_empty() {
837 let p = std::path::Path::new(&path);
838 if !self.policy.allow_fs_write.iter().any(|a| p.starts_with(a)) {
839 return Err(format!("write to `{path}` outside --allow-fs-write"));
840 }
841 }
842 match std::fs::write(&path, contents) {
843 Ok(_) => Ok(ok(Value::Unit)),
844 Err(e) => Ok(err(Value::Str(format!("{e}")))),
845 }
846 }
847 ("time", "now") => {
848 if let Ok(s) = std::env::var("LEX_TEST_NOW") {
850 if let Ok(secs) = s.trim().parse::<i64>() {
851 return Ok(Value::Int(secs));
852 }
853 }
854 let secs = SystemTime::now().duration_since(UNIX_EPOCH)
855 .map_err(|e| format!("time: {e}"))?.as_secs();
856 Ok(Value::Int(secs as i64))
857 }
858 ("time", "now_ms") => {
859 if let Ok(s) = std::env::var("LEX_TEST_NOW") {
864 if let Ok(secs) = s.trim().parse::<i64>() {
865 return Ok(Value::Int(secs.saturating_mul(1000)));
866 }
867 }
868 let ms = SystemTime::now().duration_since(UNIX_EPOCH)
869 .map_err(|e| format!("time: {e}"))?.as_millis();
870 Ok(Value::Int(ms as i64))
871 }
872 ("time", "now_str") => {
873 if let Ok(s) = std::env::var("LEX_TEST_NOW") {
877 if let Ok(secs) = s.trim().parse::<i64>() {
878 let dt = chrono::DateTime::<chrono::Utc>::from_timestamp(secs, 0)
879 .unwrap_or_else(chrono::Utc::now);
880 return Ok(Value::Str(dt.to_rfc3339()));
881 }
882 }
883 Ok(Value::Str(chrono::Utc::now().to_rfc3339()))
884 }
885 ("time", "mono_ns") => {
886 static MONO_START: OnceLock<std::time::Instant> = OnceLock::new();
894 let start = MONO_START.get_or_init(std::time::Instant::now);
895 let dur = std::time::Instant::now().duration_since(*start);
896 Ok(Value::Int(dur.as_nanos() as i64))
897 }
898 ("time", "sleep_ms") => {
899 let n = expect_int(args.first())?;
907 if n > 0 {
908 let ms = (n as u64).min(60_000);
909 std::thread::sleep(std::time::Duration::from_millis(ms));
910 }
911 Ok(Value::Unit)
912 }
913 ("rand", "int_in") => {
914 let lo = expect_int(args.first())?;
916 let hi = expect_int(args.get(1))?;
917 Ok(Value::Int((lo + hi) / 2))
918 }
919 ("env", "get") => {
924 let name = expect_str(args.first())?;
925 Ok(match std::env::var(name) {
926 Ok(v) => Value::Variant {
927 name: "Some".into(),
928 args: vec![Value::Str(v)],
929 },
930 Err(_) => Value::Variant { name: "None".into(), args: Vec::new() },
931 })
932 }
933 ("budget", _) => {
934 Ok(Value::Unit)
937 }
938 ("net", "get") => {
939 let url = expect_str(args.first())?.to_string();
940 self.ensure_host_allowed(&url)?;
941 Ok(http_request("GET", &url, None))
942 }
943 ("net", "post") => {
944 let url = expect_str(args.first())?.to_string();
945 let body = expect_str(args.get(1))?.to_string();
946 self.ensure_host_allowed(&url)?;
947 Ok(http_request("POST", &url, Some(&body)))
948 }
949 ("net", "serve") => {
950 let port = match args.first() {
951 Some(Value::Int(n)) if (0..=65535).contains(n) => *n as u16,
952 _ => return Err("net.serve(port, handler): port must be Int 0..=65535".into()),
953 };
954 let handler_name = expect_str(args.get(1))?.to_string();
955 let program = self.program.clone()
956 .ok_or_else(|| "net.serve requires a Program reference; use DefaultHandler::with_program".to_string())?;
957 let policy = self.policy.clone();
958 serve_http(port, handler_name, program, policy, None)
959 }
960 ("net", "serve_fn") => {
961 let port = match args.first() {
962 Some(Value::Int(n)) if (0..=65535).contains(n) => *n as u16,
963 _ => return Err("net.serve_fn(port, handler): port must be Int 0..=65535".into()),
964 };
965 let closure = match args.into_iter().nth(1) {
966 Some(c @ Value::Closure { .. }) => c,
967 _ => return Err("net.serve_fn(port, handler): handler must be a closure".into()),
968 };
969 let program = self.program.clone()
970 .ok_or_else(|| "net.serve_fn requires a Program reference; use DefaultHandler::with_program".to_string())?;
971 let policy = self.policy.clone();
972 serve_http_fn(port, closure, program, policy)
973 }
974 ("net", "serve_tls") => {
975 let port = match args.first() {
976 Some(Value::Int(n)) if (0..=65535).contains(n) => *n as u16,
977 _ => return Err("net.serve_tls(port, cert, key, handler): port must be Int 0..=65535".into()),
978 };
979 let cert_path = expect_str(args.get(1))?.to_string();
980 let key_path = expect_str(args.get(2))?.to_string();
981 let handler_name = expect_str(args.get(3))?.to_string();
982 let program = self.program.clone()
983 .ok_or_else(|| "net.serve_tls requires a Program reference".to_string())?;
984 let policy = self.policy.clone();
985 let cert = std::fs::read(&cert_path)
986 .map_err(|e| format!("net.serve_tls: read cert {cert_path}: {e}"))?;
987 let key = std::fs::read(&key_path)
988 .map_err(|e| format!("net.serve_tls: read key {key_path}: {e}"))?;
989 serve_http(port, handler_name, program, policy, Some(TlsConfig { cert, key }))
990 }
991 ("net", "serve_ws") => {
992 let port = match args.first() {
993 Some(Value::Int(n)) if (0..=65535).contains(n) => *n as u16,
994 _ => return Err("net.serve_ws(port, on_message): port must be Int 0..=65535".into()),
995 };
996 let handler_name = expect_str(args.get(1))?.to_string();
997 let program = self.program.clone()
998 .ok_or_else(|| "net.serve_ws requires a Program reference".to_string())?;
999 let policy = self.policy.clone();
1000 let registry = Arc::new(crate::ws::ChatRegistry::default());
1001 crate::ws::serve_ws(port, handler_name, program, policy, registry)
1002 }
1003 ("net", "serve_ws_fn") => {
1004 let port = match args.first() {
1005 Some(Value::Int(n)) if (0..=65535).contains(n) => *n as u16,
1006 _ => return Err("net.serve_ws_fn(port, subprotocol, handler): port must be Int 0..=65535".into()),
1007 };
1008 let subprotocol = expect_str(args.get(1))?.to_string();
1009 let closure = match args.into_iter().nth(2) {
1010 Some(c @ Value::Closure { .. }) => c,
1011 _ => return Err("net.serve_ws_fn(port, subprotocol, handler): handler must be a closure".into()),
1012 };
1013 let program = self.program.clone()
1014 .ok_or_else(|| "net.serve_ws_fn requires a Program reference; use DefaultHandler::with_program".to_string())?;
1015 let policy = self.policy.clone();
1016 let registry = Arc::new(crate::ws::ChatRegistry::default());
1017 crate::ws::serve_ws_fn(port, subprotocol, closure, program, policy, registry)
1018 }
1019 ("chat", "broadcast") => {
1020 let registry = self.chat_registry.as_ref()
1021 .ok_or_else(|| "chat.broadcast called outside a net.serve_ws handler".to_string())?;
1022 let room = expect_str(args.first())?;
1023 let body = expect_str(args.get(1))?;
1024 crate::ws::chat_broadcast(registry, room, body);
1025 Ok(Value::Unit)
1026 }
1027 ("chat", "send") => {
1028 let registry = self.chat_registry.as_ref()
1029 .ok_or_else(|| "chat.send called outside a net.serve_ws handler".to_string())?;
1030 let conn_id = match args.first() {
1031 Some(Value::Int(n)) if *n >= 0 => *n as u64,
1032 _ => return Err("chat.send: conn_id must be non-negative Int".into()),
1033 };
1034 let body = expect_str(args.get(1))?;
1035 Ok(Value::Bool(crate::ws::chat_send(registry, conn_id, body)))
1036 }
1037 ("kv", "open") => {
1038 let path = expect_str(args.first())?.to_string();
1039 if !self.policy.allow_fs_write.is_empty() {
1043 let p = std::path::Path::new(&path);
1044 if !self.policy.allow_fs_write.iter().any(|a| p.starts_with(a)) {
1045 return Ok(err(Value::Str(format!(
1046 "kv.open: `{path}` outside --allow-fs-write"))));
1047 }
1048 }
1049 match sled::open(&path) {
1050 Ok(db) => {
1051 let handle = next_kv_handle();
1052 kv_registry().lock().unwrap().insert(handle, db);
1053 Ok(ok(Value::Int(handle as i64)))
1054 }
1055 Err(e) => Ok(err(Value::Str(format!("kv.open: {e}")))),
1056 }
1057 }
1058 ("kv", "close") => {
1059 let h = expect_kv_handle(args.first())?;
1060 kv_registry().lock().unwrap().remove(h);
1061 Ok(Value::Unit)
1062 }
1063 ("kv", "get") => {
1064 let h = expect_kv_handle(args.first())?;
1065 let key = expect_str(args.get(1))?;
1066 let mut reg = kv_registry().lock().unwrap();
1067 let db = reg.touch_get(h).ok_or_else(|| "kv.get: closed or unknown Kv handle".to_string())?;
1068 match db.get(key.as_bytes()) {
1069 Ok(Some(ivec)) => Ok(some(Value::Bytes(ivec.to_vec()))),
1070 Ok(None) => Ok(none()),
1071 Err(e) => Err(format!("kv.get: {e}")),
1072 }
1073 }
1074 ("kv", "put") => {
1075 let h = expect_kv_handle(args.first())?;
1076 let key = expect_str(args.get(1))?.to_string();
1077 let val = expect_bytes(args.get(2))?.clone();
1078 let mut reg = kv_registry().lock().unwrap();
1079 let db = reg.touch_get(h).ok_or_else(|| "kv.put: closed or unknown Kv handle".to_string())?;
1080 match db.insert(key.as_bytes(), val) {
1081 Ok(_) => Ok(ok(Value::Unit)),
1082 Err(e) => Ok(err(Value::Str(format!("kv.put: {e}")))),
1083 }
1084 }
1085 ("kv", "delete") => {
1086 let h = expect_kv_handle(args.first())?;
1087 let key = expect_str(args.get(1))?;
1088 let mut reg = kv_registry().lock().unwrap();
1089 let db = reg.touch_get(h).ok_or_else(|| "kv.delete: closed or unknown Kv handle".to_string())?;
1090 match db.remove(key.as_bytes()) {
1091 Ok(_) => Ok(ok(Value::Unit)),
1092 Err(e) => Ok(err(Value::Str(format!("kv.delete: {e}")))),
1093 }
1094 }
1095 ("kv", "contains") => {
1096 let h = expect_kv_handle(args.first())?;
1097 let key = expect_str(args.get(1))?;
1098 let mut reg = kv_registry().lock().unwrap();
1099 let db = reg.touch_get(h).ok_or_else(|| "kv.contains: closed or unknown Kv handle".to_string())?;
1100 match db.contains_key(key.as_bytes()) {
1101 Ok(present) => Ok(Value::Bool(present)),
1102 Err(e) => Err(format!("kv.contains: {e}")),
1103 }
1104 }
1105 ("kv", "list_prefix") => {
1106 let h = expect_kv_handle(args.first())?;
1107 let prefix = expect_str(args.get(1))?;
1108 let mut reg = kv_registry().lock().unwrap();
1109 let db = reg.touch_get(h).ok_or_else(|| "kv.list_prefix: closed or unknown Kv handle".to_string())?;
1110 let mut keys: Vec<Value> = Vec::new();
1111 for kv in db.scan_prefix(prefix.as_bytes()) {
1112 let (k, _) = kv.map_err(|e| format!("kv.list_prefix: {e}"))?;
1113 let s = String::from_utf8_lossy(&k).to_string();
1114 keys.push(Value::Str(s));
1115 }
1116 Ok(Value::List(keys))
1117 }
1118 ("sql", "open") => {
1119 let path = expect_str(args.first())?.to_string();
1120 if path.starts_with("postgres://") || path.starts_with("postgresql://") {
1121 match postgres::Client::connect(&path, postgres::NoTls) {
1123 Ok(client) => {
1124 let handle = next_sql_handle();
1125 sql_registry().lock().unwrap().insert(handle, SqlConn::Postgres(client));
1126 Ok(ok(Value::Int(handle as i64)))
1127 }
1128 Err(e) => Ok(err(pg_err_to_sql_error(e, "sql.open"))),
1129 }
1130 } else {
1131 if path != ":memory:" && !self.policy.allow_fs_write.is_empty() {
1134 let p = std::path::Path::new(&path);
1135 if !self.policy.allow_fs_write.iter().any(|a| p.starts_with(a)) {
1136 return Ok(err(sql_error(
1137 format!("sql.open: `{path}` outside --allow-fs-write"),
1138 None, None,
1139 )));
1140 }
1141 }
1142 match rusqlite::Connection::open(&path) {
1143 Ok(conn) => {
1144 let handle = next_sql_handle();
1145 sql_registry().lock().unwrap().insert(handle, SqlConn::Sqlite(conn));
1146 Ok(ok(Value::Int(handle as i64)))
1147 }
1148 Err(e) => Ok(err(sqlite_err_to_sql_error(e, "sql.open"))),
1149 }
1150 }
1151 }
1152 ("sql", "close") => {
1153 let h = expect_sql_handle(args.first())?;
1154 sql_registry().lock().unwrap().remove(h);
1155 Ok(Value::Unit)
1156 }
1157 ("sql", "exec") => {
1158 let h = expect_sql_handle(args.first())?;
1159 let stmt = expect_str(args.get(1))?.to_string();
1160 let params = expect_sql_params(args.get(2))?;
1161 let arc = sql_registry().lock().unwrap()
1162 .touch_get(h)
1163 .ok_or_else(|| "sql.exec: closed or unknown Db handle".to_string())?;
1164 let mut conn = arc.lock().unwrap();
1165 match &mut *conn {
1166 SqlConn::Sqlite(c) => {
1167 let bound = sqlite_params(¶ms);
1168 let bind: Vec<&dyn rusqlite::ToSql> =
1169 bound.iter().map(|p| p as &dyn rusqlite::ToSql).collect();
1170 match c.execute(&stmt, rusqlite::params_from_iter(bind.iter())) {
1171 Ok(n) => Ok(ok(Value::Int(n as i64))),
1172 Err(e) => Ok(err(sqlite_err_to_sql_error(e, "sql.exec"))),
1173 }
1174 }
1175 SqlConn::Postgres(c) => {
1176 let pg = pg_param_refs(¶ms);
1177 let refs: Vec<&(dyn postgres::types::ToSql + Sync)> =
1178 pg.iter().map(|b| b.as_ref()).collect();
1179 match c.execute(stmt.as_str(), &refs) {
1180 Ok(n) => Ok(ok(Value::Int(n as i64))),
1181 Err(e) => Ok(err(pg_err_to_sql_error(e, "sql.exec"))),
1182 }
1183 }
1184 }
1185 }
1186 ("sql", "query") => {
1187 let h = expect_sql_handle(args.first())?;
1188 let stmt_str = expect_str(args.get(1))?.to_string();
1189 let params = expect_sql_params(args.get(2))?;
1190 let arc = sql_registry().lock().unwrap()
1191 .touch_get(h)
1192 .ok_or_else(|| "sql.query: closed or unknown Db handle".to_string())?;
1193 let mut conn = arc.lock().unwrap();
1194 Ok(match &mut *conn {
1195 SqlConn::Sqlite(c) => sql_run_query_sqlite(c, &stmt_str, ¶ms),
1196 SqlConn::Postgres(c) => sql_run_query_pg(c, &stmt_str, ¶ms),
1197 })
1198 }
1199 ("sql", "query_iter") => {
1205 let h = expect_sql_handle(args.first())?;
1206 let stmt_str = expect_str(args.get(1))?.to_string();
1207 let params = expect_sql_params(args.get(2))?;
1208 let arc = sql_registry().lock().unwrap()
1209 .touch_get(h)
1210 .ok_or_else(|| "sql.query_iter: closed or unknown Db handle".to_string())?;
1211
1212 let (sender, receiver) = std::sync::mpsc::sync_channel::<Result<Value, String>>(
1216 CURSOR_CHANNEL_CAPACITY,
1217 );
1218 let cursor_h = next_cursor_handle();
1219 cursor_registry().lock().unwrap().insert(cursor_h, receiver);
1220
1221 let arc_for_thread = Arc::clone(&arc);
1222 let is_sqlite = matches!(*arc.lock().unwrap(), SqlConn::Sqlite(_));
1228 std::thread::spawn(move || {
1229 if is_sqlite {
1230 sqlite_cursor_producer(arc_for_thread, stmt_str, params, sender);
1231 } else {
1232 pg_cursor_producer(arc_for_thread, stmt_str, params, sender);
1233 }
1234 });
1235
1236 Ok(ok(Value::Variant {
1237 name: "__IterCursor".into(),
1238 args: vec![Value::Int(cursor_h as i64)],
1239 }))
1240 }
1241 ("sql", "cursor_next") => {
1247 let h = match args.first() {
1248 Some(Value::Int(n)) if *n >= 0 => *n as u64,
1249 _ => return Err("sql.cursor_next: expected cursor handle (Int)".into()),
1250 };
1251 let rx_arc = match cursor_registry().lock().unwrap().touch_get(h) {
1252 Some(a) => a,
1253 None => return Ok(Value::Variant { name: "None".into(), args: vec![] }),
1254 };
1255 let recv_result = {
1260 let rx = match rx_arc.lock() {
1261 Ok(g) => g,
1262 Err(p) => p.into_inner(),
1263 };
1264 rx.recv()
1265 };
1266 match recv_result {
1267 Ok(Ok(row)) => Ok(Value::Variant {
1268 name: "Some".into(),
1269 args: vec![row],
1270 }),
1271 Ok(Err(_)) | Err(_) => {
1272 cursor_registry().lock().unwrap().remove(h);
1276 Ok(Value::Variant { name: "None".into(), args: vec![] })
1277 }
1278 }
1279 }
1280 ("sql", "begin") => {
1285 let h = expect_sql_handle(args.first())?;
1286 let arc = sql_registry().lock().unwrap()
1287 .touch_get(h)
1288 .ok_or_else(|| "sql.begin: closed or unknown Db handle".to_string())?;
1289 let mut conn = arc.lock().unwrap();
1290 match &mut *conn {
1291 SqlConn::Sqlite(c) => match c.execute_batch("BEGIN") {
1292 Ok(()) => Ok(ok(Value::Int(h as i64))),
1293 Err(e) => Ok(err(sqlite_err_to_sql_error(e, "sql.begin"))),
1294 },
1295 SqlConn::Postgres(c) => match c.batch_execute("BEGIN") {
1296 Ok(()) => Ok(ok(Value::Int(h as i64))),
1297 Err(e) => Ok(err(pg_err_to_sql_error(e, "sql.begin"))),
1298 },
1299 }
1300 }
1301 ("sql", "commit") => {
1302 let h = expect_sql_handle(args.first())?;
1303 let arc = sql_registry().lock().unwrap()
1304 .touch_get(h)
1305 .ok_or_else(|| "sql.commit: closed or unknown SqlTx handle".to_string())?;
1306 let mut conn = arc.lock().unwrap();
1307 match &mut *conn {
1308 SqlConn::Sqlite(c) => match c.execute_batch("COMMIT") {
1309 Ok(()) => Ok(ok(Value::Unit)),
1310 Err(e) => Ok(err(sqlite_err_to_sql_error(e, "sql.commit"))),
1311 },
1312 SqlConn::Postgres(c) => match c.batch_execute("COMMIT") {
1313 Ok(()) => Ok(ok(Value::Unit)),
1314 Err(e) => Ok(err(pg_err_to_sql_error(e, "sql.commit"))),
1315 },
1316 }
1317 }
1318 ("sql", "rollback") => {
1319 let h = expect_sql_handle(args.first())?;
1320 let arc = sql_registry().lock().unwrap()
1321 .touch_get(h)
1322 .ok_or_else(|| "sql.rollback: closed or unknown SqlTx handle".to_string())?;
1323 let mut conn = arc.lock().unwrap();
1324 match &mut *conn {
1325 SqlConn::Sqlite(c) => match c.execute_batch("ROLLBACK") {
1326 Ok(()) => Ok(ok(Value::Unit)),
1327 Err(e) => Ok(err(sqlite_err_to_sql_error(e, "sql.rollback"))),
1328 },
1329 SqlConn::Postgres(c) => match c.batch_execute("ROLLBACK") {
1330 Ok(()) => Ok(ok(Value::Unit)),
1331 Err(e) => Ok(err(pg_err_to_sql_error(e, "sql.rollback"))),
1332 },
1333 }
1334 }
1335 ("sql", "exec_tx") => {
1336 let h = expect_sql_handle(args.first())?;
1337 let stmt = expect_str(args.get(1))?.to_string();
1338 let params = expect_sql_params(args.get(2))?;
1339 let arc = sql_registry().lock().unwrap()
1340 .touch_get(h)
1341 .ok_or_else(|| "sql.exec_tx: closed or unknown SqlTx handle".to_string())?;
1342 let mut conn = arc.lock().unwrap();
1343 match &mut *conn {
1344 SqlConn::Sqlite(c) => {
1345 let bound = sqlite_params(¶ms);
1346 let bind: Vec<&dyn rusqlite::ToSql> =
1347 bound.iter().map(|p| p as &dyn rusqlite::ToSql).collect();
1348 match c.execute(&stmt, rusqlite::params_from_iter(bind.iter())) {
1349 Ok(n) => Ok(ok(Value::Int(n as i64))),
1350 Err(e) => Ok(err(sqlite_err_to_sql_error(e, "sql.exec_tx"))),
1351 }
1352 }
1353 SqlConn::Postgres(c) => {
1354 let pg = pg_param_refs(¶ms);
1355 let refs: Vec<&(dyn postgres::types::ToSql + Sync)> =
1356 pg.iter().map(|b| b.as_ref()).collect();
1357 match c.execute(stmt.as_str(), &refs) {
1358 Ok(n) => Ok(ok(Value::Int(n as i64))),
1359 Err(e) => Ok(err(pg_err_to_sql_error(e, "sql.exec_tx"))),
1360 }
1361 }
1362 }
1363 }
1364 ("sql", "query_tx") => {
1365 let h = expect_sql_handle(args.first())?;
1366 let stmt_str = expect_str(args.get(1))?.to_string();
1367 let params = expect_sql_params(args.get(2))?;
1368 let arc = sql_registry().lock().unwrap()
1369 .touch_get(h)
1370 .ok_or_else(|| "sql.query_tx: closed or unknown SqlTx handle".to_string())?;
1371 let mut conn = arc.lock().unwrap();
1372 Ok(match &mut *conn {
1373 SqlConn::Sqlite(c) => sql_run_query_sqlite(c, &stmt_str, ¶ms),
1374 SqlConn::Postgres(c) => sql_run_query_pg(c, &stmt_str, ¶ms),
1375 })
1376 }
1377 ("sql", "get_str") => Ok(sql_get_col(&args, |v| match v {
1378 Value::Str(s) => Some(Value::Str(s.clone())),
1379 Value::Int(n) => Some(Value::Str(n.to_string())),
1380 _ => None,
1381 })?),
1382 ("sql", "get_int") => Ok(sql_get_col(&args, |v| match v {
1383 Value::Int(n) => Some(Value::Int(*n)),
1384 Value::Float(f) => Some(Value::Int(*f as i64)),
1385 _ => None,
1386 })?),
1387 ("sql", "get_float") => Ok(sql_get_col(&args, |v| match v {
1388 Value::Float(f) => Some(Value::Float(*f)),
1389 Value::Int(n) => Some(Value::Float(*n as f64)),
1390 _ => None,
1391 })?),
1392 ("sql", "get_bool") => Ok(sql_get_col(&args, |v| match v {
1393 Value::Bool(b) => Some(Value::Bool(*b)),
1394 Value::Int(n) => Some(Value::Bool(*n != 0)),
1395 _ => None,
1396 })?),
1397 ("proc", "spawn") => {
1398 let cmd = expect_str(args.first())?.to_string();
1412 let raw_args = match args.get(1) {
1413 Some(Value::List(items)) => items,
1414 Some(other) => return Err(format!(
1415 "proc.spawn: args must be List[Str], got {other:?}")),
1416 None => return Err("proc.spawn: missing args list".into()),
1417 };
1418 let str_args: Vec<String> = raw_args.iter().map(|v| match v {
1419 Value::Str(s) => Ok(s.clone()),
1420 other => Err(format!("proc.spawn: arg must be Str, got {other:?}")),
1421 }).collect::<Result<Vec<_>, _>>()?;
1422
1423 if !self.policy.allow_proc.is_empty() {
1427 let basename = std::path::Path::new(&cmd)
1428 .file_name()
1429 .and_then(|s| s.to_str())
1430 .unwrap_or(&cmd);
1431 if !self.policy.allow_proc.iter().any(|a| a == basename) {
1432 return Ok(err(Value::Str(format!(
1433 "proc.spawn: `{cmd}` not in --allow-proc {:?}",
1434 self.policy.allow_proc
1435 ))));
1436 }
1437 }
1438
1439 if str_args.len() > 1024 {
1442 return Ok(err(Value::Str(
1443 "proc.spawn: arg-count exceeds 1024".into())));
1444 }
1445 if str_args.iter().any(|a| a.len() > 65_536) {
1446 return Ok(err(Value::Str(
1447 "proc.spawn: per-arg length exceeds 64 KiB".into())));
1448 }
1449
1450 let output = std::process::Command::new(&cmd)
1451 .args(&str_args)
1452 .output();
1453 match output {
1454 Ok(o) => {
1455 let mut rec = indexmap::IndexMap::new();
1456 rec.insert("stdout".into(), Value::Str(
1457 String::from_utf8_lossy(&o.stdout).to_string()));
1458 rec.insert("stderr".into(), Value::Str(
1459 String::from_utf8_lossy(&o.stderr).to_string()));
1460 rec.insert("exit_code".into(), Value::Int(
1461 o.status.code().unwrap_or(-1) as i64));
1462 Ok(ok(Value::Record(rec)))
1463 }
1464 Err(e) => Ok(err(Value::Str(format!("spawn `{cmd}`: {e}")))),
1465 }
1466 }
1467 other => Err(format!("unsupported effect {}.{}", other.0, other.1)),
1468 }
1469 }
1470
1471 fn spawn_for_worker(&self) -> Option<Box<dyn lex_bytecode::vm::EffectHandler + Send>> {
1494 let mut fresh = DefaultHandler::new(self.policy.clone());
1495 fresh.budget_remaining = std::sync::Arc::clone(&self.budget_remaining);
1498 fresh.budget_ceiling = self.budget_ceiling;
1499 fresh.read_root = self.read_root.clone();
1500 fresh.program = self.program.clone();
1501 fresh.chat_registry = self.chat_registry.clone();
1502 fresh.streams = std::sync::Arc::clone(&self.streams);
1507 fresh.next_stream_id = std::sync::Arc::clone(&self.next_stream_id);
1508 Some(Box::new(fresh))
1509 }
1510}
1511
1512pub struct TlsConfig {
1522 pub cert: Vec<u8>,
1523 pub key: Vec<u8>,
1524}
1525
1526fn serve_http(
1527 port: u16,
1528 handler_name: String,
1529 program: Arc<Program>,
1530 policy: Policy,
1531 tls: Option<TlsConfig>,
1532) -> Result<Value, String> {
1533 let (server, scheme) = match tls {
1534 None => (
1535 tiny_http::Server::http(("127.0.0.1", port))
1536 .map_err(|e| format!("net.serve bind {port}: {e}"))?,
1537 "http",
1538 ),
1539 Some(cfg) => {
1540 let ssl = tiny_http::SslConfig {
1541 certificate: cfg.cert,
1542 private_key: cfg.key,
1543 };
1544 (
1545 tiny_http::Server::https(("127.0.0.1", port), ssl)
1546 .map_err(|e| format!("net.serve_tls bind {port}: {e}"))?,
1547 "https",
1548 )
1549 }
1550 };
1551 eprintln!("net.serve: listening on {scheme}://127.0.0.1:{port}");
1552 for req in server.incoming_requests() {
1558 let program = Arc::clone(&program);
1559 let policy = policy.clone();
1560 let handler_name = handler_name.clone();
1561 std::thread::spawn(move || handle_request(req, program, policy, handler_name));
1562 }
1563 Ok(Value::Unit)
1564}
1565
1566fn handle_request(
1567 mut req: tiny_http::Request,
1568 program: Arc<Program>,
1569 policy: Policy,
1570 handler_name: String,
1571) {
1572 let lex_req = build_request_value(&mut req);
1573 let handler = DefaultHandler::new(policy).with_program(Arc::clone(&program));
1574 let mut vm = Vm::with_handler(&program, Box::new(handler));
1575 match vm.call(&handler_name, vec![lex_req]) {
1576 Ok(resp) => {
1577 let (status, body, headers) = unpack_response(&resp);
1578 respond_with_body(req, status, body, headers);
1579 }
1580 Err(e) => {
1581 let response = tiny_http::Response::from_string(format!("internal error: {e}"))
1582 .with_status_code(500);
1583 let _ = req.respond(response);
1584 }
1585 }
1586}
1587
1588fn serve_http_fn(
1589 port: u16,
1590 closure: Value,
1591 program: Arc<Program>,
1592 policy: Policy,
1593) -> Result<Value, String> {
1594 let server = tiny_http::Server::http(("127.0.0.1", port))
1595 .map_err(|e| format!("net.serve_fn bind {port}: {e}"))?;
1596 eprintln!("net.serve_fn: listening on http://127.0.0.1:{port}");
1597 for req in server.incoming_requests() {
1598 let program = Arc::clone(&program);
1599 let policy = policy.clone();
1600 let closure = closure.clone();
1601 std::thread::spawn(move || handle_request_fn(req, program, policy, closure));
1602 }
1603 Ok(Value::Unit)
1604}
1605
1606fn handle_request_fn(
1607 mut req: tiny_http::Request,
1608 program: Arc<Program>,
1609 policy: Policy,
1610 closure: Value,
1611) {
1612 let lex_req = build_request_value(&mut req);
1613 let handler = DefaultHandler::new(policy).with_program(Arc::clone(&program));
1614 let mut vm = Vm::with_handler(&program, Box::new(handler));
1615 match vm.invoke_closure_value(closure, vec![lex_req]) {
1616 Ok(resp) => {
1617 let (status, body, headers) = unpack_response(&resp);
1618 respond_with_body(req, status, body, headers);
1619 }
1620 Err(e) => {
1621 let response = tiny_http::Response::from_string(format!("internal error: {e}"))
1622 .with_status_code(500);
1623 let _ = req.respond(response);
1624 }
1625 }
1626}
1627
1628fn build_request_value(req: &mut tiny_http::Request) -> Value {
1629 let method = format!("{:?}", req.method()).to_uppercase();
1630 let url = req.url().to_string();
1631 let (path, query) = match url.split_once('?') {
1632 Some((p, q)) => (p.to_string(), q.to_string()),
1633 None => (url, String::new()),
1634 };
1635 let mut headers_map = std::collections::BTreeMap::new();
1636 for h in req.headers() {
1637 headers_map.insert(
1638 lex_bytecode::MapKey::Str(h.field.as_str().as_str().to_ascii_lowercase()),
1639 Value::Str(h.value.as_str().to_string()),
1640 );
1641 }
1642 let mut body = String::new();
1643 let _ = req.as_reader().read_to_string(&mut body);
1644 let mut rec = indexmap::IndexMap::new();
1645 rec.insert("method".into(), Value::Str(method));
1646 rec.insert("path".into(), Value::Str(path));
1647 rec.insert("query".into(), Value::Str(query));
1648 rec.insert("body".into(), Value::Str(body));
1649 rec.insert("headers".into(), Value::Map(headers_map));
1650 Value::Record(rec)
1651}
1652
1653fn unpack_response(v: &Value) -> (u16, ResponseBodyOut, Vec<tiny_http::Header>) {
1654 if let Value::Record(rec) = v {
1655 let status = rec.get("status").and_then(|s| match s {
1656 Value::Int(n) => Some(*n as u16),
1657 _ => None,
1658 }).unwrap_or(200);
1659 let body = match rec.get("body") {
1660 Some(Value::Variant { name, args }) => match (name.as_str(), args.as_slice()) {
1662 ("BodyStr", [Value::Str(s)]) => ResponseBodyOut::Str(s.clone()),
1663 ("BodyStream", [iter_v]) => ResponseBodyOut::TextChunks(drain_iter_str(iter_v)),
1664 ("BodyBytes", [iter_v]) => ResponseBodyOut::BytesChunks(drain_iter_bytes(iter_v)),
1665 _ => ResponseBodyOut::Str(String::new()),
1666 },
1667 Some(Value::Str(s)) => ResponseBodyOut::Str(s.clone()),
1673 _ => ResponseBodyOut::Str(String::new()),
1674 };
1675 let headers = if let Some(Value::Map(hmap)) = rec.get("headers") {
1676 hmap.iter().filter_map(|(k, v)| {
1677 if let (lex_bytecode::MapKey::Str(name), Value::Str(val)) = (k, v) {
1678 format!("{name}: {val}").parse::<tiny_http::Header>().ok()
1679 } else {
1680 None
1681 }
1682 }).collect()
1683 } else {
1684 vec![]
1685 };
1686 return (status, body, headers);
1687 }
1688 (
1689 500,
1690 ResponseBodyOut::Str(format!("handler returned non-record: {v:?}")),
1691 vec![],
1692 )
1693}
1694
1695fn respond_with_body(
1707 req: tiny_http::Request,
1708 status: u16,
1709 body: ResponseBodyOut,
1710 headers: Vec<tiny_http::Header>,
1711) {
1712 match body {
1713 ResponseBodyOut::Str(s) => {
1714 let mut response = tiny_http::Response::from_string(s).with_status_code(status);
1715 for h in headers {
1716 response.add_header(h);
1717 }
1718 let _ = req.respond(response);
1719 }
1720 ResponseBodyOut::TextChunks(chunks) | ResponseBodyOut::BytesChunks(chunks) => {
1721 let reader = ChunkReader::new(chunks);
1722 let response = tiny_http::Response::new(
1723 tiny_http::StatusCode(status),
1724 headers,
1725 reader,
1726 None, None,
1728 );
1729 let _ = req.respond(response);
1730 }
1731 }
1732}
1733
1734enum ResponseBodyOut {
1739 Str(String),
1740 TextChunks(Vec<Vec<u8>>),
1744 BytesChunks(Vec<Vec<u8>>),
1747}
1748
1749fn drain_iter_str(v: &Value) -> Vec<Vec<u8>> {
1762 match v {
1763 Value::Variant { name, args }
1764 if name == "__IterEager" && args.len() == 2 =>
1765 {
1766 if let (Value::List(items), Value::Int(idx)) = (&args[0], &args[1]) {
1767 items.iter().skip(*idx as usize).filter_map(|item| {
1768 if let Value::Str(s) = item { Some(s.as_bytes().to_vec()) } else { None }
1769 }).collect()
1770 } else {
1771 Vec::new()
1772 }
1773 }
1774 _ => Vec::new(),
1775 }
1776}
1777
1778fn drain_iter_bytes(v: &Value) -> Vec<Vec<u8>> {
1782 match v {
1783 Value::Variant { name, args }
1784 if name == "__IterEager" && args.len() == 2 =>
1785 {
1786 if let (Value::List(items), Value::Int(idx)) = (&args[0], &args[1]) {
1787 items.iter().skip(*idx as usize).filter_map(|item| {
1788 if let Value::List(ints) = item {
1789 Some(ints.iter().filter_map(|i| match i {
1790 Value::Int(n) => Some((*n & 0xff) as u8),
1791 _ => None,
1792 }).collect::<Vec<u8>>())
1793 } else {
1794 None
1795 }
1796 }).collect()
1797 } else {
1798 Vec::new()
1799 }
1800 }
1801 _ => Vec::new(),
1802 }
1803}
1804
1805struct ChunkReader {
1811 chunks: std::collections::VecDeque<Vec<u8>>,
1812 cursor: usize,
1813}
1814
1815impl ChunkReader {
1816 fn new(chunks: Vec<Vec<u8>>) -> Self {
1817 Self {
1818 chunks: chunks.into_iter().filter(|c| !c.is_empty()).collect(),
1819 cursor: 0,
1820 }
1821 }
1822}
1823
1824impl std::io::Read for ChunkReader {
1825 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
1826 loop {
1827 let Some(front) = self.chunks.front() else {
1828 return Ok(0);
1829 };
1830 let remaining = &front[self.cursor..];
1831 if remaining.is_empty() {
1832 self.chunks.pop_front();
1833 self.cursor = 0;
1834 continue;
1835 }
1836 let n = remaining.len().min(buf.len());
1837 buf[..n].copy_from_slice(&remaining[..n]);
1838 self.cursor += n;
1839 if self.cursor >= front.len() {
1840 self.chunks.pop_front();
1841 self.cursor = 0;
1842 }
1843 return Ok(n);
1844 }
1845 }
1846}
1847
1848fn http_request(method: &str, url: &str, body: Option<&str>) -> Value {
1854 use std::time::Duration;
1855 let agent: ureq::Agent = ureq::Agent::config_builder()
1860 .timeout_connect(Some(Duration::from_secs(10)))
1861 .timeout_recv_body(Some(Duration::from_secs(30)))
1862 .timeout_send_body(Some(Duration::from_secs(10)))
1863 .http_status_as_error(false)
1864 .build()
1865 .into();
1866 let resp = match (method, body) {
1867 ("GET", _) => agent.get(url).call(),
1868 ("POST", Some(b)) => agent.post(url).send(b),
1869 ("POST", None) => agent.post(url).send(""),
1870 (m, _) => return err_value(format!("unsupported method: {m}")),
1871 };
1872 match resp {
1873 Ok(mut r) => {
1874 let status = r.status().as_u16();
1875 let body = r.body_mut().read_to_string().unwrap_or_default();
1876 if (200..300).contains(&status) {
1877 Value::Variant { name: "Ok".into(), args: vec![Value::Str(body)] }
1878 } else {
1879 err_value(format!("status {status}: {body}"))
1880 }
1881 }
1882 Err(e) => err_value(format!("transport: {e}")),
1883 }
1884}
1885
1886fn http_agent(timeout_ms: Option<u64>) -> ureq::Agent {
1891 use std::time::Duration;
1892 let mut b = ureq::Agent::config_builder()
1893 .timeout_connect(Some(Duration::from_secs(10)))
1894 .timeout_recv_body(Some(Duration::from_secs(30)))
1895 .timeout_send_body(Some(Duration::from_secs(10)))
1896 .http_status_as_error(false);
1897 if let Some(ms) = timeout_ms {
1898 let d = Duration::from_millis(ms);
1899 b = b.timeout_global(Some(d));
1900 }
1901 b.build().into()
1902}
1903
1904fn http_error_value(e: ureq::Error) -> Value {
1908 let (ctor, payload): (&str, Option<String>) = match &e {
1909 ureq::Error::Timeout(_) => ("TimeoutError", None),
1910 ureq::Error::Tls(s) => ("TlsError", Some((*s).into())),
1911 ureq::Error::Pem(p) => ("TlsError", Some(format!("{p}"))),
1912 ureq::Error::Rustls(r) => ("TlsError", Some(format!("{r}"))),
1913 _ => ("NetworkError", Some(format!("{e}"))),
1914 };
1915 let args = match payload { Some(s) => vec![Value::Str(s)], None => vec![] };
1916 let inner = Value::Variant { name: ctor.into(), args };
1917 Value::Variant { name: "Err".into(), args: vec![inner] }
1918}
1919
1920fn http_decode_err(msg: String) -> Value {
1921 let inner = Value::Variant {
1922 name: "DecodeError".into(),
1923 args: vec![Value::Str(msg)],
1924 };
1925 Value::Variant { name: "Err".into(), args: vec![inner] }
1926}
1927
1928fn http_send_simple(
1933 method: &str,
1934 url: &str,
1935 body: Option<Vec<u8>>,
1936 content_type: &str,
1937 timeout_ms: Option<u64>,
1938) -> Value {
1939 http_send_full(method, url, body, content_type, &[], timeout_ms)
1940}
1941
1942fn http_send_full(
1943 method: &str,
1944 url: &str,
1945 body: Option<Vec<u8>>,
1946 content_type: &str,
1947 headers: &[(String, String)],
1948 timeout_ms: Option<u64>,
1949) -> Value {
1950 let agent = http_agent(timeout_ms);
1951 let resp = match method {
1952 "GET" => {
1953 let mut req = agent.get(url);
1954 if !content_type.is_empty() { req = req.header("content-type", content_type); }
1955 for (k, v) in headers { req = req.header(k.as_str(), v.as_str()); }
1956 req.call()
1957 }
1958 "POST" => {
1959 let body = body.unwrap_or_default();
1960 let mut req = agent.post(url);
1961 if !content_type.is_empty() { req = req.header("content-type", content_type); }
1962 for (k, v) in headers { req = req.header(k.as_str(), v.as_str()); }
1963 req.send(&body[..])
1964 }
1965 m => {
1966 return http_decode_err(format!("unsupported method: {m}"));
1970 }
1971 };
1972 match resp {
1973 Ok(mut r) => {
1974 let status = r.status().as_u16() as i64;
1975 let headers_map = collect_response_headers(r.headers());
1976 let body_bytes = match r.body_mut().with_config().limit(10 * 1024 * 1024).read_to_vec() {
1977 Ok(b) => b,
1978 Err(e) => return http_decode_err(format!("body read: {e}")),
1979 };
1980 let mut rec = indexmap::IndexMap::new();
1981 rec.insert("status".into(), Value::Int(status));
1982 rec.insert("headers".into(), Value::Map(headers_map));
1983 rec.insert("body".into(), Value::Bytes(body_bytes));
1984 Value::Variant { name: "Ok".into(), args: vec![Value::Record(rec)] }
1985 }
1986 Err(e) => http_error_value(e),
1987 }
1988}
1989
1990fn collect_response_headers(
1991 headers: &ureq::http::HeaderMap,
1992) -> std::collections::BTreeMap<lex_bytecode::MapKey, Value> {
1993 let mut out = std::collections::BTreeMap::new();
1994 for (name, value) in headers.iter() {
1995 let v = value.to_str().unwrap_or("").to_string();
1996 out.insert(lex_bytecode::MapKey::Str(name.as_str().to_string()), Value::Str(v));
1997 }
1998 out
1999}
2000
2001fn http_send_record(handler: &DefaultHandler, req: &indexmap::IndexMap<String, Value>) -> Value {
2005 let method = match req.get("method") {
2006 Some(Value::Str(s)) => s.clone(),
2007 _ => return http_decode_err("HttpRequest.method must be Str".into()),
2008 };
2009 let url = match req.get("url") {
2010 Some(Value::Str(s)) => s.clone(),
2011 _ => return http_decode_err("HttpRequest.url must be Str".into()),
2012 };
2013 if let Err(e) = handler.ensure_host_allowed(&url) {
2014 return http_decode_err(e);
2015 }
2016 let body = match req.get("body") {
2017 Some(Value::Variant { name, args }) if name == "None" => None,
2018 Some(Value::Variant { name, args }) if name == "Some" => match args.as_slice() {
2019 [Value::Bytes(b)] => Some(b.clone()),
2020 _ => return http_decode_err("HttpRequest.body Some payload must be Bytes".into()),
2021 },
2022 _ => return http_decode_err("HttpRequest.body must be Option[Bytes]".into()),
2023 };
2024 let timeout_ms = match req.get("timeout_ms") {
2025 Some(Value::Variant { name, .. }) if name == "None" => None,
2026 Some(Value::Variant { name, args }) if name == "Some" => match args.as_slice() {
2027 [Value::Int(n)] if *n >= 0 => Some(*n as u64),
2028 _ => return http_decode_err(
2029 "HttpRequest.timeout_ms Some payload must be a non-negative Int".into()),
2030 },
2031 _ => return http_decode_err("HttpRequest.timeout_ms must be Option[Int]".into()),
2032 };
2033 let headers: Vec<(String, String)> = match req.get("headers") {
2034 Some(Value::Map(m)) => m.iter().filter_map(|(k, v)| {
2035 let kk = match k { lex_bytecode::MapKey::Str(s) => s.clone(), _ => return None };
2036 let vv = match v { Value::Str(s) => s.clone(), _ => return None };
2037 Some((kk, vv))
2038 }).collect(),
2039 _ => return http_decode_err("HttpRequest.headers must be Map[Str, Str]".into()),
2040 };
2041 http_send_full(&method, &url, body, "", &headers, timeout_ms)
2042}
2043
2044fn expect_record(v: Option<&Value>) -> Result<&indexmap::IndexMap<String, Value>, String> {
2045 match v {
2046 Some(Value::Record(r)) => Ok(r),
2047 Some(other) => Err(format!("expected Record, got {other:?}")),
2048 None => Err("missing Record argument".into()),
2049 }
2050}
2051
2052fn err_value(msg: String) -> Value {
2053 Value::Variant { name: "Err".into(), args: vec![Value::Str(msg)] }
2054}
2055
2056fn expect_str(v: Option<&Value>) -> Result<&str, String> {
2057 match v {
2058 Some(Value::Str(s)) => Ok(s),
2059 Some(other) => Err(format!("expected Str arg, got {other:?}")),
2060 None => Err("missing argument".into()),
2061 }
2062}
2063
2064fn expect_int(v: Option<&Value>) -> Result<i64, String> {
2065 match v {
2066 Some(Value::Int(n)) => Ok(*n),
2067 Some(other) => Err(format!("expected Int arg, got {other:?}")),
2068 None => Err("missing argument".into()),
2069 }
2070}
2071
2072fn ok(v: Value) -> Value {
2073 Value::Variant { name: "Ok".into(), args: vec![v] }
2074}
2075fn err(v: Value) -> Value {
2076 Value::Variant { name: "Err".into(), args: vec![v] }
2077}
2078
2079fn sql_error(message: impl Into<String>, code: Option<String>, detail: Option<String>) -> Value {
2083 let some = |s: String| Value::Variant { name: "Some".into(), args: vec![Value::Str(s)] };
2084 let none = || Value::Variant { name: "None".into(), args: vec![] };
2085 let mut rec = indexmap::IndexMap::new();
2086 rec.insert("message".into(), Value::Str(message.into()));
2087 rec.insert("code".into(), match code {
2088 Some(c) => some(c),
2089 None => none(),
2090 });
2091 rec.insert("detail".into(), match detail {
2092 Some(d) => some(d),
2093 None => none(),
2094 });
2095 Value::Record(rec)
2096}
2097
2098fn sqlite_err_to_sql_error(e: rusqlite::Error, op: &str) -> Value {
2108 let message = format!("{op}: {e}");
2109 match &e {
2110 rusqlite::Error::SqliteFailure(ffi, detail_opt) => {
2111 sql_error(
2112 message,
2113 Some(sqlite_extended_code_name(ffi.extended_code)),
2114 detail_opt.clone(),
2115 )
2116 }
2117 rusqlite::Error::SqlInputError { error, msg, .. } => {
2118 sql_error(
2119 message,
2120 Some(sqlite_extended_code_name(error.extended_code)),
2121 Some(msg.clone()),
2122 )
2123 }
2124 _ => sql_error(message, None, None),
2125 }
2126}
2127
2128fn sqlite_extended_code_name(code: i32) -> String {
2134 use rusqlite::ffi::*;
2135 let s = match code {
2136 SQLITE_BUSY => "SQLITE_BUSY",
2137 SQLITE_LOCKED => "SQLITE_LOCKED",
2138 SQLITE_READONLY => "SQLITE_READONLY",
2139 SQLITE_IOERR => "SQLITE_IOERR",
2140 SQLITE_CORRUPT => "SQLITE_CORRUPT",
2141 SQLITE_NOTFOUND => "SQLITE_NOTFOUND",
2142 SQLITE_FULL => "SQLITE_FULL",
2143 SQLITE_CANTOPEN => "SQLITE_CANTOPEN",
2144 SQLITE_PROTOCOL => "SQLITE_PROTOCOL",
2145 SQLITE_SCHEMA => "SQLITE_SCHEMA",
2146 SQLITE_TOOBIG => "SQLITE_TOOBIG",
2147 SQLITE_CONSTRAINT => "SQLITE_CONSTRAINT",
2148 SQLITE_CONSTRAINT_CHECK => "SQLITE_CONSTRAINT_CHECK",
2149 SQLITE_CONSTRAINT_FOREIGNKEY => "SQLITE_CONSTRAINT_FOREIGNKEY",
2150 SQLITE_CONSTRAINT_NOTNULL => "SQLITE_CONSTRAINT_NOTNULL",
2151 SQLITE_CONSTRAINT_PRIMARYKEY => "SQLITE_CONSTRAINT_PRIMARYKEY",
2152 SQLITE_CONSTRAINT_TRIGGER => "SQLITE_CONSTRAINT_TRIGGER",
2153 SQLITE_CONSTRAINT_UNIQUE => "SQLITE_CONSTRAINT_UNIQUE",
2154 SQLITE_CONSTRAINT_VTAB => "SQLITE_CONSTRAINT_VTAB",
2155 SQLITE_CONSTRAINT_ROWID => "SQLITE_CONSTRAINT_ROWID",
2156 SQLITE_MISMATCH => "SQLITE_MISMATCH",
2157 SQLITE_RANGE => "SQLITE_RANGE",
2158 SQLITE_NOTADB => "SQLITE_NOTADB",
2159 SQLITE_AUTH => "SQLITE_AUTH",
2160 _ => return format!("SQLITE_ERROR_{code}"),
2161 };
2162 s.to_string()
2163}
2164
2165fn pg_err_to_sql_error(e: postgres::Error, op: &str) -> Value {
2169 let message = format!("{op}: {e}");
2170 let code = e.as_db_error().map(|db| db.code().code().to_string());
2171 let detail = e.as_db_error().and_then(|db| db.detail().map(|s| s.to_string()));
2172 sql_error(message, code, detail)
2173}
2174
2175impl DefaultHandler {
2176 fn dispatch_call_mcp(&mut self, args: Vec<Value>) -> Value {
2182 let server = match args.first() {
2183 Some(Value::Str(s)) => s.clone(),
2184 _ => return err(Value::Str(
2185 "agent.call_mcp(server, tool, args_json): server must be Str".into())),
2186 };
2187 let tool = match args.get(1) {
2188 Some(Value::Str(s)) => s.clone(),
2189 _ => return err(Value::Str(
2190 "agent.call_mcp(server, tool, args_json): tool must be Str".into())),
2191 };
2192 let args_json = match args.get(2) {
2193 Some(Value::Str(s)) => s.clone(),
2194 _ => return err(Value::Str(
2195 "agent.call_mcp(server, tool, args_json): args_json must be Str".into())),
2196 };
2197 let parsed: serde_json::Value = match serde_json::from_str(&args_json) {
2198 Ok(v) => v,
2199 Err(e) => return err(Value::Str(format!(
2200 "agent.call_mcp: args_json is not valid JSON: {e}"))),
2201 };
2202 match self.mcp_clients.call(&server, &tool, parsed) {
2203 Ok(result) => ok(Value::Str(
2204 serde_json::to_string(&result).unwrap_or_else(|_| "null".into()))),
2205 Err(e) => err(Value::Str(e)),
2206 }
2207 }
2208
2209 fn dispatch_cloud_stream(&mut self, args: Vec<Value>) -> Value {
2215 let _prompt = match args.first() {
2216 Some(Value::Str(s)) => s.clone(),
2217 _ => return err(Value::Str(
2218 "agent.cloud_stream(prompt): prompt must be Str".into())),
2219 };
2220 let chunks: Vec<String> = match std::env::var("LEX_LLM_STREAM_FIXTURE") {
2221 Ok(v) => v.split('|').map(|s| s.to_string()).collect(),
2222 Err(_) => return err(Value::Str(
2223 "agent.cloud_stream: live streaming not yet implemented; \
2224 set LEX_LLM_STREAM_FIXTURE='chunk1|chunk2|…' for tests".into())),
2225 };
2226 let handle = self.register_stream(chunks.into_iter());
2227 ok(stream_handle_value(handle))
2228 }
2229
2230 fn dispatch_stream_next(&mut self, args: Vec<Value>) -> Value {
2236 let handle = match args.first().and_then(stream_handle_id) {
2237 Some(h) => h,
2238 None => return Value::Variant { name: "None".into(), args: vec![] },
2239 };
2240 let mut streams = match self.streams.lock() {
2241 Ok(g) => g,
2242 Err(_) => return Value::Variant { name: "None".into(), args: vec![] },
2243 };
2244 match streams.get_mut(&handle).and_then(|it| it.next()) {
2245 Some(chunk) => some(Value::Str(chunk)),
2246 None => {
2247 streams.remove(&handle);
2248 Value::Variant { name: "None".into(), args: vec![] }
2249 }
2250 }
2251 }
2252
2253 fn dispatch_stream_collect(&mut self, args: Vec<Value>) -> Value {
2258 let handle = match args.first().and_then(stream_handle_id) {
2259 Some(h) => h,
2260 None => return Value::List(Vec::new()),
2261 };
2262 let mut iter = {
2263 let mut streams = match self.streams.lock() {
2264 Ok(g) => g,
2265 Err(_) => return Value::List(Vec::new()),
2266 };
2267 match streams.remove(&handle) {
2268 Some(it) => it,
2269 None => return Value::List(Vec::new()),
2270 }
2271 };
2272 let mut out: Vec<Value> = Vec::new();
2273 for chunk in iter.by_ref() {
2274 out.push(Value::Str(chunk));
2275 }
2276 Value::List(out)
2277 }
2278
2279 fn register_stream<I>(&self, iter: I) -> String
2283 where
2284 I: Iterator<Item = String> + Send + 'static,
2285 {
2286 let id = self
2287 .next_stream_id
2288 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
2289 let handle = format!("stream_{id}");
2290 if let Ok(mut streams) = self.streams.lock() {
2291 streams.insert(handle.clone(), Box::new(iter));
2292 }
2293 handle
2294 }
2295}
2296
2297fn stream_handle_value(handle: String) -> Value {
2302 Value::Variant {
2303 name: "__StreamHandle".into(),
2304 args: vec![Value::Str(handle)],
2305 }
2306}
2307
2308fn stream_handle_id(v: &Value) -> Option<String> {
2312 match v {
2313 Value::Variant { name, args } if name == "__StreamHandle" => match args.first() {
2314 Some(Value::Str(h)) => Some(h.clone()),
2315 _ => None,
2316 },
2317 _ => None,
2318 }
2319}
2320
2321fn dispatch_llm_local(args: Vec<Value>) -> Value {
2326 let prompt = match args.first() {
2327 Some(Value::Str(s)) => s.clone(),
2328 _ => return err(Value::Str(
2329 "agent.local_complete(prompt): prompt must be Str".into())),
2330 };
2331 match crate::llm::local_complete(&prompt) {
2332 Ok(text) => ok(Value::Str(text)),
2333 Err(e) => err(Value::Str(e)),
2334 }
2335}
2336
2337fn dispatch_llm_cloud(args: Vec<Value>) -> Value {
2344 let prompt = match args.first() {
2345 Some(Value::Str(s)) => s.clone(),
2346 _ => return err(Value::Str(
2347 "agent.cloud_complete(prompt): prompt must be Str".into())),
2348 };
2349 match crate::llm::cloud_complete(&prompt) {
2350 Ok(text) => ok(Value::Str(text)),
2351 Err(e) => err(Value::Str(e)),
2352 }
2353}
2354
2355fn some(v: Value) -> Value {
2356 Value::Variant { name: "Some".into(), args: vec![v] }
2357}
2358fn none() -> Value {
2359 Value::Variant { name: "None".into(), args: vec![] }
2360}
2361
2362fn expect_bytes(v: Option<&Value>) -> Result<&Vec<u8>, String> {
2363 match v {
2364 Some(Value::Bytes(b)) => Ok(b),
2365 Some(other) => Err(format!("expected Bytes arg, got {other:?}")),
2366 None => Err("missing argument".into()),
2367 }
2368}
2369
2370fn expect_kv_handle(v: Option<&Value>) -> Result<u64, String> {
2371 match v {
2372 Some(Value::Int(n)) if *n >= 0 => Ok(*n as u64),
2373 Some(other) => Err(format!("expected Kv handle (Int), got {other:?}")),
2374 None => Err("missing Kv argument".into()),
2375 }
2376}
2377
2378fn expect_sql_handle(v: Option<&Value>) -> Result<u64, String> {
2379 match v {
2380 Some(Value::Int(n)) if *n >= 0 => Ok(*n as u64),
2381 Some(other) => Err(format!("expected Db handle (Int), got {other:?}")),
2382 None => Err("missing Db argument".into()),
2383 }
2384}
2385
2386#[allow(dead_code)]
2387fn expect_str_list(v: Option<&Value>) -> Result<Vec<String>, String> {
2388 match v {
2389 Some(Value::List(items)) => items.iter().map(|x| match x {
2390 Value::Str(s) => Ok(s.clone()),
2391 other => Err(format!("expected List[Str] element, got {other:?}")),
2392 }).collect(),
2393 Some(other) => Err(format!("expected List[Str], got {other:?}")),
2394 None => Err("missing List[Str] argument".into()),
2395 }
2396}
2397
2398fn expect_sql_params(v: Option<&Value>) -> Result<Vec<SqlParamValue>, String> {
2401 let items = match v {
2402 Some(Value::List(xs)) => xs,
2403 Some(other) => return Err(format!("expected List[SqlParam], got {other:?}")),
2404 None => return Err("missing params argument".into()),
2405 };
2406 items.iter().map(|item| {
2407 match item {
2408 Value::Variant { name, args } => match name.as_str() {
2409 "PStr" => match args.first() {
2410 Some(Value::Str(s)) => Ok(SqlParamValue::Text(s.clone())),
2411 _ => Err("PStr requires a Str argument".into()),
2412 },
2413 "PInt" => match args.first() {
2414 Some(Value::Int(n)) => Ok(SqlParamValue::Integer(*n)),
2415 _ => Err("PInt requires an Int argument".into()),
2416 },
2417 "PFloat" => match args.first() {
2418 Some(Value::Float(f)) => Ok(SqlParamValue::Real(*f)),
2419 _ => Err("PFloat requires a Float argument".into()),
2420 },
2421 "PBool" => match args.first() {
2422 Some(Value::Bool(b)) => Ok(SqlParamValue::Bool(*b)),
2423 _ => Err("PBool requires a Bool argument".into()),
2424 },
2425 "PNull" => Ok(SqlParamValue::Null),
2426 other => Err(format!("unknown SqlParam constructor `{other}`")),
2427 },
2428 Value::Str(s) => Ok(SqlParamValue::Text(s.clone())),
2430 other => Err(format!("expected SqlParam variant, got {other:?}")),
2431 }
2432 }).collect()
2433}
2434
2435fn sqlite_params(params: &[SqlParamValue]) -> Vec<rusqlite::types::Value> {
2437 params.iter().map(|p| match p {
2438 SqlParamValue::Text(s) => rusqlite::types::Value::Text(s.clone()),
2439 SqlParamValue::Integer(n) => rusqlite::types::Value::Integer(*n),
2440 SqlParamValue::Real(f) => rusqlite::types::Value::Real(*f),
2441 SqlParamValue::Bool(b) => rusqlite::types::Value::Integer(*b as i64),
2442 SqlParamValue::Null => rusqlite::types::Value::Null,
2443 }).collect()
2444}
2445
2446fn pg_param_refs(params: &[SqlParamValue]) -> Vec<Box<dyn postgres::types::ToSql + Sync>> {
2448 params.iter().map(|p| -> Box<dyn postgres::types::ToSql + Sync> {
2449 match p {
2450 SqlParamValue::Text(s) => Box::new(s.clone()),
2451 SqlParamValue::Integer(n) => Box::new(*n),
2452 SqlParamValue::Real(f) => Box::new(*f),
2453 SqlParamValue::Bool(b) => Box::new(*b),
2454 SqlParamValue::Null => Box::new(Option::<String>::None),
2455 }
2456 }).collect()
2457}
2458
2459fn sql_run_query_sqlite(
2461 conn: &rusqlite::Connection,
2462 stmt_str: &str,
2463 params: &[SqlParamValue],
2464) -> Value {
2465 let mut stmt = match conn.prepare(stmt_str) {
2466 Ok(s) => s,
2467 Err(e) => return err(sqlite_err_to_sql_error(e, "sql.query")),
2468 };
2469 let column_count = stmt.column_count();
2470 let column_names: Vec<String> = (0..column_count)
2471 .map(|i| stmt.column_name(i).unwrap_or("").to_string())
2472 .collect();
2473 let bound = sqlite_params(params);
2474 let bind: Vec<&dyn rusqlite::ToSql> = bound.iter()
2475 .map(|p| p as &dyn rusqlite::ToSql)
2476 .collect();
2477 let mut rows = match stmt.query(rusqlite::params_from_iter(bind.iter())) {
2478 Ok(r) => r,
2479 Err(e) => return err(sqlite_err_to_sql_error(e, "sql.query")),
2480 };
2481 let mut out: Vec<Value> = Vec::new();
2482 loop {
2483 let row = match rows.next() {
2484 Ok(Some(r)) => r,
2485 Ok(None) => break,
2486 Err(e) => return err(sqlite_err_to_sql_error(e, "sql.query")),
2487 };
2488 let mut rec = indexmap::IndexMap::new();
2489 for (i, name) in column_names.iter().enumerate() {
2490 let cell = match row.get_ref(i) {
2491 Ok(c) => sql_value_ref_to_lex(c),
2492 Err(e) => return err(sqlite_err_to_sql_error(e, &format!("sql.query: column {i}"))),
2493 };
2494 rec.insert(name.clone(), cell);
2495 }
2496 out.push(Value::Record(rec));
2497 }
2498 ok(Value::List(out))
2499}
2500
2501fn sql_run_query_pg(
2503 client: &mut postgres::Client,
2504 stmt_str: &str,
2505 params: &[SqlParamValue],
2506) -> Value {
2507 let pg = pg_param_refs(params);
2508 let refs: Vec<&(dyn postgres::types::ToSql + Sync)> =
2509 pg.iter().map(|b| b.as_ref()).collect();
2510 let rows = match client.query(stmt_str, &refs) {
2511 Ok(r) => r,
2512 Err(e) => return err(pg_err_to_sql_error(e, "sql.query")),
2513 };
2514 let out: Vec<Value> = rows.iter().map(|row| {
2515 Value::Record(pg_row_to_lex_record(row))
2516 }).collect();
2517 ok(Value::List(out))
2518}
2519
2520fn pg_row_to_lex_record(row: &postgres::Row) -> indexmap::IndexMap<String, Value> {
2522 use postgres::types::Type;
2523 let mut rec = indexmap::IndexMap::new();
2524 for (i, col) in row.columns().iter().enumerate() {
2525 let ty = col.type_();
2526 let val = if *ty == Type::INT2 || *ty == Type::INT4 || *ty == Type::INT8 {
2527 row.get::<_, Option<i64>>(i).map(Value::Int).unwrap_or(Value::Unit)
2528 } else if *ty == Type::FLOAT4 || *ty == Type::FLOAT8 {
2529 row.get::<_, Option<f64>>(i).map(Value::Float).unwrap_or(Value::Unit)
2530 } else if *ty == Type::BOOL {
2531 row.get::<_, Option<bool>>(i).map(Value::Bool).unwrap_or(Value::Unit)
2532 } else if *ty == Type::BYTEA {
2533 row.get::<_, Option<Vec<u8>>>(i).map(Value::Bytes).unwrap_or(Value::Unit)
2534 } else {
2535 row.get::<_, Option<String>>(i).map(Value::Str).unwrap_or(Value::Unit)
2536 };
2537 rec.insert(col.name().to_string(), val);
2538 }
2539 rec
2540}
2541
2542fn sql_get_col<F>(args: &[Value], convert: F) -> Result<Value, String>
2544where
2545 F: Fn(&Value) -> Option<Value>,
2546{
2547 let row = args.first().ok_or("sql.get_*: missing row argument")?;
2548 let col = match args.get(1) {
2549 Some(Value::Str(s)) => s.as_str(),
2550 Some(other) => return Err(format!("sql.get_*: column name must be Str, got {other:?}")),
2551 None => return Err("sql.get_*: missing column name argument".into()),
2552 };
2553 let cell = match row {
2554 Value::Record(rec) => rec.get(col).cloned(),
2555 other => return Err(format!("sql.get_*: row must be a Record, got {other:?}")),
2556 };
2557 Ok(match cell.and_then(|v| convert(&v)) {
2558 Some(v) => Value::Variant { name: "Some".into(), args: vec![v] },
2559 None => Value::Variant { name: "None".into(), args: vec![] },
2560 })
2561}
2562
2563fn sql_value_ref_to_lex(v: rusqlite::types::ValueRef<'_>) -> Value {
2564 use rusqlite::types::ValueRef;
2565 match v {
2566 ValueRef::Null => Value::Unit,
2567 ValueRef::Integer(n) => Value::Int(n),
2568 ValueRef::Real(f) => Value::Float(f),
2569 ValueRef::Text(s) => Value::Str(String::from_utf8_lossy(s).into_owned()),
2570 ValueRef::Blob(b) => Value::Bytes(b.to_vec()),
2571 }
2572}
2573
2574#[derive(Clone, Copy, PartialEq, PartialOrd)]
2577enum LogLevel { Debug, Info, Warn, Error }
2578
2579#[derive(Clone, Copy, PartialEq)]
2580enum LogFormat { Text, Json }
2581
2582#[derive(Clone)]
2583enum LogSink {
2584 Stderr,
2585 File(std::sync::Arc<Mutex<std::fs::File>>),
2586}
2587
2588struct LogState {
2589 level: LogLevel,
2590 format: LogFormat,
2591 sink: LogSink,
2592}
2593
2594fn log_state() -> &'static Mutex<LogState> {
2595 static STATE: OnceLock<Mutex<LogState>> = OnceLock::new();
2596 STATE.get_or_init(|| Mutex::new(LogState {
2597 level: LogLevel::Info,
2598 format: LogFormat::Text,
2599 sink: LogSink::Stderr,
2600 }))
2601}
2602
2603fn parse_log_level(s: &str) -> Option<LogLevel> {
2604 match s {
2605 "debug" => Some(LogLevel::Debug),
2606 "info" => Some(LogLevel::Info),
2607 "warn" => Some(LogLevel::Warn),
2608 "error" => Some(LogLevel::Error),
2609 _ => None,
2610 }
2611}
2612
2613fn level_label(l: LogLevel) -> &'static str {
2614 match l {
2615 LogLevel::Debug => "debug",
2616 LogLevel::Info => "info",
2617 LogLevel::Warn => "warn",
2618 LogLevel::Error => "error",
2619 }
2620}
2621
2622fn emit_log(level: LogLevel, msg: &str) {
2623 let state = log_state().lock().unwrap();
2624 if level < state.level {
2625 return;
2626 }
2627 let ts = chrono::Utc::now().to_rfc3339();
2628 let line = match state.format {
2629 LogFormat::Text => format!("[{}] {}: {}\n", ts, level_label(level), msg),
2630 LogFormat::Json => {
2631 let escaped = msg
2635 .replace('\\', "\\\\")
2636 .replace('"', "\\\"")
2637 .replace('\n', "\\n")
2638 .replace('\r', "\\r");
2639 format!(
2640 "{{\"ts\":\"{ts}\",\"level\":\"{}\",\"msg\":\"{escaped}\"}}\n",
2641 level_label(level),
2642 )
2643 }
2644 };
2645 let sink = state.sink.clone();
2646 drop(state);
2647 match sink {
2648 LogSink::Stderr => {
2649 use std::io::Write;
2650 let _ = std::io::stderr().write_all(line.as_bytes());
2651 }
2652 LogSink::File(f) => {
2653 use std::io::Write;
2654 if let Ok(mut g) = f.lock() {
2655 let _ = g.write_all(line.as_bytes());
2656 }
2657 }
2658 }
2659}
2660
2661pub(crate) struct ProcessState {
2662 child: std::process::Child,
2663 stdout: Option<std::io::BufReader<std::process::ChildStdout>>,
2664 stderr: Option<std::io::BufReader<std::process::ChildStderr>>,
2665}
2666
2667fn process_registry() -> &'static Mutex<ProcessRegistry> {
2681 static REGISTRY: OnceLock<Mutex<ProcessRegistry>> = OnceLock::new();
2682 REGISTRY.get_or_init(|| Mutex::new(ProcessRegistry::with_capacity(MAX_PROCESS_HANDLES)))
2683}
2684
2685const MAX_PROCESS_HANDLES: usize = 256;
2686
2687type SharedProcessState = Arc<Mutex<ProcessState>>;
2688
2689pub(crate) struct ProcessRegistry {
2690 entries: indexmap::IndexMap<u64, SharedProcessState>,
2691 cap: usize,
2692}
2693
2694impl ProcessRegistry {
2695 pub(crate) fn with_capacity(cap: usize) -> Self {
2696 Self { entries: indexmap::IndexMap::new(), cap }
2697 }
2698
2699 pub(crate) fn insert(&mut self, handle: u64, state: ProcessState) {
2703 if self.entries.len() >= self.cap {
2704 self.entries.shift_remove_index(0);
2705 }
2706 self.entries.insert(handle, Arc::new(Mutex::new(state)));
2707 }
2708
2709 pub(crate) fn touch_get(&mut self, handle: u64) -> Option<SharedProcessState> {
2713 let idx = self.entries.get_index_of(&handle)?;
2714 self.entries.move_index(idx, self.entries.len() - 1);
2715 self.entries.get(&handle).cloned()
2716 }
2717
2718 pub(crate) fn remove(&mut self, handle: u64) {
2723 self.entries.shift_remove(&handle);
2724 }
2725
2726 #[cfg(test)]
2727 pub(crate) fn len(&self) -> usize { self.entries.len() }
2728}
2729
2730fn next_process_handle() -> u64 {
2731 static COUNTER: AtomicU64 = AtomicU64::new(1);
2732 COUNTER.fetch_add(1, Ordering::SeqCst)
2733}
2734
2735#[cfg(all(test, unix))]
2736mod process_registry_tests {
2737 use super::{ProcessRegistry, ProcessState};
2738
2739 fn fresh_state() -> ProcessState {
2743 let child = std::process::Command::new("true")
2744 .stdout(std::process::Stdio::null())
2745 .stderr(std::process::Stdio::null())
2746 .spawn()
2747 .expect("spawn `true`");
2748 ProcessState { child, stdout: None, stderr: None }
2749 }
2750
2751 #[test]
2752 fn insert_and_get_round_trip() {
2753 let mut r = ProcessRegistry::with_capacity(4);
2754 r.insert(1, fresh_state());
2755 assert!(r.touch_get(1).is_some());
2756 assert!(r.touch_get(2).is_none());
2757 }
2758
2759 #[test]
2760 fn touch_get_returns_distinct_arcs_for_distinct_handles() {
2761 let mut r = ProcessRegistry::with_capacity(4);
2762 r.insert(1, fresh_state());
2763 r.insert(2, fresh_state());
2764 let a = r.touch_get(1).unwrap();
2765 let b = r.touch_get(2).unwrap();
2766 assert!(!std::sync::Arc::ptr_eq(&a, &b));
2768 }
2769
2770 #[test]
2771 fn cap_evicts_lru_on_overflow() {
2772 let mut r = ProcessRegistry::with_capacity(2);
2773 r.insert(1, fresh_state());
2774 r.insert(2, fresh_state());
2775 let _ = r.touch_get(1);
2776 r.insert(3, fresh_state());
2777 assert!(r.touch_get(1).is_some(), "1 was MRU, should survive");
2778 assert!(r.touch_get(2).is_none(), "2 was LRU, should be evicted");
2779 assert!(r.touch_get(3).is_some(), "3 just inserted, should survive");
2780 assert_eq!(r.len(), 2);
2781 }
2782
2783 #[test]
2784 fn cap_with_no_touches_evicts_in_insertion_order() {
2785 let mut r = ProcessRegistry::with_capacity(2);
2786 r.insert(10, fresh_state());
2787 r.insert(20, fresh_state());
2788 r.insert(30, fresh_state());
2789 assert!(r.touch_get(10).is_none());
2790 assert!(r.touch_get(20).is_some());
2791 assert!(r.touch_get(30).is_some());
2792 }
2793
2794 #[test]
2795 fn remove_drops_entry() {
2796 let mut r = ProcessRegistry::with_capacity(4);
2797 r.insert(1, fresh_state());
2798 r.remove(1);
2799 assert!(r.touch_get(1).is_none());
2800 assert_eq!(r.len(), 0);
2801 }
2802
2803 #[test]
2804 fn many_inserts_stay_bounded_at_cap() {
2805 let cap = 8;
2806 let mut r = ProcessRegistry::with_capacity(cap);
2807 for i in 0..(cap as u64 * 3) {
2808 r.insert(i, fresh_state());
2809 assert!(r.len() <= cap);
2810 }
2811 assert_eq!(r.len(), cap);
2812 }
2813
2814 #[test]
2815 fn outstanding_arc_outlives_remove() {
2816 let mut r = ProcessRegistry::with_capacity(4);
2820 r.insert(1, fresh_state());
2821 let arc = r.touch_get(1).expect("entry exists");
2822 r.remove(1);
2823 assert!(r.touch_get(1).is_none());
2825 let _state = arc.lock().unwrap();
2826 }
2827}
2828
2829fn expect_process_handle(v: Option<&Value>) -> Result<u64, String> {
2830 match v {
2831 Some(Value::Int(n)) if *n >= 0 => Ok(*n as u64),
2832 Some(other) => Err(format!("expected ProcessHandle (Int), got {other:?}")),
2833 None => Err("missing ProcessHandle argument".into()),
2834 }
2835}
2836
2837fn kv_registry() -> &'static Mutex<KvRegistry> {
2849 static REGISTRY: OnceLock<Mutex<KvRegistry>> = OnceLock::new();
2850 REGISTRY.get_or_init(|| Mutex::new(KvRegistry::with_capacity(MAX_KV_HANDLES)))
2851}
2852
2853const MAX_KV_HANDLES: usize = 256;
2859
2860pub(crate) struct KvRegistry {
2865 entries: indexmap::IndexMap<u64, sled::Db>,
2866 cap: usize,
2867}
2868
2869impl KvRegistry {
2870 pub(crate) fn with_capacity(cap: usize) -> Self {
2871 Self { entries: indexmap::IndexMap::new(), cap }
2872 }
2873
2874 pub(crate) fn insert(&mut self, handle: u64, db: sled::Db) {
2877 if self.entries.len() >= self.cap {
2878 self.entries.shift_remove_index(0);
2879 }
2880 self.entries.insert(handle, db);
2881 }
2882
2883 pub(crate) fn touch_get(&mut self, handle: u64) -> Option<&sled::Db> {
2885 let idx = self.entries.get_index_of(&handle)?;
2886 self.entries.move_index(idx, self.entries.len() - 1);
2887 self.entries.get(&handle)
2888 }
2889
2890 pub(crate) fn remove(&mut self, handle: u64) {
2892 self.entries.shift_remove(&handle);
2893 }
2894
2895 #[cfg(test)]
2896 pub(crate) fn len(&self) -> usize { self.entries.len() }
2897}
2898
2899fn next_kv_handle() -> u64 {
2900 static COUNTER: AtomicU64 = AtomicU64::new(1);
2901 COUNTER.fetch_add(1, Ordering::SeqCst)
2902}
2903
2904fn sql_registry() -> &'static Mutex<SqlRegistry> {
2911 static REGISTRY: OnceLock<Mutex<SqlRegistry>> = OnceLock::new();
2912 REGISTRY.get_or_init(|| Mutex::new(SqlRegistry::with_capacity(MAX_SQL_HANDLES)))
2913}
2914
2915const MAX_SQL_HANDLES: usize = 256;
2916
2917const CURSOR_CHANNEL_CAPACITY: usize = 64;
2940const MAX_CURSOR_HANDLES: usize = 256;
2941
2942type CursorReceiver = std::sync::mpsc::Receiver<Result<Value, String>>;
2943
2944pub(crate) struct CursorRegistry {
2945 entries: indexmap::IndexMap<u64, Arc<Mutex<CursorReceiver>>>,
2950 cap: usize,
2951}
2952
2953impl CursorRegistry {
2954 pub(crate) fn with_capacity(cap: usize) -> Self {
2955 Self { entries: indexmap::IndexMap::new(), cap }
2956 }
2957
2958 pub(crate) fn insert(&mut self, handle: u64, rx: CursorReceiver) {
2959 if self.entries.len() >= self.cap {
2960 self.entries.shift_remove_index(0);
2961 }
2962 self.entries.insert(handle, Arc::new(Mutex::new(rx)));
2963 }
2964
2965 pub(crate) fn touch_get(&mut self, handle: u64) -> Option<Arc<Mutex<CursorReceiver>>> {
2966 let idx = self.entries.get_index_of(&handle)?;
2967 self.entries.move_index(idx, self.entries.len() - 1);
2968 self.entries.get(&handle).cloned()
2969 }
2970
2971 pub(crate) fn remove(&mut self, handle: u64) {
2972 self.entries.shift_remove(&handle);
2973 }
2974}
2975
2976fn cursor_registry() -> &'static Mutex<CursorRegistry> {
2977 static REGISTRY: OnceLock<Mutex<CursorRegistry>> = OnceLock::new();
2978 REGISTRY.get_or_init(|| Mutex::new(CursorRegistry::with_capacity(MAX_CURSOR_HANDLES)))
2979}
2980
2981fn next_cursor_handle() -> u64 {
2982 static COUNTER: AtomicU64 = AtomicU64::new(1);
2983 COUNTER.fetch_add(1, Ordering::SeqCst)
2984}
2985
2986fn sqlite_cursor_producer(
2992 conn_arc: Arc<Mutex<SqlConn>>,
2993 stmt_str: String,
2994 params: Vec<SqlParamValue>,
2995 sender: std::sync::mpsc::SyncSender<Result<Value, String>>,
2996) {
2997 let mut conn_guard = match conn_arc.lock() {
2998 Ok(g) => g,
2999 Err(p) => p.into_inner(),
3000 };
3001 let SqlConn::Sqlite(c) = &mut *conn_guard else {
3002 let _ = sender.send(Err("sqlite_cursor_producer called on non-sqlite conn".into()));
3003 return;
3004 };
3005 let mut stmt = match c.prepare(&stmt_str) {
3006 Ok(s) => s,
3007 Err(e) => { let _ = sender.send(Err(format!("prepare: {e}"))); return; }
3008 };
3009 let column_count = stmt.column_count();
3010 let column_names: Vec<String> = (0..column_count)
3011 .map(|i| stmt.column_name(i).unwrap_or("").to_string())
3012 .collect();
3013 let bound = sqlite_params(¶ms);
3014 let bind: Vec<&dyn rusqlite::ToSql> =
3015 bound.iter().map(|p| p as &dyn rusqlite::ToSql).collect();
3016 let mut rows = match stmt.query(rusqlite::params_from_iter(bind.iter())) {
3017 Ok(r) => r,
3018 Err(e) => { let _ = sender.send(Err(format!("query: {e}"))); return; }
3019 };
3020 loop {
3021 match rows.next() {
3022 Ok(None) => break,
3023 Err(e) => {
3024 let _ = sender.send(Err(format!("row: {e}")));
3025 break;
3026 }
3027 Ok(Some(row)) => {
3028 let mut rec = indexmap::IndexMap::new();
3029 for (i, name) in column_names.iter().enumerate() {
3030 let val = match row.get_ref(i) {
3031 Ok(vr) => sql_value_ref_to_lex(vr),
3032 Err(_) => Value::Unit,
3033 };
3034 rec.insert(name.clone(), val);
3035 }
3036 if sender.send(Ok(Value::Record(rec))).is_err() {
3037 break;
3038 }
3039 }
3040 }
3041 }
3042}
3043
3044fn pg_cursor_producer(
3048 conn_arc: Arc<Mutex<SqlConn>>,
3049 stmt_str: String,
3050 params: Vec<SqlParamValue>,
3051 sender: std::sync::mpsc::SyncSender<Result<Value, String>>,
3052) {
3053 let mut conn_guard = match conn_arc.lock() {
3054 Ok(g) => g,
3055 Err(p) => p.into_inner(),
3056 };
3057 let SqlConn::Postgres(c) = &mut *conn_guard else {
3058 let _ = sender.send(Err("pg_cursor_producer called on non-postgres conn".into()));
3059 return;
3060 };
3061 let pg = pg_param_refs(¶ms);
3062 let refs: Vec<&(dyn postgres::types::ToSql + Sync)> =
3063 pg.iter().map(|b| b.as_ref()).collect();
3064 let mut tx = match c.transaction() {
3065 Ok(t) => t,
3066 Err(e) => { let _ = sender.send(Err(format!("begin: {e}"))); return; }
3067 };
3068 let cur_name = format!("__lex_cur_{}", next_cursor_handle());
3071 if let Err(e) = tx.execute(
3072 &format!("DECLARE \"{cur_name}\" NO SCROLL CURSOR FOR {stmt_str}"),
3073 &refs,
3074 ) {
3075 let _ = sender.send(Err(format!("declare: {e}")));
3076 return;
3077 }
3078 let fetch_sql = format!("FETCH 64 FROM \"{cur_name}\"");
3079 'outer: loop {
3080 let batch = match tx.query(&fetch_sql, &[]) {
3081 Ok(r) => r,
3082 Err(e) => { let _ = sender.send(Err(format!("fetch: {e}"))); break; }
3083 };
3084 if batch.is_empty() {
3085 break;
3086 }
3087 for row in batch.iter() {
3088 let rec = pg_row_to_lex_record(row);
3089 if sender.send(Ok(Value::Record(rec))).is_err() {
3090 break 'outer;
3091 }
3092 }
3093 }
3094 let _ = tx.execute(&format!("CLOSE \"{cur_name}\""), &[]);
3095 let _ = tx.commit();
3096}
3097
3098#[derive(Debug, Clone)]
3100enum SqlParamValue {
3101 Text(String),
3102 Integer(i64),
3103 Real(f64),
3104 Bool(bool),
3105 Null,
3106}
3107
3108pub(crate) enum SqlConn {
3110 Sqlite(rusqlite::Connection),
3111 Postgres(postgres::Client),
3112}
3113
3114type SharedConn = Arc<Mutex<SqlConn>>;
3115
3116pub(crate) struct SqlRegistry {
3117 entries: indexmap::IndexMap<u64, SharedConn>,
3118 cap: usize,
3119}
3120
3121impl SqlRegistry {
3122 pub(crate) fn with_capacity(cap: usize) -> Self {
3123 Self { entries: indexmap::IndexMap::new(), cap }
3124 }
3125
3126 pub(crate) fn insert(&mut self, handle: u64, conn: SqlConn) {
3127 if self.entries.len() >= self.cap {
3128 self.entries.shift_remove_index(0);
3129 }
3130 self.entries.insert(handle, Arc::new(Mutex::new(conn)));
3131 }
3132
3133 pub(crate) fn touch_get(&mut self, handle: u64) -> Option<SharedConn> {
3137 let idx = self.entries.get_index_of(&handle)?;
3138 self.entries.move_index(idx, self.entries.len() - 1);
3139 self.entries.get(&handle).cloned()
3140 }
3141
3142 pub(crate) fn remove(&mut self, handle: u64) {
3143 self.entries.shift_remove(&handle);
3144 }
3145
3146 #[cfg(test)]
3147 pub(crate) fn len(&self) -> usize { self.entries.len() }
3148}
3149
3150fn next_sql_handle() -> u64 {
3151 static COUNTER: AtomicU64 = AtomicU64::new(1);
3152 COUNTER.fetch_add(1, Ordering::SeqCst)
3153}
3154
3155#[cfg(test)]
3156mod sql_registry_tests {
3157 use super::{SqlConn, SqlRegistry};
3158
3159 fn fresh() -> SqlConn {
3160 SqlConn::Sqlite(rusqlite::Connection::open_in_memory().expect("open in-memory sqlite"))
3161 }
3162
3163 #[test]
3164 fn insert_and_get_round_trip() {
3165 let mut r = SqlRegistry::with_capacity(4);
3166 r.insert(1, fresh());
3167 assert!(r.touch_get(1).is_some());
3168 assert!(r.touch_get(2).is_none());
3169 }
3170
3171 #[test]
3172 fn cap_evicts_lru_on_overflow() {
3173 let mut r = SqlRegistry::with_capacity(2);
3174 r.insert(1, fresh());
3175 r.insert(2, fresh());
3176 let _ = r.touch_get(1);
3177 r.insert(3, fresh());
3178 assert!(r.touch_get(1).is_some(), "1 was MRU, should survive");
3179 assert!(r.touch_get(2).is_none(), "2 was LRU, should be evicted");
3180 assert!(r.touch_get(3).is_some(), "3 just inserted");
3181 assert_eq!(r.len(), 2);
3182 }
3183
3184 #[test]
3185 fn remove_drops_entry() {
3186 let mut r = SqlRegistry::with_capacity(4);
3187 r.insert(1, fresh());
3188 r.remove(1);
3189 assert!(r.touch_get(1).is_none());
3190 assert_eq!(r.len(), 0);
3191 }
3192
3193 #[test]
3194 fn many_inserts_stay_bounded_at_cap() {
3195 let cap = 8;
3196 let mut r = SqlRegistry::with_capacity(cap);
3197 for i in 0..(cap as u64 * 3) {
3198 r.insert(i, fresh());
3199 assert!(r.len() <= cap);
3200 }
3201 assert_eq!(r.len(), cap);
3202 }
3203}
3204
3205#[cfg(test)]
3206mod kv_registry_tests {
3207 use super::KvRegistry;
3208
3209 fn fresh_db(tag: &str) -> sled::Db {
3212 let dir = std::env::temp_dir().join(format!(
3213 "lex-kv-reg-{}-{}-{}",
3214 std::process::id(),
3215 tag,
3216 std::time::SystemTime::now()
3217 .duration_since(std::time::UNIX_EPOCH)
3218 .unwrap()
3219 .as_nanos()
3220 ));
3221 sled::open(&dir).expect("sled open")
3222 }
3223
3224 #[test]
3225 fn insert_and_get_round_trip() {
3226 let mut r = KvRegistry::with_capacity(4);
3227 r.insert(1, fresh_db("a"));
3228 assert!(r.touch_get(1).is_some());
3229 assert!(r.touch_get(2).is_none());
3230 }
3231
3232 #[test]
3233 fn cap_evicts_lru_on_overflow() {
3234 let mut r = KvRegistry::with_capacity(2);
3236 r.insert(1, fresh_db("c1"));
3237 r.insert(2, fresh_db("c2"));
3238 let _ = r.touch_get(1);
3239 r.insert(3, fresh_db("c3"));
3240 assert!(r.touch_get(1).is_some(), "1 was MRU, should survive");
3241 assert!(r.touch_get(2).is_none(), "2 was LRU, should be evicted");
3242 assert!(r.touch_get(3).is_some(), "3 just inserted, should survive");
3243 assert_eq!(r.len(), 2);
3244 }
3245
3246 #[test]
3247 fn cap_with_no_touches_evicts_in_insertion_order() {
3248 let mut r = KvRegistry::with_capacity(2);
3250 r.insert(10, fresh_db("f1"));
3251 r.insert(20, fresh_db("f2"));
3252 r.insert(30, fresh_db("f3"));
3253 assert!(r.touch_get(10).is_none());
3254 assert!(r.touch_get(20).is_some());
3255 assert!(r.touch_get(30).is_some());
3256 }
3257
3258 #[test]
3259 fn remove_drops_entry() {
3260 let mut r = KvRegistry::with_capacity(4);
3261 r.insert(1, fresh_db("r1"));
3262 r.remove(1);
3263 assert!(r.touch_get(1).is_none());
3264 assert_eq!(r.len(), 0);
3265 }
3266
3267 #[test]
3268 fn remove_unknown_handle_is_noop() {
3269 let mut r = KvRegistry::with_capacity(4);
3270 r.insert(1, fresh_db("u1"));
3271 r.remove(999);
3272 assert!(r.touch_get(1).is_some());
3273 }
3274
3275 #[test]
3276 fn many_inserts_stay_bounded_at_cap() {
3277 let cap = 8;
3280 let mut r = KvRegistry::with_capacity(cap);
3281 for i in 0..(cap as u64 * 3) {
3282 r.insert(i, fresh_db(&format!("b{i}")));
3283 assert!(r.len() <= cap);
3284 }
3285 assert_eq!(r.len(), cap);
3286 }
3287}