1use lex_bytecode::vm::{EffectHandler, Vm};
8use lex_bytecode::{Program, Value};
9use std::cell::RefCell;
10use std::path::PathBuf;
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::sync::{Mutex, OnceLock};
13use std::sync::Arc;
14use std::time::{SystemTime, UNIX_EPOCH};
15
16use crate::builtins::try_pure_builtin;
17use crate::policy::Policy;
18
19pub trait IoSink: Send {
22 fn print_line(&mut self, s: &str);
23}
24
25pub struct StdoutSink;
26impl IoSink for StdoutSink {
27 fn print_line(&mut self, s: &str) {
28 println!("{s}");
29 }
30}
31
32#[derive(Default)]
33pub struct CapturedSink { pub lines: Vec<String> }
34impl IoSink for CapturedSink {
35 fn print_line(&mut self, s: &str) { self.lines.push(s.to_string()); }
36}
37
38pub struct DefaultHandler {
39 policy: Policy,
40 pub sink: Box<dyn IoSink>,
41 pub read_root: Option<PathBuf>,
44 pub budget_used: RefCell<u64>,
46 pub program: Option<Arc<Program>>,
50 pub chat_registry: Option<Arc<crate::ws::ChatRegistry>>,
54}
55
56impl DefaultHandler {
57 pub fn new(policy: Policy) -> Self {
58 Self {
59 policy,
60 sink: Box::new(StdoutSink),
61 read_root: None,
62 budget_used: RefCell::new(0),
63 program: None,
64 chat_registry: None,
65 }
66 }
67
68 pub fn with_program(mut self, program: Arc<Program>) -> Self {
69 self.program = Some(program); self
70 }
71
72 pub fn with_chat_registry(mut self, registry: Arc<crate::ws::ChatRegistry>) -> Self {
73 self.chat_registry = Some(registry); self
74 }
75
76 pub fn with_sink(mut self, sink: Box<dyn IoSink>) -> Self {
77 self.sink = sink; self
78 }
79
80 pub fn with_read_root(mut self, root: PathBuf) -> Self {
81 self.read_root = Some(root); self
82 }
83
84 fn ensure_kind_allowed(&self, kind: &str) -> Result<(), String> {
85 if self.policy.allow_effects.contains(kind) {
86 Ok(())
87 } else {
88 Err(format!("effect `{kind}` not in --allow-effects"))
89 }
90 }
91
92 fn resolve_read_path(&self, p: &str) -> PathBuf {
93 match &self.read_root {
94 Some(root) => root.join(p.trim_start_matches('/')),
95 None => PathBuf::from(p),
96 }
97 }
98
99 fn dispatch_log(&mut self, op: &str, args: Vec<Value>) -> Result<Value, String> {
100 match op {
101 "debug" | "info" | "warn" | "error" => {
102 let msg = expect_str(args.first())?;
103 let level = match op {
104 "debug" => LogLevel::Debug,
105 "info" => LogLevel::Info,
106 "warn" => LogLevel::Warn,
107 _ => LogLevel::Error,
108 };
109 emit_log(level, msg);
110 Ok(Value::Unit)
111 }
112 "set_level" => {
113 let s = expect_str(args.first())?;
114 match parse_log_level(s) {
115 Some(l) => {
116 log_state().lock().unwrap().level = l;
117 Ok(ok(Value::Unit))
118 }
119 None => Ok(err(Value::Str(format!(
120 "log.set_level: unknown level `{s}`; expected debug|info|warn|error")))),
121 }
122 }
123 "set_format" => {
124 let s = expect_str(args.first())?;
125 let fmt = match s {
126 "text" => LogFormat::Text,
127 "json" => LogFormat::Json,
128 other => return Ok(err(Value::Str(format!(
129 "log.set_format: unknown format `{other}`; expected text|json")))),
130 };
131 log_state().lock().unwrap().format = fmt;
132 Ok(ok(Value::Unit))
133 }
134 "set_sink" => {
135 let path = expect_str(args.first())?;
136 if path == "-" {
137 log_state().lock().unwrap().sink = LogSink::Stderr;
138 return Ok(ok(Value::Unit));
139 }
140 if let Err(e) = self.ensure_fs_write_path(path) {
141 return Ok(err(Value::Str(e)));
142 }
143 match std::fs::OpenOptions::new()
144 .create(true).append(true).open(path)
145 {
146 Ok(f) => {
147 log_state().lock().unwrap().sink = LogSink::File(std::sync::Arc::new(Mutex::new(f)));
148 Ok(ok(Value::Unit))
149 }
150 Err(e) => Ok(err(Value::Str(format!("log.set_sink `{path}`: {e}")))),
151 }
152 }
153 other => Err(format!("unsupported log.{other}")),
154 }
155 }
156
157 fn dispatch_process(&mut self, op: &str, args: Vec<Value>) -> Result<Value, String> {
158 match op {
159 "spawn" => {
160 let cmd = expect_str(args.first())?.to_string();
161 let raw_args = match args.get(1) {
162 Some(Value::List(items)) => items.clone(),
163 _ => return Err("process.spawn: args must be List[Str]".into()),
164 };
165 let str_args: Result<Vec<String>, String> = raw_args.iter().map(|v| match v {
166 Value::Str(s) => Ok(s.clone()),
167 other => Err(format!("process.spawn: arg must be Str, got {other:?}")),
168 }).collect();
169 let str_args = str_args?;
170 let opts = match args.get(2) {
171 Some(Value::Record(r)) => r.clone(),
172 _ => return Err("process.spawn: missing or invalid opts record".into()),
173 };
174
175 if !self.policy.allow_proc.is_empty() {
177 let basename = std::path::Path::new(&cmd)
178 .file_name()
179 .and_then(|s| s.to_str())
180 .unwrap_or(&cmd);
181 if !self.policy.allow_proc.iter().any(|a| a == basename) {
182 return Ok(err(Value::Str(format!(
183 "process.spawn: `{cmd}` not in --allow-proc {:?}",
184 self.policy.allow_proc
185 ))));
186 }
187 }
188
189 let mut command = std::process::Command::new(&cmd);
190 command.args(&str_args);
191 command.stdin(std::process::Stdio::piped());
192 command.stdout(std::process::Stdio::piped());
193 command.stderr(std::process::Stdio::piped());
194
195 if let Some(Value::Variant { name, args: vargs }) = opts.get("cwd") {
196 if name == "Some" {
197 if let Some(Value::Str(s)) = vargs.first() {
198 command.current_dir(s);
199 }
200 }
201 }
202 if let Some(Value::Map(env)) = opts.get("env") {
203 for (k, v) in env {
204 if let (lex_bytecode::MapKey::Str(ks), Value::Str(vs)) = (k, v) {
205 command.env(ks, vs);
206 }
207 }
208 }
209
210 let stdin_payload: Option<Vec<u8>> = match opts.get("stdin") {
211 Some(Value::Variant { name, args: vargs }) if name == "Some" => {
212 match vargs.first() {
213 Some(Value::Bytes(b)) => Some(b.clone()),
214 _ => None,
215 }
216 }
217 _ => None,
218 };
219
220 let mut child = match command.spawn() {
221 Ok(c) => c,
222 Err(e) => return Ok(err(Value::Str(format!("process.spawn `{cmd}`: {e}")))),
223 };
224
225 if let Some(payload) = stdin_payload {
226 if let Some(mut stdin) = child.stdin.take() {
227 use std::io::Write;
228 let _ = stdin.write_all(&payload);
229 }
231 }
232
233 let stdout = child.stdout.take().map(std::io::BufReader::new);
234 let stderr = child.stderr.take().map(std::io::BufReader::new);
235 let handle = next_process_handle();
236 process_registry().lock().unwrap().insert(handle, ProcessState {
237 child,
238 stdout,
239 stderr,
240 });
241 Ok(ok(Value::Int(handle as i64)))
242 }
243 "read_stdout_line" => Self::read_line_op(args, true),
244 "read_stderr_line" => Self::read_line_op(args, false),
245 "wait" => {
246 let h = expect_process_handle(args.first())?;
247 let arc = process_registry().lock().unwrap()
251 .touch_get(h)
252 .ok_or_else(|| "process.wait: closed or unknown ProcessHandle".to_string())?;
253 let status = {
254 let mut state = arc.lock().unwrap();
255 state.child.wait().map_err(|e| format!("process.wait: {e}"))?
256 };
257 process_registry().lock().unwrap().remove(h);
261 let mut rec = indexmap::IndexMap::new();
262 rec.insert("code".into(), Value::Int(status.code().unwrap_or(-1) as i64));
263 #[cfg(unix)]
264 {
265 use std::os::unix::process::ExitStatusExt;
266 rec.insert("signaled".into(), Value::Bool(status.signal().is_some()));
267 }
268 #[cfg(not(unix))]
269 {
270 rec.insert("signaled".into(), Value::Bool(false));
271 }
272 Ok(Value::Record(rec))
273 }
274 "kill" => {
275 let h = expect_process_handle(args.first())?;
276 let _signal = expect_str(args.get(1))?;
277 let arc = process_registry().lock().unwrap()
278 .touch_get(h)
279 .ok_or_else(|| "process.kill: closed or unknown ProcessHandle".to_string())?;
280 let mut state = arc.lock().unwrap();
281 match state.child.kill() {
284 Ok(_) => Ok(ok(Value::Unit)),
285 Err(e) => Ok(err(Value::Str(format!("process.kill: {e}")))),
286 }
287 }
288 "run" => {
289 let cmd = expect_str(args.first())?.to_string();
290 let raw_args = match args.get(1) {
291 Some(Value::List(items)) => items.clone(),
292 _ => return Err("process.run: args must be List[Str]".into()),
293 };
294 let str_args: Result<Vec<String>, String> = raw_args.iter().map(|v| match v {
295 Value::Str(s) => Ok(s.clone()),
296 other => Err(format!("process.run: arg must be Str, got {other:?}")),
297 }).collect();
298 let str_args = str_args?;
299 if !self.policy.allow_proc.is_empty() {
300 let basename = std::path::Path::new(&cmd)
301 .file_name()
302 .and_then(|s| s.to_str())
303 .unwrap_or(&cmd);
304 if !self.policy.allow_proc.iter().any(|a| a == basename) {
305 return Ok(err(Value::Str(format!(
306 "process.run: `{cmd}` not in --allow-proc {:?}",
307 self.policy.allow_proc
308 ))));
309 }
310 }
311 match std::process::Command::new(&cmd).args(&str_args).output() {
312 Ok(o) => {
313 let mut rec = indexmap::IndexMap::new();
314 rec.insert("stdout".into(), Value::Str(
315 String::from_utf8_lossy(&o.stdout).to_string()));
316 rec.insert("stderr".into(), Value::Str(
317 String::from_utf8_lossy(&o.stderr).to_string()));
318 rec.insert("exit_code".into(), Value::Int(
319 o.status.code().unwrap_or(-1) as i64));
320 Ok(ok(Value::Record(rec)))
321 }
322 Err(e) => Ok(err(Value::Str(format!("process.run `{cmd}`: {e}")))),
323 }
324 }
325 other => Err(format!("unsupported process.{other}")),
326 }
327 }
328
329 fn read_line_op(args: Vec<Value>, is_stdout: bool) -> Result<Value, String> {
335 let h = expect_process_handle(args.first())?;
336 let arc = process_registry().lock().unwrap()
337 .touch_get(h)
338 .ok_or_else(|| format!(
339 "process.read_{}_line: closed or unknown ProcessHandle",
340 if is_stdout { "stdout" } else { "stderr" }))?;
341 let mut state = arc.lock().unwrap();
342 let reader_opt = if is_stdout {
343 state.stdout.as_mut().map(|r| -> &mut dyn std::io::BufRead { r })
344 } else {
345 state.stderr.as_mut().map(|r| -> &mut dyn std::io::BufRead { r })
346 };
347 let reader = match reader_opt {
348 Some(r) => r,
349 None => return Ok(none()),
350 };
351 let mut line = String::new();
352 match reader.read_line(&mut line) {
353 Ok(0) => Ok(none()),
354 Ok(_) => {
355 if line.ends_with('\n') { line.pop(); }
356 if line.ends_with('\r') { line.pop(); }
357 Ok(some(Value::Str(line)))
358 }
359 Err(e) => Err(format!("process.read_*_line: {e}")),
360 }
361 }
362
363 fn dispatch_fs(&mut self, op: &str, args: Vec<Value>) -> Result<Value, String> {
364 match op {
365 "exists" => {
366 let path = expect_str(args.first())?.to_string();
367 if let Err(e) = self.ensure_fs_walk_path(&path) {
368 return Ok(err(Value::Str(e)));
369 }
370 Ok(Value::Bool(std::path::Path::new(&path).exists()))
371 }
372 "is_file" => {
373 let path = expect_str(args.first())?.to_string();
374 if let Err(e) = self.ensure_fs_walk_path(&path) {
375 return Ok(err(Value::Str(e)));
376 }
377 Ok(Value::Bool(std::path::Path::new(&path).is_file()))
378 }
379 "is_dir" => {
380 let path = expect_str(args.first())?.to_string();
381 if let Err(e) = self.ensure_fs_walk_path(&path) {
382 return Ok(err(Value::Str(e)));
383 }
384 Ok(Value::Bool(std::path::Path::new(&path).is_dir()))
385 }
386 "stat" => {
387 let path = expect_str(args.first())?.to_string();
388 if let Err(e) = self.ensure_fs_walk_path(&path) {
389 return Ok(err(Value::Str(e)));
390 }
391 match std::fs::metadata(&path) {
392 Ok(md) => {
393 let mtime = md.modified()
394 .ok()
395 .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
396 .map(|d| d.as_secs() as i64)
397 .unwrap_or(0);
398 let mut rec = indexmap::IndexMap::new();
399 rec.insert("size".into(), Value::Int(md.len() as i64));
400 rec.insert("mtime".into(), Value::Int(mtime));
401 rec.insert("is_dir".into(), Value::Bool(md.is_dir()));
402 rec.insert("is_file".into(), Value::Bool(md.is_file()));
403 Ok(ok(Value::Record(rec)))
404 }
405 Err(e) => Ok(err(Value::Str(format!("fs.stat `{path}`: {e}")))),
406 }
407 }
408 "list_dir" => {
409 let path = expect_str(args.first())?.to_string();
410 if let Err(e) = self.ensure_fs_walk_path(&path) {
411 return Ok(err(Value::Str(e)));
412 }
413 match std::fs::read_dir(&path) {
414 Ok(rd) => {
415 let mut entries: Vec<Value> = Vec::new();
416 for ent in rd {
417 match ent {
418 Ok(e) => {
419 let p = e.path();
420 entries.push(Value::Str(p.to_string_lossy().into_owned()));
421 }
422 Err(e) => return Ok(err(Value::Str(format!("fs.list_dir: {e}")))),
423 }
424 }
425 Ok(ok(Value::List(entries)))
426 }
427 Err(e) => Ok(err(Value::Str(format!("fs.list_dir `{path}`: {e}")))),
428 }
429 }
430 "walk" => {
431 let path = expect_str(args.first())?.to_string();
432 if let Err(e) = self.ensure_fs_walk_path(&path) {
433 return Ok(err(Value::Str(e)));
434 }
435 let mut paths: Vec<Value> = Vec::new();
436 for ent in walkdir::WalkDir::new(&path) {
437 match ent {
438 Ok(e) => paths.push(Value::Str(
439 e.path().to_string_lossy().into_owned())),
440 Err(e) => return Ok(err(Value::Str(format!("fs.walk: {e}")))),
441 }
442 }
443 Ok(ok(Value::List(paths)))
444 }
445 "glob" => {
446 let pattern = expect_str(args.first())?.to_string();
447 let entries = match glob::glob(&pattern) {
452 Ok(e) => e,
453 Err(e) => return Ok(err(Value::Str(format!("fs.glob: {e}")))),
454 };
455 let mut paths: Vec<Value> = Vec::new();
456 for ent in entries {
457 match ent {
458 Ok(p) => {
459 let s = p.to_string_lossy().into_owned();
460 if self.policy.allow_fs_read.is_empty()
461 || self.policy.allow_fs_read.iter().any(|root| p.starts_with(root))
462 {
463 paths.push(Value::Str(s));
464 }
465 }
466 Err(e) => return Ok(err(Value::Str(format!("fs.glob: {e}")))),
467 }
468 }
469 Ok(ok(Value::List(paths)))
470 }
471 "mkdir_p" => {
472 let path = expect_str(args.first())?.to_string();
473 if let Err(e) = self.ensure_fs_write_path(&path) {
474 return Ok(err(Value::Str(e)));
475 }
476 match std::fs::create_dir_all(&path) {
477 Ok(_) => Ok(ok(Value::Unit)),
478 Err(e) => Ok(err(Value::Str(format!("fs.mkdir_p `{path}`: {e}")))),
479 }
480 }
481 "remove" => {
482 let path = expect_str(args.first())?.to_string();
483 if let Err(e) = self.ensure_fs_write_path(&path) {
484 return Ok(err(Value::Str(e)));
485 }
486 let p = std::path::Path::new(&path);
487 let result = if p.is_dir() {
488 std::fs::remove_dir_all(p)
489 } else {
490 std::fs::remove_file(p)
491 };
492 match result {
493 Ok(_) => Ok(ok(Value::Unit)),
494 Err(e) => Ok(err(Value::Str(format!("fs.remove `{path}`: {e}")))),
495 }
496 }
497 "copy" => {
498 let src = expect_str(args.first())?.to_string();
499 let dst = expect_str(args.get(1))?.to_string();
500 if let Err(e) = self.ensure_fs_walk_path(&src) {
501 return Ok(err(Value::Str(e)));
502 }
503 if let Err(e) = self.ensure_fs_write_path(&dst) {
504 return Ok(err(Value::Str(e)));
505 }
506 match std::fs::copy(&src, &dst) {
507 Ok(_) => Ok(ok(Value::Unit)),
508 Err(e) => Ok(err(Value::Str(format!("fs.copy {src} -> {dst}: {e}")))),
509 }
510 }
511 other => Err(format!("unsupported fs.{other}")),
512 }
513 }
514
515 fn ensure_fs_walk_path(&self, path: &str) -> Result<(), String> {
520 if self.policy.allow_fs_read.is_empty() {
521 return Ok(());
522 }
523 let p = std::path::Path::new(path);
524 if self.policy.allow_fs_read.iter().any(|a| p.starts_with(a)) {
525 Ok(())
526 } else {
527 Err(format!("fs path `{path}` outside --allow-fs-read"))
528 }
529 }
530
531 fn ensure_fs_write_path(&self, path: &str) -> Result<(), String> {
534 if self.policy.allow_fs_write.is_empty() {
535 return Ok(());
536 }
537 let p = std::path::Path::new(path);
538 if self.policy.allow_fs_write.iter().any(|a| p.starts_with(a)) {
539 Ok(())
540 } else {
541 Err(format!("fs path `{path}` outside --allow-fs-write"))
542 }
543 }
544
545 fn ensure_host_allowed(&self, url: &str) -> Result<(), String> {
549 if self.policy.allow_net_host.is_empty() { return Ok(()); }
550 let host = extract_host(url).unwrap_or("");
551 if self.policy.allow_net_host.iter().any(|h| host == h) {
552 Ok(())
553 } else {
554 Err(format!(
555 "net call to host `{host}` not in --allow-net-host {:?}",
556 self.policy.allow_net_host,
557 ))
558 }
559 }
560}
561
562fn extract_host(url: &str) -> Option<&str> {
563 let rest = url.strip_prefix("http://").or_else(|| url.strip_prefix("https://"))?;
564 let host_port = match rest.find('/') {
565 Some(i) => &rest[..i],
566 None => rest,
567 };
568 Some(match host_port.rsplit_once(':') {
569 Some((h, _)) => h,
570 None => host_port,
571 })
572}
573
574impl EffectHandler for DefaultHandler {
575 fn dispatch(&mut self, kind: &str, op: &str, args: Vec<Value>) -> Result<Value, String> {
576 if let Some(r) = try_pure_builtin(kind, op, &args) {
580 return r;
581 }
582 if kind == "process" {
586 self.ensure_kind_allowed("proc")?;
587 return self.dispatch_process(op, args);
588 }
589 if kind == "log" {
590 let effect_kind = match op {
593 "debug" | "info" | "warn" | "error" => "log",
594 "set_level" | "set_format" => "io",
595 "set_sink" => {
596 self.ensure_kind_allowed("io")?;
597 self.ensure_kind_allowed("fs_write")?;
598 return self.dispatch_log(op, args);
599 }
600 other => return Err(format!("unsupported log.{other}")),
601 };
602 self.ensure_kind_allowed(effect_kind)?;
603 return self.dispatch_log(op, args);
604 }
605 if kind == "fs" {
606 let effect_kind = match op {
607 "exists" | "is_file" | "is_dir" | "stat"
608 | "list_dir" | "walk" | "glob" => "fs_walk",
609 "mkdir_p" | "remove" => "fs_write",
610 "copy" => {
611 self.ensure_kind_allowed("fs_walk")?;
612 self.ensure_kind_allowed("fs_write")?;
613 return self.dispatch_fs(op, args);
614 }
615 other => return Err(format!("unsupported fs.{other}")),
616 };
617 self.ensure_kind_allowed(effect_kind)?;
618 return self.dispatch_fs(op, args);
619 }
620 if kind == "datetime" && op == "now" {
627 self.ensure_kind_allowed("time")?;
628 let now = chrono::Utc::now();
629 let nanos = now.timestamp_nanos_opt().unwrap_or(i64::MAX);
630 return Ok(Value::Int(nanos));
631 }
632 if kind == "crypto" && op == "random" {
633 self.ensure_kind_allowed("random")?;
634 let n = expect_int(args.first())?;
635 if !(0..=1_048_576).contains(&n) {
636 return Err("crypto.random: n must be in 0..=1048576".into());
637 }
638 use rand::{rngs::OsRng, TryRngCore};
639 let mut buf = vec![0u8; n as usize];
640 OsRng.try_fill_bytes(&mut buf)
641 .map_err(|e| format!("crypto.random: OS RNG: {e}"))?;
642 return Ok(Value::Bytes(buf));
643 }
644 if kind == "agent" {
657 let effect_kind = match op {
658 "local_complete" => "llm_local",
659 "cloud_complete" => "llm_cloud",
660 "send_a2a" => "a2a",
661 "call_mcp" => "mcp",
662 other => return Err(format!("unsupported agent.{other}")),
663 };
664 self.ensure_kind_allowed(effect_kind)?;
665 if op == "call_mcp" {
670 return Ok(dispatch_call_mcp(args));
671 }
672 return Ok(ok(Value::Str(format!("<{effect_kind} stub>"))));
673 }
674 if kind == "http" && matches!(op, "send" | "get" | "post") {
675 self.ensure_kind_allowed("net")?;
676 return match op {
677 "send" => {
678 let req = expect_record(args.first())?;
679 Ok(http_send_record(self, req))
680 }
681 "get" => {
682 let url = expect_str(args.first())?.to_string();
683 self.ensure_host_allowed(&url)?;
684 Ok(http_send_simple("GET", &url, None, "", None))
685 }
686 "post" => {
687 let url = expect_str(args.first())?.to_string();
688 let body = expect_bytes(args.get(1))?.clone();
689 let content_type = expect_str(args.get(2))?.to_string();
690 self.ensure_host_allowed(&url)?;
691 Ok(http_send_simple("POST", &url, Some(body), &content_type, None))
692 }
693 _ => unreachable!(),
694 };
695 }
696 self.ensure_kind_allowed(kind)?;
697 match (kind, op) {
698 ("io", "print") => {
699 let line = expect_str(args.first())?;
700 self.sink.print_line(line);
701 Ok(Value::Unit)
702 }
703 ("io", "read") => {
704 let path = expect_str(args.first())?.to_string();
705 let resolved = self.resolve_read_path(&path);
706 if !self.policy.allow_fs_read.is_empty()
713 && !self.policy.allow_fs_read.iter().any(|a| resolved.starts_with(a))
714 {
715 return Err(format!("read of `{path}` outside --allow-fs-read"));
716 }
717 match std::fs::read_to_string(&resolved) {
718 Ok(s) => Ok(ok(Value::Str(s))),
719 Err(e) => Ok(err(Value::Str(format!("{e}")))),
720 }
721 }
722 ("io", "write") => {
723 let path = expect_str(args.first())?.to_string();
724 let contents = expect_str(args.get(1))?.to_string();
725 if !self.policy.allow_fs_write.is_empty() {
727 let p = std::path::Path::new(&path);
728 if !self.policy.allow_fs_write.iter().any(|a| p.starts_with(a)) {
729 return Err(format!("write to `{path}` outside --allow-fs-write"));
730 }
731 }
732 match std::fs::write(&path, contents) {
733 Ok(_) => Ok(ok(Value::Unit)),
734 Err(e) => Ok(err(Value::Str(format!("{e}")))),
735 }
736 }
737 ("time", "now") => {
738 let secs = SystemTime::now().duration_since(UNIX_EPOCH)
739 .map_err(|e| format!("time: {e}"))?.as_secs();
740 Ok(Value::Int(secs as i64))
741 }
742 ("rand", "int_in") => {
743 let lo = expect_int(args.first())?;
745 let hi = expect_int(args.get(1))?;
746 Ok(Value::Int((lo + hi) / 2))
747 }
748 ("budget", _) => {
749 Ok(Value::Unit)
752 }
753 ("net", "get") => {
754 let url = expect_str(args.first())?.to_string();
755 self.ensure_host_allowed(&url)?;
756 Ok(http_request("GET", &url, None))
757 }
758 ("net", "post") => {
759 let url = expect_str(args.first())?.to_string();
760 let body = expect_str(args.get(1))?.to_string();
761 self.ensure_host_allowed(&url)?;
762 Ok(http_request("POST", &url, Some(&body)))
763 }
764 ("net", "serve") => {
765 let port = match args.first() {
766 Some(Value::Int(n)) if (0..=65535).contains(n) => *n as u16,
767 _ => return Err("net.serve(port, handler): port must be Int 0..=65535".into()),
768 };
769 let handler_name = expect_str(args.get(1))?.to_string();
770 let program = self.program.clone()
771 .ok_or_else(|| "net.serve requires a Program reference; use DefaultHandler::with_program".to_string())?;
772 let policy = self.policy.clone();
773 serve_http(port, handler_name, program, policy, None)
774 }
775 ("net", "serve_tls") => {
776 let port = match args.first() {
777 Some(Value::Int(n)) if (0..=65535).contains(n) => *n as u16,
778 _ => return Err("net.serve_tls(port, cert, key, handler): port must be Int 0..=65535".into()),
779 };
780 let cert_path = expect_str(args.get(1))?.to_string();
781 let key_path = expect_str(args.get(2))?.to_string();
782 let handler_name = expect_str(args.get(3))?.to_string();
783 let program = self.program.clone()
784 .ok_or_else(|| "net.serve_tls requires a Program reference".to_string())?;
785 let policy = self.policy.clone();
786 let cert = std::fs::read(&cert_path)
787 .map_err(|e| format!("net.serve_tls: read cert {cert_path}: {e}"))?;
788 let key = std::fs::read(&key_path)
789 .map_err(|e| format!("net.serve_tls: read key {key_path}: {e}"))?;
790 serve_http(port, handler_name, program, policy, Some(TlsConfig { cert, key }))
791 }
792 ("net", "serve_ws") => {
793 let port = match args.first() {
794 Some(Value::Int(n)) if (0..=65535).contains(n) => *n as u16,
795 _ => return Err("net.serve_ws(port, on_message): port must be Int 0..=65535".into()),
796 };
797 let handler_name = expect_str(args.get(1))?.to_string();
798 let program = self.program.clone()
799 .ok_or_else(|| "net.serve_ws requires a Program reference".to_string())?;
800 let policy = self.policy.clone();
801 let registry = Arc::new(crate::ws::ChatRegistry::default());
802 crate::ws::serve_ws(port, handler_name, program, policy, registry)
803 }
804 ("chat", "broadcast") => {
805 let registry = self.chat_registry.as_ref()
806 .ok_or_else(|| "chat.broadcast called outside a net.serve_ws handler".to_string())?;
807 let room = expect_str(args.first())?;
808 let body = expect_str(args.get(1))?;
809 crate::ws::chat_broadcast(registry, room, body);
810 Ok(Value::Unit)
811 }
812 ("chat", "send") => {
813 let registry = self.chat_registry.as_ref()
814 .ok_or_else(|| "chat.send called outside a net.serve_ws handler".to_string())?;
815 let conn_id = match args.first() {
816 Some(Value::Int(n)) if *n >= 0 => *n as u64,
817 _ => return Err("chat.send: conn_id must be non-negative Int".into()),
818 };
819 let body = expect_str(args.get(1))?;
820 Ok(Value::Bool(crate::ws::chat_send(registry, conn_id, body)))
821 }
822 ("kv", "open") => {
823 let path = expect_str(args.first())?.to_string();
824 if !self.policy.allow_fs_write.is_empty() {
828 let p = std::path::Path::new(&path);
829 if !self.policy.allow_fs_write.iter().any(|a| p.starts_with(a)) {
830 return Ok(err(Value::Str(format!(
831 "kv.open: `{path}` outside --allow-fs-write"))));
832 }
833 }
834 match sled::open(&path) {
835 Ok(db) => {
836 let handle = next_kv_handle();
837 kv_registry().lock().unwrap().insert(handle, db);
838 Ok(ok(Value::Int(handle as i64)))
839 }
840 Err(e) => Ok(err(Value::Str(format!("kv.open: {e}")))),
841 }
842 }
843 ("kv", "close") => {
844 let h = expect_kv_handle(args.first())?;
845 kv_registry().lock().unwrap().remove(h);
846 Ok(Value::Unit)
847 }
848 ("kv", "get") => {
849 let h = expect_kv_handle(args.first())?;
850 let key = expect_str(args.get(1))?;
851 let mut reg = kv_registry().lock().unwrap();
852 let db = reg.touch_get(h).ok_or_else(|| "kv.get: closed or unknown Kv handle".to_string())?;
853 match db.get(key.as_bytes()) {
854 Ok(Some(ivec)) => Ok(some(Value::Bytes(ivec.to_vec()))),
855 Ok(None) => Ok(none()),
856 Err(e) => Err(format!("kv.get: {e}")),
857 }
858 }
859 ("kv", "put") => {
860 let h = expect_kv_handle(args.first())?;
861 let key = expect_str(args.get(1))?.to_string();
862 let val = expect_bytes(args.get(2))?.clone();
863 let mut reg = kv_registry().lock().unwrap();
864 let db = reg.touch_get(h).ok_or_else(|| "kv.put: closed or unknown Kv handle".to_string())?;
865 match db.insert(key.as_bytes(), val) {
866 Ok(_) => Ok(ok(Value::Unit)),
867 Err(e) => Ok(err(Value::Str(format!("kv.put: {e}")))),
868 }
869 }
870 ("kv", "delete") => {
871 let h = expect_kv_handle(args.first())?;
872 let key = expect_str(args.get(1))?;
873 let mut reg = kv_registry().lock().unwrap();
874 let db = reg.touch_get(h).ok_or_else(|| "kv.delete: closed or unknown Kv handle".to_string())?;
875 match db.remove(key.as_bytes()) {
876 Ok(_) => Ok(ok(Value::Unit)),
877 Err(e) => Ok(err(Value::Str(format!("kv.delete: {e}")))),
878 }
879 }
880 ("kv", "contains") => {
881 let h = expect_kv_handle(args.first())?;
882 let key = expect_str(args.get(1))?;
883 let mut reg = kv_registry().lock().unwrap();
884 let db = reg.touch_get(h).ok_or_else(|| "kv.contains: closed or unknown Kv handle".to_string())?;
885 match db.contains_key(key.as_bytes()) {
886 Ok(present) => Ok(Value::Bool(present)),
887 Err(e) => Err(format!("kv.contains: {e}")),
888 }
889 }
890 ("kv", "list_prefix") => {
891 let h = expect_kv_handle(args.first())?;
892 let prefix = expect_str(args.get(1))?;
893 let mut reg = kv_registry().lock().unwrap();
894 let db = reg.touch_get(h).ok_or_else(|| "kv.list_prefix: closed or unknown Kv handle".to_string())?;
895 let mut keys: Vec<Value> = Vec::new();
896 for kv in db.scan_prefix(prefix.as_bytes()) {
897 let (k, _) = kv.map_err(|e| format!("kv.list_prefix: {e}"))?;
898 let s = String::from_utf8_lossy(&k).to_string();
899 keys.push(Value::Str(s));
900 }
901 Ok(Value::List(keys))
902 }
903 ("sql", "open") => {
904 let path = expect_str(args.first())?.to_string();
905 if path != ":memory:" && !self.policy.allow_fs_write.is_empty() {
909 let p = std::path::Path::new(&path);
910 if !self.policy.allow_fs_write.iter().any(|a| p.starts_with(a)) {
911 return Ok(err(Value::Str(format!(
912 "sql.open: `{path}` outside --allow-fs-write"))));
913 }
914 }
915 match rusqlite::Connection::open(&path) {
916 Ok(conn) => {
917 let handle = next_sql_handle();
918 sql_registry().lock().unwrap().insert(handle, conn);
919 Ok(ok(Value::Int(handle as i64)))
920 }
921 Err(e) => Ok(err(Value::Str(format!("sql.open: {e}")))),
922 }
923 }
924 ("sql", "close") => {
925 let h = expect_sql_handle(args.first())?;
926 sql_registry().lock().unwrap().remove(h);
927 Ok(Value::Unit)
928 }
929 ("sql", "exec") => {
930 let h = expect_sql_handle(args.first())?;
931 let stmt = expect_str(args.get(1))?.to_string();
932 let params = expect_str_list(args.get(2))?;
933 let arc = sql_registry().lock().unwrap()
934 .touch_get(h)
935 .ok_or_else(|| "sql.exec: closed or unknown Db handle".to_string())?;
936 let conn = arc.lock().unwrap();
937 let bind: Vec<&dyn rusqlite::ToSql> = params.iter()
938 .map(|s| s as &dyn rusqlite::ToSql)
939 .collect();
940 match conn.execute(&stmt, rusqlite::params_from_iter(bind.iter())) {
941 Ok(n) => Ok(ok(Value::Int(n as i64))),
942 Err(e) => Ok(err(Value::Str(format!("sql.exec: {e}")))),
943 }
944 }
945 ("sql", "query") => {
946 let h = expect_sql_handle(args.first())?;
947 let stmt_str = expect_str(args.get(1))?.to_string();
948 let params = expect_str_list(args.get(2))?;
949 let arc = sql_registry().lock().unwrap()
950 .touch_get(h)
951 .ok_or_else(|| "sql.query: closed or unknown Db handle".to_string())?;
952 let conn = arc.lock().unwrap();
953 Ok(sql_run_query(&conn, &stmt_str, ¶ms))
954 }
955 ("proc", "spawn") => {
956 let cmd = expect_str(args.first())?.to_string();
970 let raw_args = match args.get(1) {
971 Some(Value::List(items)) => items,
972 Some(other) => return Err(format!(
973 "proc.spawn: args must be List[Str], got {other:?}")),
974 None => return Err("proc.spawn: missing args list".into()),
975 };
976 let str_args: Vec<String> = raw_args.iter().map(|v| match v {
977 Value::Str(s) => Ok(s.clone()),
978 other => Err(format!("proc.spawn: arg must be Str, got {other:?}")),
979 }).collect::<Result<Vec<_>, _>>()?;
980
981 if !self.policy.allow_proc.is_empty() {
985 let basename = std::path::Path::new(&cmd)
986 .file_name()
987 .and_then(|s| s.to_str())
988 .unwrap_or(&cmd);
989 if !self.policy.allow_proc.iter().any(|a| a == basename) {
990 return Ok(err(Value::Str(format!(
991 "proc.spawn: `{cmd}` not in --allow-proc {:?}",
992 self.policy.allow_proc
993 ))));
994 }
995 }
996
997 if str_args.len() > 1024 {
1000 return Ok(err(Value::Str(
1001 "proc.spawn: arg-count exceeds 1024".into())));
1002 }
1003 if str_args.iter().any(|a| a.len() > 65_536) {
1004 return Ok(err(Value::Str(
1005 "proc.spawn: per-arg length exceeds 64 KiB".into())));
1006 }
1007
1008 let output = std::process::Command::new(&cmd)
1009 .args(&str_args)
1010 .output();
1011 match output {
1012 Ok(o) => {
1013 let mut rec = indexmap::IndexMap::new();
1014 rec.insert("stdout".into(), Value::Str(
1015 String::from_utf8_lossy(&o.stdout).to_string()));
1016 rec.insert("stderr".into(), Value::Str(
1017 String::from_utf8_lossy(&o.stderr).to_string()));
1018 rec.insert("exit_code".into(), Value::Int(
1019 o.status.code().unwrap_or(-1) as i64));
1020 Ok(ok(Value::Record(rec)))
1021 }
1022 Err(e) => Ok(err(Value::Str(format!("spawn `{cmd}`: {e}")))),
1023 }
1024 }
1025 other => Err(format!("unsupported effect {}.{}", other.0, other.1)),
1026 }
1027 }
1028}
1029
1030pub struct TlsConfig {
1040 pub cert: Vec<u8>,
1041 pub key: Vec<u8>,
1042}
1043
1044fn serve_http(
1045 port: u16,
1046 handler_name: String,
1047 program: Arc<Program>,
1048 policy: Policy,
1049 tls: Option<TlsConfig>,
1050) -> Result<Value, String> {
1051 let (server, scheme) = match tls {
1052 None => (
1053 tiny_http::Server::http(("127.0.0.1", port))
1054 .map_err(|e| format!("net.serve bind {port}: {e}"))?,
1055 "http",
1056 ),
1057 Some(cfg) => {
1058 let ssl = tiny_http::SslConfig {
1059 certificate: cfg.cert,
1060 private_key: cfg.key,
1061 };
1062 (
1063 tiny_http::Server::https(("127.0.0.1", port), ssl)
1064 .map_err(|e| format!("net.serve_tls bind {port}: {e}"))?,
1065 "https",
1066 )
1067 }
1068 };
1069 eprintln!("net.serve: listening on {scheme}://127.0.0.1:{port}");
1070 for req in server.incoming_requests() {
1076 let program = Arc::clone(&program);
1077 let policy = policy.clone();
1078 let handler_name = handler_name.clone();
1079 std::thread::spawn(move || handle_request(req, program, policy, handler_name));
1080 }
1081 Ok(Value::Unit)
1082}
1083
1084fn handle_request(
1085 mut req: tiny_http::Request,
1086 program: Arc<Program>,
1087 policy: Policy,
1088 handler_name: String,
1089) {
1090 let lex_req = build_request_value(&mut req);
1091 let handler = DefaultHandler::new(policy).with_program(Arc::clone(&program));
1092 let mut vm = Vm::with_handler(&program, Box::new(handler));
1093 match vm.call(&handler_name, vec![lex_req]) {
1094 Ok(resp) => {
1095 let (status, body) = unpack_response(&resp);
1096 let response = tiny_http::Response::from_string(body).with_status_code(status);
1097 let _ = req.respond(response);
1098 }
1099 Err(e) => {
1100 let response = tiny_http::Response::from_string(format!("internal error: {e}"))
1101 .with_status_code(500);
1102 let _ = req.respond(response);
1103 }
1104 }
1105}
1106
1107fn build_request_value(req: &mut tiny_http::Request) -> Value {
1108 let method = format!("{:?}", req.method()).to_uppercase();
1109 let url = req.url().to_string();
1110 let (path, query) = match url.split_once('?') {
1111 Some((p, q)) => (p.to_string(), q.to_string()),
1112 None => (url, String::new()),
1113 };
1114 let mut body = String::new();
1115 let _ = req.as_reader().read_to_string(&mut body);
1116 let mut rec = indexmap::IndexMap::new();
1117 rec.insert("method".into(), Value::Str(method));
1118 rec.insert("path".into(), Value::Str(path));
1119 rec.insert("query".into(), Value::Str(query));
1120 rec.insert("body".into(), Value::Str(body));
1121 Value::Record(rec)
1122}
1123
1124fn unpack_response(v: &Value) -> (u16, String) {
1125 if let Value::Record(rec) = v {
1126 let status = rec.get("status").and_then(|s| match s {
1127 Value::Int(n) => Some(*n as u16),
1128 _ => None,
1129 }).unwrap_or(200);
1130 let body = rec.get("body").and_then(|b| match b {
1131 Value::Str(s) => Some(s.clone()),
1132 _ => None,
1133 }).unwrap_or_default();
1134 return (status, body);
1135 }
1136 (500, format!("handler returned non-record: {v:?}"))
1137}
1138
1139fn http_request(method: &str, url: &str, body: Option<&str>) -> Value {
1145 use std::time::Duration;
1146 let agent: ureq::Agent = ureq::Agent::config_builder()
1151 .timeout_connect(Some(Duration::from_secs(10)))
1152 .timeout_recv_body(Some(Duration::from_secs(30)))
1153 .timeout_send_body(Some(Duration::from_secs(10)))
1154 .http_status_as_error(false)
1155 .build()
1156 .into();
1157 let resp = match (method, body) {
1158 ("GET", _) => agent.get(url).call(),
1159 ("POST", Some(b)) => agent.post(url).send(b),
1160 ("POST", None) => agent.post(url).send(""),
1161 (m, _) => return err_value(format!("unsupported method: {m}")),
1162 };
1163 match resp {
1164 Ok(mut r) => {
1165 let status = r.status().as_u16();
1166 let body = r.body_mut().read_to_string().unwrap_or_default();
1167 if (200..300).contains(&status) {
1168 Value::Variant { name: "Ok".into(), args: vec![Value::Str(body)] }
1169 } else {
1170 err_value(format!("status {status}: {body}"))
1171 }
1172 }
1173 Err(e) => err_value(format!("transport: {e}")),
1174 }
1175}
1176
1177fn http_agent(timeout_ms: Option<u64>) -> ureq::Agent {
1182 use std::time::Duration;
1183 let mut b = ureq::Agent::config_builder()
1184 .timeout_connect(Some(Duration::from_secs(10)))
1185 .timeout_recv_body(Some(Duration::from_secs(30)))
1186 .timeout_send_body(Some(Duration::from_secs(10)))
1187 .http_status_as_error(false);
1188 if let Some(ms) = timeout_ms {
1189 let d = Duration::from_millis(ms);
1190 b = b.timeout_global(Some(d));
1191 }
1192 b.build().into()
1193}
1194
1195fn http_error_value(e: ureq::Error) -> Value {
1199 let (ctor, payload): (&str, Option<String>) = match &e {
1200 ureq::Error::Timeout(_) => ("TimeoutError", None),
1201 ureq::Error::Tls(s) => ("TlsError", Some((*s).into())),
1202 ureq::Error::Pem(p) => ("TlsError", Some(format!("{p}"))),
1203 ureq::Error::Rustls(r) => ("TlsError", Some(format!("{r}"))),
1204 _ => ("NetworkError", Some(format!("{e}"))),
1205 };
1206 let args = match payload { Some(s) => vec![Value::Str(s)], None => vec![] };
1207 let inner = Value::Variant { name: ctor.into(), args };
1208 Value::Variant { name: "Err".into(), args: vec![inner] }
1209}
1210
1211fn http_decode_err(msg: String) -> Value {
1212 let inner = Value::Variant {
1213 name: "DecodeError".into(),
1214 args: vec![Value::Str(msg)],
1215 };
1216 Value::Variant { name: "Err".into(), args: vec![inner] }
1217}
1218
1219fn http_send_simple(
1224 method: &str,
1225 url: &str,
1226 body: Option<Vec<u8>>,
1227 content_type: &str,
1228 timeout_ms: Option<u64>,
1229) -> Value {
1230 http_send_full(method, url, body, content_type, &[], timeout_ms)
1231}
1232
1233fn http_send_full(
1234 method: &str,
1235 url: &str,
1236 body: Option<Vec<u8>>,
1237 content_type: &str,
1238 headers: &[(String, String)],
1239 timeout_ms: Option<u64>,
1240) -> Value {
1241 let agent = http_agent(timeout_ms);
1242 let resp = match method {
1243 "GET" => {
1244 let mut req = agent.get(url);
1245 if !content_type.is_empty() { req = req.header("content-type", content_type); }
1246 for (k, v) in headers { req = req.header(k.as_str(), v.as_str()); }
1247 req.call()
1248 }
1249 "POST" => {
1250 let body = body.unwrap_or_default();
1251 let mut req = agent.post(url);
1252 if !content_type.is_empty() { req = req.header("content-type", content_type); }
1253 for (k, v) in headers { req = req.header(k.as_str(), v.as_str()); }
1254 req.send(&body[..])
1255 }
1256 m => {
1257 return http_decode_err(format!("unsupported method: {m}"));
1261 }
1262 };
1263 match resp {
1264 Ok(mut r) => {
1265 let status = r.status().as_u16() as i64;
1266 let headers_map = collect_response_headers(r.headers());
1267 let body_bytes = match r.body_mut().with_config().limit(10 * 1024 * 1024).read_to_vec() {
1268 Ok(b) => b,
1269 Err(e) => return http_decode_err(format!("body read: {e}")),
1270 };
1271 let mut rec = indexmap::IndexMap::new();
1272 rec.insert("status".into(), Value::Int(status));
1273 rec.insert("headers".into(), Value::Map(headers_map));
1274 rec.insert("body".into(), Value::Bytes(body_bytes));
1275 Value::Variant { name: "Ok".into(), args: vec![Value::Record(rec)] }
1276 }
1277 Err(e) => http_error_value(e),
1278 }
1279}
1280
1281fn collect_response_headers(
1282 headers: &ureq::http::HeaderMap,
1283) -> std::collections::BTreeMap<lex_bytecode::MapKey, Value> {
1284 let mut out = std::collections::BTreeMap::new();
1285 for (name, value) in headers.iter() {
1286 let v = value.to_str().unwrap_or("").to_string();
1287 out.insert(lex_bytecode::MapKey::Str(name.as_str().to_string()), Value::Str(v));
1288 }
1289 out
1290}
1291
1292fn http_send_record(handler: &DefaultHandler, req: &indexmap::IndexMap<String, Value>) -> Value {
1296 let method = match req.get("method") {
1297 Some(Value::Str(s)) => s.clone(),
1298 _ => return http_decode_err("HttpRequest.method must be Str".into()),
1299 };
1300 let url = match req.get("url") {
1301 Some(Value::Str(s)) => s.clone(),
1302 _ => return http_decode_err("HttpRequest.url must be Str".into()),
1303 };
1304 if let Err(e) = handler.ensure_host_allowed(&url) {
1305 return http_decode_err(e);
1306 }
1307 let body = match req.get("body") {
1308 Some(Value::Variant { name, args }) if name == "None" => None,
1309 Some(Value::Variant { name, args }) if name == "Some" => match args.as_slice() {
1310 [Value::Bytes(b)] => Some(b.clone()),
1311 _ => return http_decode_err("HttpRequest.body Some payload must be Bytes".into()),
1312 },
1313 _ => return http_decode_err("HttpRequest.body must be Option[Bytes]".into()),
1314 };
1315 let timeout_ms = match req.get("timeout_ms") {
1316 Some(Value::Variant { name, .. }) if name == "None" => None,
1317 Some(Value::Variant { name, args }) if name == "Some" => match args.as_slice() {
1318 [Value::Int(n)] if *n >= 0 => Some(*n as u64),
1319 _ => return http_decode_err(
1320 "HttpRequest.timeout_ms Some payload must be a non-negative Int".into()),
1321 },
1322 _ => return http_decode_err("HttpRequest.timeout_ms must be Option[Int]".into()),
1323 };
1324 let headers: Vec<(String, String)> = match req.get("headers") {
1325 Some(Value::Map(m)) => m.iter().filter_map(|(k, v)| {
1326 let kk = match k { lex_bytecode::MapKey::Str(s) => s.clone(), _ => return None };
1327 let vv = match v { Value::Str(s) => s.clone(), _ => return None };
1328 Some((kk, vv))
1329 }).collect(),
1330 _ => return http_decode_err("HttpRequest.headers must be Map[Str, Str]".into()),
1331 };
1332 http_send_full(&method, &url, body, "", &headers, timeout_ms)
1333}
1334
1335fn expect_record(v: Option<&Value>) -> Result<&indexmap::IndexMap<String, Value>, String> {
1336 match v {
1337 Some(Value::Record(r)) => Ok(r),
1338 Some(other) => Err(format!("expected Record, got {other:?}")),
1339 None => Err("missing Record argument".into()),
1340 }
1341}
1342
1343fn err_value(msg: String) -> Value {
1344 Value::Variant { name: "Err".into(), args: vec![Value::Str(msg)] }
1345}
1346
1347fn expect_str(v: Option<&Value>) -> Result<&str, String> {
1348 match v {
1349 Some(Value::Str(s)) => Ok(s),
1350 Some(other) => Err(format!("expected Str arg, got {other:?}")),
1351 None => Err("missing argument".into()),
1352 }
1353}
1354
1355fn expect_int(v: Option<&Value>) -> Result<i64, String> {
1356 match v {
1357 Some(Value::Int(n)) => Ok(*n),
1358 Some(other) => Err(format!("expected Int arg, got {other:?}")),
1359 None => Err("missing argument".into()),
1360 }
1361}
1362
1363fn ok(v: Value) -> Value {
1364 Value::Variant { name: "Ok".into(), args: vec![v] }
1365}
1366fn err(v: Value) -> Value {
1367 Value::Variant { name: "Err".into(), args: vec![v] }
1368}
1369
1370fn dispatch_call_mcp(args: Vec<Value>) -> Value {
1377 let server = match args.first() {
1378 Some(Value::Str(s)) => s.clone(),
1379 _ => return err(Value::Str(
1380 "agent.call_mcp(server, tool, args_json): server must be Str".into())),
1381 };
1382 let tool = match args.get(1) {
1383 Some(Value::Str(s)) => s.clone(),
1384 _ => return err(Value::Str(
1385 "agent.call_mcp(server, tool, args_json): tool must be Str".into())),
1386 };
1387 let args_json = match args.get(2) {
1388 Some(Value::Str(s)) => s.clone(),
1389 _ => return err(Value::Str(
1390 "agent.call_mcp(server, tool, args_json): args_json must be Str".into())),
1391 };
1392 let parsed: serde_json::Value = match serde_json::from_str(&args_json) {
1393 Ok(v) => v,
1394 Err(e) => return err(Value::Str(format!(
1395 "agent.call_mcp: args_json is not valid JSON: {e}"))),
1396 };
1397 let mut client = match crate::mcp_client::McpClient::spawn(&server) {
1398 Ok(c) => c,
1399 Err(e) => return err(Value::Str(e)),
1400 };
1401 match client.call_tool(&tool, parsed) {
1402 Ok(result) => ok(Value::Str(
1403 serde_json::to_string(&result).unwrap_or_else(|_| "null".into()))),
1404 Err(e) => err(Value::Str(e)),
1405 }
1406}
1407
1408fn some(v: Value) -> Value {
1409 Value::Variant { name: "Some".into(), args: vec![v] }
1410}
1411fn none() -> Value {
1412 Value::Variant { name: "None".into(), args: vec![] }
1413}
1414
1415fn expect_bytes(v: Option<&Value>) -> Result<&Vec<u8>, String> {
1416 match v {
1417 Some(Value::Bytes(b)) => Ok(b),
1418 Some(other) => Err(format!("expected Bytes arg, got {other:?}")),
1419 None => Err("missing argument".into()),
1420 }
1421}
1422
1423fn expect_kv_handle(v: Option<&Value>) -> Result<u64, String> {
1424 match v {
1425 Some(Value::Int(n)) if *n >= 0 => Ok(*n as u64),
1426 Some(other) => Err(format!("expected Kv handle (Int), got {other:?}")),
1427 None => Err("missing Kv argument".into()),
1428 }
1429}
1430
1431fn expect_sql_handle(v: Option<&Value>) -> Result<u64, String> {
1432 match v {
1433 Some(Value::Int(n)) if *n >= 0 => Ok(*n as u64),
1434 Some(other) => Err(format!("expected Db handle (Int), got {other:?}")),
1435 None => Err("missing Db argument".into()),
1436 }
1437}
1438
1439fn expect_str_list(v: Option<&Value>) -> Result<Vec<String>, String> {
1440 match v {
1441 Some(Value::List(items)) => items.iter().map(|x| match x {
1442 Value::Str(s) => Ok(s.clone()),
1443 other => Err(format!("expected List[Str] element, got {other:?}")),
1444 }).collect(),
1445 Some(other) => Err(format!("expected List[Str], got {other:?}")),
1446 None => Err("missing List[Str] argument".into()),
1447 }
1448}
1449
1450fn sql_run_query(
1456 conn: &rusqlite::Connection,
1457 stmt_str: &str,
1458 params: &[String],
1459) -> Value {
1460 let mut stmt = match conn.prepare(stmt_str) {
1461 Ok(s) => s,
1462 Err(e) => return err(Value::Str(format!("sql.query: {e}"))),
1463 };
1464 let column_count = stmt.column_count();
1465 let column_names: Vec<String> = (0..column_count)
1466 .map(|i| stmt.column_name(i).unwrap_or("").to_string())
1467 .collect();
1468 let bind: Vec<&dyn rusqlite::ToSql> = params.iter()
1469 .map(|s| s as &dyn rusqlite::ToSql)
1470 .collect();
1471 let mut rows = match stmt.query(rusqlite::params_from_iter(bind.iter())) {
1472 Ok(r) => r,
1473 Err(e) => return err(Value::Str(format!("sql.query: {e}"))),
1474 };
1475 let mut out: Vec<Value> = Vec::new();
1476 loop {
1477 let row = match rows.next() {
1478 Ok(Some(r)) => r,
1479 Ok(None) => break,
1480 Err(e) => return err(Value::Str(format!("sql.query: {e}"))),
1481 };
1482 let mut rec = indexmap::IndexMap::new();
1483 for (i, name) in column_names.iter().enumerate() {
1484 let cell = match row.get_ref(i) {
1485 Ok(c) => sql_value_ref_to_lex(c),
1486 Err(e) => return err(Value::Str(format!("sql.query: column {i}: {e}"))),
1487 };
1488 rec.insert(name.clone(), cell);
1489 }
1490 out.push(Value::Record(rec));
1491 }
1492 ok(Value::List(out))
1493}
1494
1495fn sql_value_ref_to_lex(v: rusqlite::types::ValueRef<'_>) -> Value {
1496 use rusqlite::types::ValueRef;
1497 match v {
1498 ValueRef::Null => Value::Unit,
1499 ValueRef::Integer(n) => Value::Int(n),
1500 ValueRef::Real(f) => Value::Float(f),
1501 ValueRef::Text(s) => Value::Str(String::from_utf8_lossy(s).into_owned()),
1502 ValueRef::Blob(b) => Value::Bytes(b.to_vec()),
1503 }
1504}
1505
1506#[derive(Clone, Copy, PartialEq, PartialOrd)]
1509enum LogLevel { Debug, Info, Warn, Error }
1510
1511#[derive(Clone, Copy, PartialEq)]
1512enum LogFormat { Text, Json }
1513
1514#[derive(Clone)]
1515enum LogSink {
1516 Stderr,
1517 File(std::sync::Arc<Mutex<std::fs::File>>),
1518}
1519
1520struct LogState {
1521 level: LogLevel,
1522 format: LogFormat,
1523 sink: LogSink,
1524}
1525
1526fn log_state() -> &'static Mutex<LogState> {
1527 static STATE: OnceLock<Mutex<LogState>> = OnceLock::new();
1528 STATE.get_or_init(|| Mutex::new(LogState {
1529 level: LogLevel::Info,
1530 format: LogFormat::Text,
1531 sink: LogSink::Stderr,
1532 }))
1533}
1534
1535fn parse_log_level(s: &str) -> Option<LogLevel> {
1536 match s {
1537 "debug" => Some(LogLevel::Debug),
1538 "info" => Some(LogLevel::Info),
1539 "warn" => Some(LogLevel::Warn),
1540 "error" => Some(LogLevel::Error),
1541 _ => None,
1542 }
1543}
1544
1545fn level_label(l: LogLevel) -> &'static str {
1546 match l {
1547 LogLevel::Debug => "debug",
1548 LogLevel::Info => "info",
1549 LogLevel::Warn => "warn",
1550 LogLevel::Error => "error",
1551 }
1552}
1553
1554fn emit_log(level: LogLevel, msg: &str) {
1555 let state = log_state().lock().unwrap();
1556 if level < state.level {
1557 return;
1558 }
1559 let ts = chrono::Utc::now().to_rfc3339();
1560 let line = match state.format {
1561 LogFormat::Text => format!("[{}] {}: {}\n", ts, level_label(level), msg),
1562 LogFormat::Json => {
1563 let escaped = msg
1567 .replace('\\', "\\\\")
1568 .replace('"', "\\\"")
1569 .replace('\n', "\\n")
1570 .replace('\r', "\\r");
1571 format!(
1572 "{{\"ts\":\"{ts}\",\"level\":\"{}\",\"msg\":\"{escaped}\"}}\n",
1573 level_label(level),
1574 )
1575 }
1576 };
1577 let sink = state.sink.clone();
1578 drop(state);
1579 match sink {
1580 LogSink::Stderr => {
1581 use std::io::Write;
1582 let _ = std::io::stderr().write_all(line.as_bytes());
1583 }
1584 LogSink::File(f) => {
1585 use std::io::Write;
1586 if let Ok(mut g) = f.lock() {
1587 let _ = g.write_all(line.as_bytes());
1588 }
1589 }
1590 }
1591}
1592
1593pub(crate) struct ProcessState {
1594 child: std::process::Child,
1595 stdout: Option<std::io::BufReader<std::process::ChildStdout>>,
1596 stderr: Option<std::io::BufReader<std::process::ChildStderr>>,
1597}
1598
1599fn process_registry() -> &'static Mutex<ProcessRegistry> {
1613 static REGISTRY: OnceLock<Mutex<ProcessRegistry>> = OnceLock::new();
1614 REGISTRY.get_or_init(|| Mutex::new(ProcessRegistry::with_capacity(MAX_PROCESS_HANDLES)))
1615}
1616
1617const MAX_PROCESS_HANDLES: usize = 256;
1618
1619type SharedProcessState = Arc<Mutex<ProcessState>>;
1620
1621pub(crate) struct ProcessRegistry {
1622 entries: indexmap::IndexMap<u64, SharedProcessState>,
1623 cap: usize,
1624}
1625
1626impl ProcessRegistry {
1627 pub(crate) fn with_capacity(cap: usize) -> Self {
1628 Self { entries: indexmap::IndexMap::new(), cap }
1629 }
1630
1631 pub(crate) fn insert(&mut self, handle: u64, state: ProcessState) {
1635 if self.entries.len() >= self.cap {
1636 self.entries.shift_remove_index(0);
1637 }
1638 self.entries.insert(handle, Arc::new(Mutex::new(state)));
1639 }
1640
1641 pub(crate) fn touch_get(&mut self, handle: u64) -> Option<SharedProcessState> {
1645 let idx = self.entries.get_index_of(&handle)?;
1646 self.entries.move_index(idx, self.entries.len() - 1);
1647 self.entries.get(&handle).cloned()
1648 }
1649
1650 pub(crate) fn remove(&mut self, handle: u64) {
1655 self.entries.shift_remove(&handle);
1656 }
1657
1658 #[cfg(test)]
1659 pub(crate) fn len(&self) -> usize { self.entries.len() }
1660}
1661
1662fn next_process_handle() -> u64 {
1663 static COUNTER: AtomicU64 = AtomicU64::new(1);
1664 COUNTER.fetch_add(1, Ordering::SeqCst)
1665}
1666
1667#[cfg(all(test, unix))]
1668mod process_registry_tests {
1669 use super::{ProcessRegistry, ProcessState};
1670
1671 fn fresh_state() -> ProcessState {
1675 let child = std::process::Command::new("true")
1676 .stdout(std::process::Stdio::null())
1677 .stderr(std::process::Stdio::null())
1678 .spawn()
1679 .expect("spawn `true`");
1680 ProcessState { child, stdout: None, stderr: None }
1681 }
1682
1683 #[test]
1684 fn insert_and_get_round_trip() {
1685 let mut r = ProcessRegistry::with_capacity(4);
1686 r.insert(1, fresh_state());
1687 assert!(r.touch_get(1).is_some());
1688 assert!(r.touch_get(2).is_none());
1689 }
1690
1691 #[test]
1692 fn touch_get_returns_distinct_arcs_for_distinct_handles() {
1693 let mut r = ProcessRegistry::with_capacity(4);
1694 r.insert(1, fresh_state());
1695 r.insert(2, fresh_state());
1696 let a = r.touch_get(1).unwrap();
1697 let b = r.touch_get(2).unwrap();
1698 assert!(!std::sync::Arc::ptr_eq(&a, &b));
1700 }
1701
1702 #[test]
1703 fn cap_evicts_lru_on_overflow() {
1704 let mut r = ProcessRegistry::with_capacity(2);
1705 r.insert(1, fresh_state());
1706 r.insert(2, fresh_state());
1707 let _ = r.touch_get(1);
1708 r.insert(3, fresh_state());
1709 assert!(r.touch_get(1).is_some(), "1 was MRU, should survive");
1710 assert!(r.touch_get(2).is_none(), "2 was LRU, should be evicted");
1711 assert!(r.touch_get(3).is_some(), "3 just inserted, should survive");
1712 assert_eq!(r.len(), 2);
1713 }
1714
1715 #[test]
1716 fn cap_with_no_touches_evicts_in_insertion_order() {
1717 let mut r = ProcessRegistry::with_capacity(2);
1718 r.insert(10, fresh_state());
1719 r.insert(20, fresh_state());
1720 r.insert(30, fresh_state());
1721 assert!(r.touch_get(10).is_none());
1722 assert!(r.touch_get(20).is_some());
1723 assert!(r.touch_get(30).is_some());
1724 }
1725
1726 #[test]
1727 fn remove_drops_entry() {
1728 let mut r = ProcessRegistry::with_capacity(4);
1729 r.insert(1, fresh_state());
1730 r.remove(1);
1731 assert!(r.touch_get(1).is_none());
1732 assert_eq!(r.len(), 0);
1733 }
1734
1735 #[test]
1736 fn many_inserts_stay_bounded_at_cap() {
1737 let cap = 8;
1738 let mut r = ProcessRegistry::with_capacity(cap);
1739 for i in 0..(cap as u64 * 3) {
1740 r.insert(i, fresh_state());
1741 assert!(r.len() <= cap);
1742 }
1743 assert_eq!(r.len(), cap);
1744 }
1745
1746 #[test]
1747 fn outstanding_arc_outlives_remove() {
1748 let mut r = ProcessRegistry::with_capacity(4);
1752 r.insert(1, fresh_state());
1753 let arc = r.touch_get(1).expect("entry exists");
1754 r.remove(1);
1755 assert!(r.touch_get(1).is_none());
1757 let _state = arc.lock().unwrap();
1758 }
1759}
1760
1761fn expect_process_handle(v: Option<&Value>) -> Result<u64, String> {
1762 match v {
1763 Some(Value::Int(n)) if *n >= 0 => Ok(*n as u64),
1764 Some(other) => Err(format!("expected ProcessHandle (Int), got {other:?}")),
1765 None => Err("missing ProcessHandle argument".into()),
1766 }
1767}
1768
1769fn kv_registry() -> &'static Mutex<KvRegistry> {
1781 static REGISTRY: OnceLock<Mutex<KvRegistry>> = OnceLock::new();
1782 REGISTRY.get_or_init(|| Mutex::new(KvRegistry::with_capacity(MAX_KV_HANDLES)))
1783}
1784
1785const MAX_KV_HANDLES: usize = 256;
1791
1792pub(crate) struct KvRegistry {
1797 entries: indexmap::IndexMap<u64, sled::Db>,
1798 cap: usize,
1799}
1800
1801impl KvRegistry {
1802 pub(crate) fn with_capacity(cap: usize) -> Self {
1803 Self { entries: indexmap::IndexMap::new(), cap }
1804 }
1805
1806 pub(crate) fn insert(&mut self, handle: u64, db: sled::Db) {
1809 if self.entries.len() >= self.cap {
1810 self.entries.shift_remove_index(0);
1811 }
1812 self.entries.insert(handle, db);
1813 }
1814
1815 pub(crate) fn touch_get(&mut self, handle: u64) -> Option<&sled::Db> {
1817 let idx = self.entries.get_index_of(&handle)?;
1818 self.entries.move_index(idx, self.entries.len() - 1);
1819 self.entries.get(&handle)
1820 }
1821
1822 pub(crate) fn remove(&mut self, handle: u64) {
1824 self.entries.shift_remove(&handle);
1825 }
1826
1827 #[cfg(test)]
1828 pub(crate) fn len(&self) -> usize { self.entries.len() }
1829}
1830
1831fn next_kv_handle() -> u64 {
1832 static COUNTER: AtomicU64 = AtomicU64::new(1);
1833 COUNTER.fetch_add(1, Ordering::SeqCst)
1834}
1835
1836fn sql_registry() -> &'static Mutex<SqlRegistry> {
1843 static REGISTRY: OnceLock<Mutex<SqlRegistry>> = OnceLock::new();
1844 REGISTRY.get_or_init(|| Mutex::new(SqlRegistry::with_capacity(MAX_SQL_HANDLES)))
1845}
1846
1847const MAX_SQL_HANDLES: usize = 256;
1848
1849type SharedConn = Arc<Mutex<rusqlite::Connection>>;
1850
1851pub(crate) struct SqlRegistry {
1852 entries: indexmap::IndexMap<u64, SharedConn>,
1853 cap: usize,
1854}
1855
1856impl SqlRegistry {
1857 pub(crate) fn with_capacity(cap: usize) -> Self {
1858 Self { entries: indexmap::IndexMap::new(), cap }
1859 }
1860
1861 pub(crate) fn insert(&mut self, handle: u64, conn: rusqlite::Connection) {
1862 if self.entries.len() >= self.cap {
1863 self.entries.shift_remove_index(0);
1864 }
1865 self.entries.insert(handle, Arc::new(Mutex::new(conn)));
1866 }
1867
1868 pub(crate) fn touch_get(&mut self, handle: u64) -> Option<SharedConn> {
1872 let idx = self.entries.get_index_of(&handle)?;
1873 self.entries.move_index(idx, self.entries.len() - 1);
1874 self.entries.get(&handle).cloned()
1875 }
1876
1877 pub(crate) fn remove(&mut self, handle: u64) {
1878 self.entries.shift_remove(&handle);
1879 }
1880
1881 #[cfg(test)]
1882 pub(crate) fn len(&self) -> usize { self.entries.len() }
1883}
1884
1885fn next_sql_handle() -> u64 {
1886 static COUNTER: AtomicU64 = AtomicU64::new(1);
1887 COUNTER.fetch_add(1, Ordering::SeqCst)
1888}
1889
1890#[cfg(test)]
1891mod sql_registry_tests {
1892 use super::SqlRegistry;
1893
1894 fn fresh() -> rusqlite::Connection {
1895 rusqlite::Connection::open_in_memory().expect("open in-memory sqlite")
1896 }
1897
1898 #[test]
1899 fn insert_and_get_round_trip() {
1900 let mut r = SqlRegistry::with_capacity(4);
1901 r.insert(1, fresh());
1902 assert!(r.touch_get(1).is_some());
1903 assert!(r.touch_get(2).is_none());
1904 }
1905
1906 #[test]
1907 fn cap_evicts_lru_on_overflow() {
1908 let mut r = SqlRegistry::with_capacity(2);
1909 r.insert(1, fresh());
1910 r.insert(2, fresh());
1911 let _ = r.touch_get(1);
1912 r.insert(3, fresh());
1913 assert!(r.touch_get(1).is_some(), "1 was MRU, should survive");
1914 assert!(r.touch_get(2).is_none(), "2 was LRU, should be evicted");
1915 assert!(r.touch_get(3).is_some(), "3 just inserted");
1916 assert_eq!(r.len(), 2);
1917 }
1918
1919 #[test]
1920 fn remove_drops_entry() {
1921 let mut r = SqlRegistry::with_capacity(4);
1922 r.insert(1, fresh());
1923 r.remove(1);
1924 assert!(r.touch_get(1).is_none());
1925 assert_eq!(r.len(), 0);
1926 }
1927
1928 #[test]
1929 fn many_inserts_stay_bounded_at_cap() {
1930 let cap = 8;
1931 let mut r = SqlRegistry::with_capacity(cap);
1932 for i in 0..(cap as u64 * 3) {
1933 r.insert(i, fresh());
1934 assert!(r.len() <= cap);
1935 }
1936 assert_eq!(r.len(), cap);
1937 }
1938}
1939
1940#[cfg(test)]
1941mod kv_registry_tests {
1942 use super::KvRegistry;
1943
1944 fn fresh_db(tag: &str) -> sled::Db {
1947 let dir = std::env::temp_dir().join(format!(
1948 "lex-kv-reg-{}-{}-{}",
1949 std::process::id(),
1950 tag,
1951 std::time::SystemTime::now()
1952 .duration_since(std::time::UNIX_EPOCH)
1953 .unwrap()
1954 .as_nanos()
1955 ));
1956 sled::open(&dir).expect("sled open")
1957 }
1958
1959 #[test]
1960 fn insert_and_get_round_trip() {
1961 let mut r = KvRegistry::with_capacity(4);
1962 r.insert(1, fresh_db("a"));
1963 assert!(r.touch_get(1).is_some());
1964 assert!(r.touch_get(2).is_none());
1965 }
1966
1967 #[test]
1968 fn cap_evicts_lru_on_overflow() {
1969 let mut r = KvRegistry::with_capacity(2);
1971 r.insert(1, fresh_db("c1"));
1972 r.insert(2, fresh_db("c2"));
1973 let _ = r.touch_get(1);
1974 r.insert(3, fresh_db("c3"));
1975 assert!(r.touch_get(1).is_some(), "1 was MRU, should survive");
1976 assert!(r.touch_get(2).is_none(), "2 was LRU, should be evicted");
1977 assert!(r.touch_get(3).is_some(), "3 just inserted, should survive");
1978 assert_eq!(r.len(), 2);
1979 }
1980
1981 #[test]
1982 fn cap_with_no_touches_evicts_in_insertion_order() {
1983 let mut r = KvRegistry::with_capacity(2);
1985 r.insert(10, fresh_db("f1"));
1986 r.insert(20, fresh_db("f2"));
1987 r.insert(30, fresh_db("f3"));
1988 assert!(r.touch_get(10).is_none());
1989 assert!(r.touch_get(20).is_some());
1990 assert!(r.touch_get(30).is_some());
1991 }
1992
1993 #[test]
1994 fn remove_drops_entry() {
1995 let mut r = KvRegistry::with_capacity(4);
1996 r.insert(1, fresh_db("r1"));
1997 r.remove(1);
1998 assert!(r.touch_get(1).is_none());
1999 assert_eq!(r.len(), 0);
2000 }
2001
2002 #[test]
2003 fn remove_unknown_handle_is_noop() {
2004 let mut r = KvRegistry::with_capacity(4);
2005 r.insert(1, fresh_db("u1"));
2006 r.remove(999);
2007 assert!(r.touch_get(1).is_some());
2008 }
2009
2010 #[test]
2011 fn many_inserts_stay_bounded_at_cap() {
2012 let cap = 8;
2015 let mut r = KvRegistry::with_capacity(cap);
2016 for i in 0..(cap as u64 * 3) {
2017 r.insert(i, fresh_db(&format!("b{i}")));
2018 assert!(r.len() <= cap);
2019 }
2020 assert_eq!(r.len(), cap);
2021 }
2022}