1use lex_bytecode::vm::{EffectHandler, Vm};
8use lex_bytecode::{Program, Value};
9use std::cell::RefCell;
10use std::path::PathBuf;
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::sync::{Mutex, OnceLock};
13use std::sync::Arc;
14use std::time::{SystemTime, UNIX_EPOCH};
15
16use crate::builtins::try_pure_builtin;
17use crate::policy::Policy;
18
/// Destination for the lines a program emits through the `io.print` effect.
///
/// The `Send` supertrait lets a boxed sink travel with its handler to another
/// thread.
pub trait IoSink: Send {
    /// Emits one line of output; `s` is the line's text.
    fn print_line(&mut self, s: &str);
}
24
25pub struct StdoutSink;
26impl IoSink for StdoutSink {
27 fn print_line(&mut self, s: &str) {
28 println!("{s}");
29 }
30}
31
32#[derive(Default)]
33pub struct CapturedSink { pub lines: Vec<String> }
34impl IoSink for CapturedSink {
35 fn print_line(&mut self, s: &str) { self.lines.push(s.to_string()); }
36}
37
38pub struct DefaultHandler {
39 policy: Policy,
40 pub sink: Box<dyn IoSink>,
41 pub read_root: Option<PathBuf>,
44 pub budget_used: RefCell<u64>,
46 pub program: Option<Arc<Program>>,
50 pub chat_registry: Option<Arc<crate::ws::ChatRegistry>>,
54 pub mcp_clients: crate::mcp_client::McpClientCache,
60}
61
62impl DefaultHandler {
63 pub fn new(policy: Policy) -> Self {
64 Self {
65 policy,
66 sink: Box::new(StdoutSink),
67 read_root: None,
68 budget_used: RefCell::new(0),
69 program: None,
70 chat_registry: None,
71 mcp_clients: crate::mcp_client::McpClientCache::with_capacity(16),
72 }
73 }
74
75 pub fn with_program(mut self, program: Arc<Program>) -> Self {
76 self.program = Some(program); self
77 }
78
79 pub fn with_chat_registry(mut self, registry: Arc<crate::ws::ChatRegistry>) -> Self {
80 self.chat_registry = Some(registry); self
81 }
82
83 pub fn with_sink(mut self, sink: Box<dyn IoSink>) -> Self {
84 self.sink = sink; self
85 }
86
87 pub fn with_read_root(mut self, root: PathBuf) -> Self {
88 self.read_root = Some(root); self
89 }
90
91 fn ensure_kind_allowed(&self, kind: &str) -> Result<(), String> {
92 if self.policy.allow_effects.contains(kind) {
93 Ok(())
94 } else {
95 Err(format!("effect `{kind}` not in --allow-effects"))
96 }
97 }
98
99 fn resolve_read_path(&self, p: &str) -> PathBuf {
100 match &self.read_root {
101 Some(root) => root.join(p.trim_start_matches('/')),
102 None => PathBuf::from(p),
103 }
104 }
105
106 fn dispatch_log(&mut self, op: &str, args: Vec<Value>) -> Result<Value, String> {
107 match op {
108 "debug" | "info" | "warn" | "error" => {
109 let msg = expect_str(args.first())?;
110 let level = match op {
111 "debug" => LogLevel::Debug,
112 "info" => LogLevel::Info,
113 "warn" => LogLevel::Warn,
114 _ => LogLevel::Error,
115 };
116 emit_log(level, msg);
117 Ok(Value::Unit)
118 }
119 "set_level" => {
120 let s = expect_str(args.first())?;
121 match parse_log_level(s) {
122 Some(l) => {
123 log_state().lock().unwrap().level = l;
124 Ok(ok(Value::Unit))
125 }
126 None => Ok(err(Value::Str(format!(
127 "log.set_level: unknown level `{s}`; expected debug|info|warn|error")))),
128 }
129 }
130 "set_format" => {
131 let s = expect_str(args.first())?;
132 let fmt = match s {
133 "text" => LogFormat::Text,
134 "json" => LogFormat::Json,
135 other => return Ok(err(Value::Str(format!(
136 "log.set_format: unknown format `{other}`; expected text|json")))),
137 };
138 log_state().lock().unwrap().format = fmt;
139 Ok(ok(Value::Unit))
140 }
141 "set_sink" => {
142 let path = expect_str(args.first())?;
143 if path == "-" {
144 log_state().lock().unwrap().sink = LogSink::Stderr;
145 return Ok(ok(Value::Unit));
146 }
147 if let Err(e) = self.ensure_fs_write_path(path) {
148 return Ok(err(Value::Str(e)));
149 }
150 match std::fs::OpenOptions::new()
151 .create(true).append(true).open(path)
152 {
153 Ok(f) => {
154 log_state().lock().unwrap().sink = LogSink::File(std::sync::Arc::new(Mutex::new(f)));
155 Ok(ok(Value::Unit))
156 }
157 Err(e) => Ok(err(Value::Str(format!("log.set_sink `{path}`: {e}")))),
158 }
159 }
160 other => Err(format!("unsupported log.{other}")),
161 }
162 }
163
164 fn dispatch_process(&mut self, op: &str, args: Vec<Value>) -> Result<Value, String> {
165 match op {
166 "spawn" => {
167 let cmd = expect_str(args.first())?.to_string();
168 let raw_args = match args.get(1) {
169 Some(Value::List(items)) => items.clone(),
170 _ => return Err("process.spawn: args must be List[Str]".into()),
171 };
172 let str_args: Result<Vec<String>, String> = raw_args.iter().map(|v| match v {
173 Value::Str(s) => Ok(s.clone()),
174 other => Err(format!("process.spawn: arg must be Str, got {other:?}")),
175 }).collect();
176 let str_args = str_args?;
177 let opts = match args.get(2) {
178 Some(Value::Record(r)) => r.clone(),
179 _ => return Err("process.spawn: missing or invalid opts record".into()),
180 };
181
182 if !self.policy.allow_proc.is_empty() {
184 let basename = std::path::Path::new(&cmd)
185 .file_name()
186 .and_then(|s| s.to_str())
187 .unwrap_or(&cmd);
188 if !self.policy.allow_proc.iter().any(|a| a == basename) {
189 return Ok(err(Value::Str(format!(
190 "process.spawn: `{cmd}` not in --allow-proc {:?}",
191 self.policy.allow_proc
192 ))));
193 }
194 }
195
196 let mut command = std::process::Command::new(&cmd);
197 command.args(&str_args);
198 command.stdin(std::process::Stdio::piped());
199 command.stdout(std::process::Stdio::piped());
200 command.stderr(std::process::Stdio::piped());
201
202 if let Some(Value::Variant { name, args: vargs }) = opts.get("cwd") {
203 if name == "Some" {
204 if let Some(Value::Str(s)) = vargs.first() {
205 command.current_dir(s);
206 }
207 }
208 }
209 if let Some(Value::Map(env)) = opts.get("env") {
210 for (k, v) in env {
211 if let (lex_bytecode::MapKey::Str(ks), Value::Str(vs)) = (k, v) {
212 command.env(ks, vs);
213 }
214 }
215 }
216
217 let stdin_payload: Option<Vec<u8>> = match opts.get("stdin") {
218 Some(Value::Variant { name, args: vargs }) if name == "Some" => {
219 match vargs.first() {
220 Some(Value::Bytes(b)) => Some(b.clone()),
221 _ => None,
222 }
223 }
224 _ => None,
225 };
226
227 let mut child = match command.spawn() {
228 Ok(c) => c,
229 Err(e) => return Ok(err(Value::Str(format!("process.spawn `{cmd}`: {e}")))),
230 };
231
232 if let Some(payload) = stdin_payload {
233 if let Some(mut stdin) = child.stdin.take() {
234 use std::io::Write;
235 let _ = stdin.write_all(&payload);
236 }
238 }
239
240 let stdout = child.stdout.take().map(std::io::BufReader::new);
241 let stderr = child.stderr.take().map(std::io::BufReader::new);
242 let handle = next_process_handle();
243 process_registry().lock().unwrap().insert(handle, ProcessState {
244 child,
245 stdout,
246 stderr,
247 });
248 Ok(ok(Value::Int(handle as i64)))
249 }
250 "read_stdout_line" => Self::read_line_op(args, true),
251 "read_stderr_line" => Self::read_line_op(args, false),
252 "wait" => {
253 let h = expect_process_handle(args.first())?;
254 let arc = process_registry().lock().unwrap()
258 .touch_get(h)
259 .ok_or_else(|| "process.wait: closed or unknown ProcessHandle".to_string())?;
260 let status = {
261 let mut state = arc.lock().unwrap();
262 state.child.wait().map_err(|e| format!("process.wait: {e}"))?
263 };
264 process_registry().lock().unwrap().remove(h);
268 let mut rec = indexmap::IndexMap::new();
269 rec.insert("code".into(), Value::Int(status.code().unwrap_or(-1) as i64));
270 #[cfg(unix)]
271 {
272 use std::os::unix::process::ExitStatusExt;
273 rec.insert("signaled".into(), Value::Bool(status.signal().is_some()));
274 }
275 #[cfg(not(unix))]
276 {
277 rec.insert("signaled".into(), Value::Bool(false));
278 }
279 Ok(Value::Record(rec))
280 }
281 "kill" => {
282 let h = expect_process_handle(args.first())?;
283 let _signal = expect_str(args.get(1))?;
284 let arc = process_registry().lock().unwrap()
285 .touch_get(h)
286 .ok_or_else(|| "process.kill: closed or unknown ProcessHandle".to_string())?;
287 let mut state = arc.lock().unwrap();
288 match state.child.kill() {
291 Ok(_) => Ok(ok(Value::Unit)),
292 Err(e) => Ok(err(Value::Str(format!("process.kill: {e}")))),
293 }
294 }
295 "run" => {
296 let cmd = expect_str(args.first())?.to_string();
297 let raw_args = match args.get(1) {
298 Some(Value::List(items)) => items.clone(),
299 _ => return Err("process.run: args must be List[Str]".into()),
300 };
301 let str_args: Result<Vec<String>, String> = raw_args.iter().map(|v| match v {
302 Value::Str(s) => Ok(s.clone()),
303 other => Err(format!("process.run: arg must be Str, got {other:?}")),
304 }).collect();
305 let str_args = str_args?;
306 if !self.policy.allow_proc.is_empty() {
307 let basename = std::path::Path::new(&cmd)
308 .file_name()
309 .and_then(|s| s.to_str())
310 .unwrap_or(&cmd);
311 if !self.policy.allow_proc.iter().any(|a| a == basename) {
312 return Ok(err(Value::Str(format!(
313 "process.run: `{cmd}` not in --allow-proc {:?}",
314 self.policy.allow_proc
315 ))));
316 }
317 }
318 match std::process::Command::new(&cmd).args(&str_args).output() {
319 Ok(o) => {
320 let mut rec = indexmap::IndexMap::new();
321 rec.insert("stdout".into(), Value::Str(
322 String::from_utf8_lossy(&o.stdout).to_string()));
323 rec.insert("stderr".into(), Value::Str(
324 String::from_utf8_lossy(&o.stderr).to_string()));
325 rec.insert("exit_code".into(), Value::Int(
326 o.status.code().unwrap_or(-1) as i64));
327 Ok(ok(Value::Record(rec)))
328 }
329 Err(e) => Ok(err(Value::Str(format!("process.run `{cmd}`: {e}")))),
330 }
331 }
332 other => Err(format!("unsupported process.{other}")),
333 }
334 }
335
336 fn read_line_op(args: Vec<Value>, is_stdout: bool) -> Result<Value, String> {
342 let h = expect_process_handle(args.first())?;
343 let arc = process_registry().lock().unwrap()
344 .touch_get(h)
345 .ok_or_else(|| format!(
346 "process.read_{}_line: closed or unknown ProcessHandle",
347 if is_stdout { "stdout" } else { "stderr" }))?;
348 let mut state = arc.lock().unwrap();
349 let reader_opt = if is_stdout {
350 state.stdout.as_mut().map(|r| -> &mut dyn std::io::BufRead { r })
351 } else {
352 state.stderr.as_mut().map(|r| -> &mut dyn std::io::BufRead { r })
353 };
354 let reader = match reader_opt {
355 Some(r) => r,
356 None => return Ok(none()),
357 };
358 let mut line = String::new();
359 match reader.read_line(&mut line) {
360 Ok(0) => Ok(none()),
361 Ok(_) => {
362 if line.ends_with('\n') { line.pop(); }
363 if line.ends_with('\r') { line.pop(); }
364 Ok(some(Value::Str(line)))
365 }
366 Err(e) => Err(format!("process.read_*_line: {e}")),
367 }
368 }
369
370 fn dispatch_fs(&mut self, op: &str, args: Vec<Value>) -> Result<Value, String> {
371 match op {
372 "exists" => {
373 let path = expect_str(args.first())?.to_string();
374 if let Err(e) = self.ensure_fs_walk_path(&path) {
375 return Ok(err(Value::Str(e)));
376 }
377 Ok(Value::Bool(std::path::Path::new(&path).exists()))
378 }
379 "is_file" => {
380 let path = expect_str(args.first())?.to_string();
381 if let Err(e) = self.ensure_fs_walk_path(&path) {
382 return Ok(err(Value::Str(e)));
383 }
384 Ok(Value::Bool(std::path::Path::new(&path).is_file()))
385 }
386 "is_dir" => {
387 let path = expect_str(args.first())?.to_string();
388 if let Err(e) = self.ensure_fs_walk_path(&path) {
389 return Ok(err(Value::Str(e)));
390 }
391 Ok(Value::Bool(std::path::Path::new(&path).is_dir()))
392 }
393 "stat" => {
394 let path = expect_str(args.first())?.to_string();
395 if let Err(e) = self.ensure_fs_walk_path(&path) {
396 return Ok(err(Value::Str(e)));
397 }
398 match std::fs::metadata(&path) {
399 Ok(md) => {
400 let mtime = md.modified()
401 .ok()
402 .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
403 .map(|d| d.as_secs() as i64)
404 .unwrap_or(0);
405 let mut rec = indexmap::IndexMap::new();
406 rec.insert("size".into(), Value::Int(md.len() as i64));
407 rec.insert("mtime".into(), Value::Int(mtime));
408 rec.insert("is_dir".into(), Value::Bool(md.is_dir()));
409 rec.insert("is_file".into(), Value::Bool(md.is_file()));
410 Ok(ok(Value::Record(rec)))
411 }
412 Err(e) => Ok(err(Value::Str(format!("fs.stat `{path}`: {e}")))),
413 }
414 }
415 "list_dir" => {
416 let path = expect_str(args.first())?.to_string();
417 if let Err(e) = self.ensure_fs_walk_path(&path) {
418 return Ok(err(Value::Str(e)));
419 }
420 match std::fs::read_dir(&path) {
421 Ok(rd) => {
422 let mut entries: Vec<Value> = Vec::new();
423 for ent in rd {
424 match ent {
425 Ok(e) => {
426 let p = e.path();
427 entries.push(Value::Str(p.to_string_lossy().into_owned()));
428 }
429 Err(e) => return Ok(err(Value::Str(format!("fs.list_dir: {e}")))),
430 }
431 }
432 Ok(ok(Value::List(entries)))
433 }
434 Err(e) => Ok(err(Value::Str(format!("fs.list_dir `{path}`: {e}")))),
435 }
436 }
437 "walk" => {
438 let path = expect_str(args.first())?.to_string();
439 if let Err(e) = self.ensure_fs_walk_path(&path) {
440 return Ok(err(Value::Str(e)));
441 }
442 let mut paths: Vec<Value> = Vec::new();
443 for ent in walkdir::WalkDir::new(&path) {
444 match ent {
445 Ok(e) => paths.push(Value::Str(
446 e.path().to_string_lossy().into_owned())),
447 Err(e) => return Ok(err(Value::Str(format!("fs.walk: {e}")))),
448 }
449 }
450 Ok(ok(Value::List(paths)))
451 }
452 "glob" => {
453 let pattern = expect_str(args.first())?.to_string();
454 let entries = match glob::glob(&pattern) {
459 Ok(e) => e,
460 Err(e) => return Ok(err(Value::Str(format!("fs.glob: {e}")))),
461 };
462 let mut paths: Vec<Value> = Vec::new();
463 for ent in entries {
464 match ent {
465 Ok(p) => {
466 let s = p.to_string_lossy().into_owned();
467 if self.policy.allow_fs_read.is_empty()
468 || self.policy.allow_fs_read.iter().any(|root| p.starts_with(root))
469 {
470 paths.push(Value::Str(s));
471 }
472 }
473 Err(e) => return Ok(err(Value::Str(format!("fs.glob: {e}")))),
474 }
475 }
476 Ok(ok(Value::List(paths)))
477 }
478 "mkdir_p" => {
479 let path = expect_str(args.first())?.to_string();
480 if let Err(e) = self.ensure_fs_write_path(&path) {
481 return Ok(err(Value::Str(e)));
482 }
483 match std::fs::create_dir_all(&path) {
484 Ok(_) => Ok(ok(Value::Unit)),
485 Err(e) => Ok(err(Value::Str(format!("fs.mkdir_p `{path}`: {e}")))),
486 }
487 }
488 "remove" => {
489 let path = expect_str(args.first())?.to_string();
490 if let Err(e) = self.ensure_fs_write_path(&path) {
491 return Ok(err(Value::Str(e)));
492 }
493 let p = std::path::Path::new(&path);
494 let result = if p.is_dir() {
495 std::fs::remove_dir_all(p)
496 } else {
497 std::fs::remove_file(p)
498 };
499 match result {
500 Ok(_) => Ok(ok(Value::Unit)),
501 Err(e) => Ok(err(Value::Str(format!("fs.remove `{path}`: {e}")))),
502 }
503 }
504 "copy" => {
505 let src = expect_str(args.first())?.to_string();
506 let dst = expect_str(args.get(1))?.to_string();
507 if let Err(e) = self.ensure_fs_walk_path(&src) {
508 return Ok(err(Value::Str(e)));
509 }
510 if let Err(e) = self.ensure_fs_write_path(&dst) {
511 return Ok(err(Value::Str(e)));
512 }
513 match std::fs::copy(&src, &dst) {
514 Ok(_) => Ok(ok(Value::Unit)),
515 Err(e) => Ok(err(Value::Str(format!("fs.copy {src} -> {dst}: {e}")))),
516 }
517 }
518 other => Err(format!("unsupported fs.{other}")),
519 }
520 }
521
522 fn ensure_fs_walk_path(&self, path: &str) -> Result<(), String> {
527 if self.policy.allow_fs_read.is_empty() {
528 return Ok(());
529 }
530 let p = std::path::Path::new(path);
531 if self.policy.allow_fs_read.iter().any(|a| p.starts_with(a)) {
532 Ok(())
533 } else {
534 Err(format!("fs path `{path}` outside --allow-fs-read"))
535 }
536 }
537
538 fn ensure_fs_write_path(&self, path: &str) -> Result<(), String> {
541 if self.policy.allow_fs_write.is_empty() {
542 return Ok(());
543 }
544 let p = std::path::Path::new(path);
545 if self.policy.allow_fs_write.iter().any(|a| p.starts_with(a)) {
546 Ok(())
547 } else {
548 Err(format!("fs path `{path}` outside --allow-fs-write"))
549 }
550 }
551
552 fn ensure_host_allowed(&self, url: &str) -> Result<(), String> {
556 if self.policy.allow_net_host.is_empty() { return Ok(()); }
557 let host = extract_host(url).unwrap_or("");
558 if self.policy.allow_net_host.iter().any(|h| host == h) {
559 Ok(())
560 } else {
561 Err(format!(
562 "net call to host `{host}` not in --allow-net-host {:?}",
563 self.policy.allow_net_host,
564 ))
565 }
566 }
567}
568
/// Extracts the host part of an `http://` or `https://` URL.
///
/// Returns `None` for any other scheme or for an unterminated `[` IPv6
/// literal. The result borrows from `url`; IPv6 literals keep their brackets
/// (`[::1]`).
///
/// Parsing rules, chosen so an allow-list check cannot be steered to a
/// different host than the one an HTTP client will contact:
/// * the authority ends at the first `/`, `?`, `#` or `\` (URL parsers that
///   follow the WHATWG standard treat `\` like `/` for http(s));
/// * everything up to the last `@` is userinfo and is discarded, so
///   `http://allowed.com:x@evil.com/` yields `evil.com`, not `allowed.com`;
/// * a trailing `:port` is removed.
fn extract_host(url: &str) -> Option<&str> {
    let rest = url
        .strip_prefix("http://")
        .or_else(|| url.strip_prefix("https://"))?;
    let authority = rest
        .find(|c: char| matches!(c, '/' | '?' | '#' | '\\'))
        .map_or(rest, |end| &rest[..end]);
    let host_port = authority
        .rsplit_once('@')
        .map_or(authority, |(_userinfo, hp)| hp);
    if let Some(after_bracket) = host_port.strip_prefix('[') {
        // IPv6 literal: the host runs through the closing bracket; the colons
        // inside it are not port separators.
        let close = after_bracket.find(']')?;
        // +2 accounts for the opening and the closing bracket.
        return Some(&host_port[..close + 2]);
    }
    Some(match host_port.rsplit_once(':') {
        Some((host, _port)) => host,
        None => host_port,
    })
}
580
581impl EffectHandler for DefaultHandler {
582 fn dispatch(&mut self, kind: &str, op: &str, args: Vec<Value>) -> Result<Value, String> {
583 if let Some(r) = try_pure_builtin(kind, op, &args) {
587 return r;
588 }
589 if kind == "process" {
593 self.ensure_kind_allowed("proc")?;
594 return self.dispatch_process(op, args);
595 }
596 if kind == "log" {
597 let effect_kind = match op {
600 "debug" | "info" | "warn" | "error" => "log",
601 "set_level" | "set_format" => "io",
602 "set_sink" => {
603 self.ensure_kind_allowed("io")?;
604 self.ensure_kind_allowed("fs_write")?;
605 return self.dispatch_log(op, args);
606 }
607 other => return Err(format!("unsupported log.{other}")),
608 };
609 self.ensure_kind_allowed(effect_kind)?;
610 return self.dispatch_log(op, args);
611 }
612 if kind == "fs" {
613 let effect_kind = match op {
614 "exists" | "is_file" | "is_dir" | "stat"
615 | "list_dir" | "walk" | "glob" => "fs_walk",
616 "mkdir_p" | "remove" => "fs_write",
617 "copy" => {
618 self.ensure_kind_allowed("fs_walk")?;
619 self.ensure_kind_allowed("fs_write")?;
620 return self.dispatch_fs(op, args);
621 }
622 other => return Err(format!("unsupported fs.{other}")),
623 };
624 self.ensure_kind_allowed(effect_kind)?;
625 return self.dispatch_fs(op, args);
626 }
627 if kind == "datetime" && op == "now" {
634 self.ensure_kind_allowed("time")?;
635 let now = chrono::Utc::now();
636 let nanos = now.timestamp_nanos_opt().unwrap_or(i64::MAX);
637 return Ok(Value::Int(nanos));
638 }
639 if kind == "crypto" && op == "random" {
640 self.ensure_kind_allowed("random")?;
641 let n = expect_int(args.first())?;
642 if !(0..=1_048_576).contains(&n) {
643 return Err("crypto.random: n must be in 0..=1048576".into());
644 }
645 use rand::{rngs::OsRng, TryRngCore};
646 let mut buf = vec![0u8; n as usize];
647 OsRng.try_fill_bytes(&mut buf)
648 .map_err(|e| format!("crypto.random: OS RNG: {e}"))?;
649 return Ok(Value::Bytes(buf));
650 }
651 if kind == "agent" {
664 let effect_kind = match op {
665 "local_complete" => "llm_local",
666 "cloud_complete" => "llm_cloud",
667 "send_a2a" => "a2a",
668 "call_mcp" => "mcp",
669 other => return Err(format!("unsupported agent.{other}")),
670 };
671 self.ensure_kind_allowed(effect_kind)?;
672 return match op {
680 "call_mcp" => Ok(self.dispatch_call_mcp(args)),
681 "local_complete" => Ok(dispatch_llm_local(args)),
682 "cloud_complete" => Ok(dispatch_llm_cloud(args)),
683 _ => Ok(ok(Value::Str(format!("<{effect_kind} stub>")))),
684 };
685 }
686 if kind == "http" && matches!(op, "send" | "get" | "post") {
687 self.ensure_kind_allowed("net")?;
688 return match op {
689 "send" => {
690 let req = expect_record(args.first())?;
691 Ok(http_send_record(self, req))
692 }
693 "get" => {
694 let url = expect_str(args.first())?.to_string();
695 self.ensure_host_allowed(&url)?;
696 Ok(http_send_simple("GET", &url, None, "", None))
697 }
698 "post" => {
699 let url = expect_str(args.first())?.to_string();
700 let body = expect_bytes(args.get(1))?.clone();
701 let content_type = expect_str(args.get(2))?.to_string();
702 self.ensure_host_allowed(&url)?;
703 Ok(http_send_simple("POST", &url, Some(body), &content_type, None))
704 }
705 _ => unreachable!(),
706 };
707 }
708 self.ensure_kind_allowed(kind)?;
709 match (kind, op) {
710 ("io", "print") => {
711 let line = expect_str(args.first())?;
712 self.sink.print_line(line);
713 Ok(Value::Unit)
714 }
715 ("io", "read") => {
716 let path = expect_str(args.first())?.to_string();
717 let resolved = self.resolve_read_path(&path);
718 if !self.policy.allow_fs_read.is_empty()
725 && !self.policy.allow_fs_read.iter().any(|a| resolved.starts_with(a))
726 {
727 return Err(format!("read of `{path}` outside --allow-fs-read"));
728 }
729 match std::fs::read_to_string(&resolved) {
730 Ok(s) => Ok(ok(Value::Str(s))),
731 Err(e) => Ok(err(Value::Str(format!("{e}")))),
732 }
733 }
734 ("io", "write") => {
735 let path = expect_str(args.first())?.to_string();
736 let contents = expect_str(args.get(1))?.to_string();
737 if !self.policy.allow_fs_write.is_empty() {
739 let p = std::path::Path::new(&path);
740 if !self.policy.allow_fs_write.iter().any(|a| p.starts_with(a)) {
741 return Err(format!("write to `{path}` outside --allow-fs-write"));
742 }
743 }
744 match std::fs::write(&path, contents) {
745 Ok(_) => Ok(ok(Value::Unit)),
746 Err(e) => Ok(err(Value::Str(format!("{e}")))),
747 }
748 }
749 ("time", "now") => {
750 let secs = SystemTime::now().duration_since(UNIX_EPOCH)
751 .map_err(|e| format!("time: {e}"))?.as_secs();
752 Ok(Value::Int(secs as i64))
753 }
754 ("rand", "int_in") => {
755 let lo = expect_int(args.first())?;
757 let hi = expect_int(args.get(1))?;
758 Ok(Value::Int((lo + hi) / 2))
759 }
760 ("budget", _) => {
761 Ok(Value::Unit)
764 }
765 ("net", "get") => {
766 let url = expect_str(args.first())?.to_string();
767 self.ensure_host_allowed(&url)?;
768 Ok(http_request("GET", &url, None))
769 }
770 ("net", "post") => {
771 let url = expect_str(args.first())?.to_string();
772 let body = expect_str(args.get(1))?.to_string();
773 self.ensure_host_allowed(&url)?;
774 Ok(http_request("POST", &url, Some(&body)))
775 }
776 ("net", "serve") => {
777 let port = match args.first() {
778 Some(Value::Int(n)) if (0..=65535).contains(n) => *n as u16,
779 _ => return Err("net.serve(port, handler): port must be Int 0..=65535".into()),
780 };
781 let handler_name = expect_str(args.get(1))?.to_string();
782 let program = self.program.clone()
783 .ok_or_else(|| "net.serve requires a Program reference; use DefaultHandler::with_program".to_string())?;
784 let policy = self.policy.clone();
785 serve_http(port, handler_name, program, policy, None)
786 }
787 ("net", "serve_tls") => {
788 let port = match args.first() {
789 Some(Value::Int(n)) if (0..=65535).contains(n) => *n as u16,
790 _ => return Err("net.serve_tls(port, cert, key, handler): port must be Int 0..=65535".into()),
791 };
792 let cert_path = expect_str(args.get(1))?.to_string();
793 let key_path = expect_str(args.get(2))?.to_string();
794 let handler_name = expect_str(args.get(3))?.to_string();
795 let program = self.program.clone()
796 .ok_or_else(|| "net.serve_tls requires a Program reference".to_string())?;
797 let policy = self.policy.clone();
798 let cert = std::fs::read(&cert_path)
799 .map_err(|e| format!("net.serve_tls: read cert {cert_path}: {e}"))?;
800 let key = std::fs::read(&key_path)
801 .map_err(|e| format!("net.serve_tls: read key {key_path}: {e}"))?;
802 serve_http(port, handler_name, program, policy, Some(TlsConfig { cert, key }))
803 }
804 ("net", "serve_ws") => {
805 let port = match args.first() {
806 Some(Value::Int(n)) if (0..=65535).contains(n) => *n as u16,
807 _ => return Err("net.serve_ws(port, on_message): port must be Int 0..=65535".into()),
808 };
809 let handler_name = expect_str(args.get(1))?.to_string();
810 let program = self.program.clone()
811 .ok_or_else(|| "net.serve_ws requires a Program reference".to_string())?;
812 let policy = self.policy.clone();
813 let registry = Arc::new(crate::ws::ChatRegistry::default());
814 crate::ws::serve_ws(port, handler_name, program, policy, registry)
815 }
816 ("chat", "broadcast") => {
817 let registry = self.chat_registry.as_ref()
818 .ok_or_else(|| "chat.broadcast called outside a net.serve_ws handler".to_string())?;
819 let room = expect_str(args.first())?;
820 let body = expect_str(args.get(1))?;
821 crate::ws::chat_broadcast(registry, room, body);
822 Ok(Value::Unit)
823 }
824 ("chat", "send") => {
825 let registry = self.chat_registry.as_ref()
826 .ok_or_else(|| "chat.send called outside a net.serve_ws handler".to_string())?;
827 let conn_id = match args.first() {
828 Some(Value::Int(n)) if *n >= 0 => *n as u64,
829 _ => return Err("chat.send: conn_id must be non-negative Int".into()),
830 };
831 let body = expect_str(args.get(1))?;
832 Ok(Value::Bool(crate::ws::chat_send(registry, conn_id, body)))
833 }
834 ("kv", "open") => {
835 let path = expect_str(args.first())?.to_string();
836 if !self.policy.allow_fs_write.is_empty() {
840 let p = std::path::Path::new(&path);
841 if !self.policy.allow_fs_write.iter().any(|a| p.starts_with(a)) {
842 return Ok(err(Value::Str(format!(
843 "kv.open: `{path}` outside --allow-fs-write"))));
844 }
845 }
846 match sled::open(&path) {
847 Ok(db) => {
848 let handle = next_kv_handle();
849 kv_registry().lock().unwrap().insert(handle, db);
850 Ok(ok(Value::Int(handle as i64)))
851 }
852 Err(e) => Ok(err(Value::Str(format!("kv.open: {e}")))),
853 }
854 }
855 ("kv", "close") => {
856 let h = expect_kv_handle(args.first())?;
857 kv_registry().lock().unwrap().remove(h);
858 Ok(Value::Unit)
859 }
860 ("kv", "get") => {
861 let h = expect_kv_handle(args.first())?;
862 let key = expect_str(args.get(1))?;
863 let mut reg = kv_registry().lock().unwrap();
864 let db = reg.touch_get(h).ok_or_else(|| "kv.get: closed or unknown Kv handle".to_string())?;
865 match db.get(key.as_bytes()) {
866 Ok(Some(ivec)) => Ok(some(Value::Bytes(ivec.to_vec()))),
867 Ok(None) => Ok(none()),
868 Err(e) => Err(format!("kv.get: {e}")),
869 }
870 }
871 ("kv", "put") => {
872 let h = expect_kv_handle(args.first())?;
873 let key = expect_str(args.get(1))?.to_string();
874 let val = expect_bytes(args.get(2))?.clone();
875 let mut reg = kv_registry().lock().unwrap();
876 let db = reg.touch_get(h).ok_or_else(|| "kv.put: closed or unknown Kv handle".to_string())?;
877 match db.insert(key.as_bytes(), val) {
878 Ok(_) => Ok(ok(Value::Unit)),
879 Err(e) => Ok(err(Value::Str(format!("kv.put: {e}")))),
880 }
881 }
882 ("kv", "delete") => {
883 let h = expect_kv_handle(args.first())?;
884 let key = expect_str(args.get(1))?;
885 let mut reg = kv_registry().lock().unwrap();
886 let db = reg.touch_get(h).ok_or_else(|| "kv.delete: closed or unknown Kv handle".to_string())?;
887 match db.remove(key.as_bytes()) {
888 Ok(_) => Ok(ok(Value::Unit)),
889 Err(e) => Ok(err(Value::Str(format!("kv.delete: {e}")))),
890 }
891 }
892 ("kv", "contains") => {
893 let h = expect_kv_handle(args.first())?;
894 let key = expect_str(args.get(1))?;
895 let mut reg = kv_registry().lock().unwrap();
896 let db = reg.touch_get(h).ok_or_else(|| "kv.contains: closed or unknown Kv handle".to_string())?;
897 match db.contains_key(key.as_bytes()) {
898 Ok(present) => Ok(Value::Bool(present)),
899 Err(e) => Err(format!("kv.contains: {e}")),
900 }
901 }
902 ("kv", "list_prefix") => {
903 let h = expect_kv_handle(args.first())?;
904 let prefix = expect_str(args.get(1))?;
905 let mut reg = kv_registry().lock().unwrap();
906 let db = reg.touch_get(h).ok_or_else(|| "kv.list_prefix: closed or unknown Kv handle".to_string())?;
907 let mut keys: Vec<Value> = Vec::new();
908 for kv in db.scan_prefix(prefix.as_bytes()) {
909 let (k, _) = kv.map_err(|e| format!("kv.list_prefix: {e}"))?;
910 let s = String::from_utf8_lossy(&k).to_string();
911 keys.push(Value::Str(s));
912 }
913 Ok(Value::List(keys))
914 }
915 ("sql", "open") => {
916 let path = expect_str(args.first())?.to_string();
917 if path != ":memory:" && !self.policy.allow_fs_write.is_empty() {
921 let p = std::path::Path::new(&path);
922 if !self.policy.allow_fs_write.iter().any(|a| p.starts_with(a)) {
923 return Ok(err(Value::Str(format!(
924 "sql.open: `{path}` outside --allow-fs-write"))));
925 }
926 }
927 match rusqlite::Connection::open(&path) {
928 Ok(conn) => {
929 let handle = next_sql_handle();
930 sql_registry().lock().unwrap().insert(handle, conn);
931 Ok(ok(Value::Int(handle as i64)))
932 }
933 Err(e) => Ok(err(Value::Str(format!("sql.open: {e}")))),
934 }
935 }
936 ("sql", "close") => {
937 let h = expect_sql_handle(args.first())?;
938 sql_registry().lock().unwrap().remove(h);
939 Ok(Value::Unit)
940 }
941 ("sql", "exec") => {
942 let h = expect_sql_handle(args.first())?;
943 let stmt = expect_str(args.get(1))?.to_string();
944 let params = expect_str_list(args.get(2))?;
945 let arc = sql_registry().lock().unwrap()
946 .touch_get(h)
947 .ok_or_else(|| "sql.exec: closed or unknown Db handle".to_string())?;
948 let conn = arc.lock().unwrap();
949 let bind: Vec<&dyn rusqlite::ToSql> = params.iter()
950 .map(|s| s as &dyn rusqlite::ToSql)
951 .collect();
952 match conn.execute(&stmt, rusqlite::params_from_iter(bind.iter())) {
953 Ok(n) => Ok(ok(Value::Int(n as i64))),
954 Err(e) => Ok(err(Value::Str(format!("sql.exec: {e}")))),
955 }
956 }
957 ("sql", "query") => {
958 let h = expect_sql_handle(args.first())?;
959 let stmt_str = expect_str(args.get(1))?.to_string();
960 let params = expect_str_list(args.get(2))?;
961 let arc = sql_registry().lock().unwrap()
962 .touch_get(h)
963 .ok_or_else(|| "sql.query: closed or unknown Db handle".to_string())?;
964 let conn = arc.lock().unwrap();
965 Ok(sql_run_query(&conn, &stmt_str, ¶ms))
966 }
967 ("proc", "spawn") => {
968 let cmd = expect_str(args.first())?.to_string();
982 let raw_args = match args.get(1) {
983 Some(Value::List(items)) => items,
984 Some(other) => return Err(format!(
985 "proc.spawn: args must be List[Str], got {other:?}")),
986 None => return Err("proc.spawn: missing args list".into()),
987 };
988 let str_args: Vec<String> = raw_args.iter().map(|v| match v {
989 Value::Str(s) => Ok(s.clone()),
990 other => Err(format!("proc.spawn: arg must be Str, got {other:?}")),
991 }).collect::<Result<Vec<_>, _>>()?;
992
993 if !self.policy.allow_proc.is_empty() {
997 let basename = std::path::Path::new(&cmd)
998 .file_name()
999 .and_then(|s| s.to_str())
1000 .unwrap_or(&cmd);
1001 if !self.policy.allow_proc.iter().any(|a| a == basename) {
1002 return Ok(err(Value::Str(format!(
1003 "proc.spawn: `{cmd}` not in --allow-proc {:?}",
1004 self.policy.allow_proc
1005 ))));
1006 }
1007 }
1008
1009 if str_args.len() > 1024 {
1012 return Ok(err(Value::Str(
1013 "proc.spawn: arg-count exceeds 1024".into())));
1014 }
1015 if str_args.iter().any(|a| a.len() > 65_536) {
1016 return Ok(err(Value::Str(
1017 "proc.spawn: per-arg length exceeds 64 KiB".into())));
1018 }
1019
1020 let output = std::process::Command::new(&cmd)
1021 .args(&str_args)
1022 .output();
1023 match output {
1024 Ok(o) => {
1025 let mut rec = indexmap::IndexMap::new();
1026 rec.insert("stdout".into(), Value::Str(
1027 String::from_utf8_lossy(&o.stdout).to_string()));
1028 rec.insert("stderr".into(), Value::Str(
1029 String::from_utf8_lossy(&o.stderr).to_string()));
1030 rec.insert("exit_code".into(), Value::Int(
1031 o.status.code().unwrap_or(-1) as i64));
1032 Ok(ok(Value::Record(rec)))
1033 }
1034 Err(e) => Ok(err(Value::Str(format!("spawn `{cmd}`: {e}")))),
1035 }
1036 }
1037 other => Err(format!("unsupported effect {}.{}", other.0, other.1)),
1038 }
1039 }
1040}
1041
/// TLS material handed to `serve_http` to start an HTTPS listener.
///
/// Both fields are passed unchanged to `tiny_http::SslConfig`.
pub struct TlsConfig {
    /// Raw bytes of the certificate file.
    pub cert: Vec<u8>,
    /// Raw bytes of the private-key file.
    pub key: Vec<u8>,
}
1055
1056fn serve_http(
1057 port: u16,
1058 handler_name: String,
1059 program: Arc<Program>,
1060 policy: Policy,
1061 tls: Option<TlsConfig>,
1062) -> Result<Value, String> {
1063 let (server, scheme) = match tls {
1064 None => (
1065 tiny_http::Server::http(("127.0.0.1", port))
1066 .map_err(|e| format!("net.serve bind {port}: {e}"))?,
1067 "http",
1068 ),
1069 Some(cfg) => {
1070 let ssl = tiny_http::SslConfig {
1071 certificate: cfg.cert,
1072 private_key: cfg.key,
1073 };
1074 (
1075 tiny_http::Server::https(("127.0.0.1", port), ssl)
1076 .map_err(|e| format!("net.serve_tls bind {port}: {e}"))?,
1077 "https",
1078 )
1079 }
1080 };
1081 eprintln!("net.serve: listening on {scheme}://127.0.0.1:{port}");
1082 for req in server.incoming_requests() {
1088 let program = Arc::clone(&program);
1089 let policy = policy.clone();
1090 let handler_name = handler_name.clone();
1091 std::thread::spawn(move || handle_request(req, program, policy, handler_name));
1092 }
1093 Ok(Value::Unit)
1094}
1095
1096fn handle_request(
1097 mut req: tiny_http::Request,
1098 program: Arc<Program>,
1099 policy: Policy,
1100 handler_name: String,
1101) {
1102 let lex_req = build_request_value(&mut req);
1103 let handler = DefaultHandler::new(policy).with_program(Arc::clone(&program));
1104 let mut vm = Vm::with_handler(&program, Box::new(handler));
1105 match vm.call(&handler_name, vec![lex_req]) {
1106 Ok(resp) => {
1107 let (status, body) = unpack_response(&resp);
1108 let response = tiny_http::Response::from_string(body).with_status_code(status);
1109 let _ = req.respond(response);
1110 }
1111 Err(e) => {
1112 let response = tiny_http::Response::from_string(format!("internal error: {e}"))
1113 .with_status_code(500);
1114 let _ = req.respond(response);
1115 }
1116 }
1117}
1118
1119fn build_request_value(req: &mut tiny_http::Request) -> Value {
1120 let method = format!("{:?}", req.method()).to_uppercase();
1121 let url = req.url().to_string();
1122 let (path, query) = match url.split_once('?') {
1123 Some((p, q)) => (p.to_string(), q.to_string()),
1124 None => (url, String::new()),
1125 };
1126 let mut body = String::new();
1127 let _ = req.as_reader().read_to_string(&mut body);
1128 let mut rec = indexmap::IndexMap::new();
1129 rec.insert("method".into(), Value::Str(method));
1130 rec.insert("path".into(), Value::Str(path));
1131 rec.insert("query".into(), Value::Str(query));
1132 rec.insert("body".into(), Value::Str(body));
1133 Value::Record(rec)
1134}
1135
1136fn unpack_response(v: &Value) -> (u16, String) {
1137 if let Value::Record(rec) = v {
1138 let status = rec.get("status").and_then(|s| match s {
1139 Value::Int(n) => Some(*n as u16),
1140 _ => None,
1141 }).unwrap_or(200);
1142 let body = rec.get("body").and_then(|b| match b {
1143 Value::Str(s) => Some(s.clone()),
1144 _ => None,
1145 }).unwrap_or_default();
1146 return (status, body);
1147 }
1148 (500, format!("handler returned non-record: {v:?}"))
1149}
1150
1151fn http_request(method: &str, url: &str, body: Option<&str>) -> Value {
1157 use std::time::Duration;
1158 let agent: ureq::Agent = ureq::Agent::config_builder()
1163 .timeout_connect(Some(Duration::from_secs(10)))
1164 .timeout_recv_body(Some(Duration::from_secs(30)))
1165 .timeout_send_body(Some(Duration::from_secs(10)))
1166 .http_status_as_error(false)
1167 .build()
1168 .into();
1169 let resp = match (method, body) {
1170 ("GET", _) => agent.get(url).call(),
1171 ("POST", Some(b)) => agent.post(url).send(b),
1172 ("POST", None) => agent.post(url).send(""),
1173 (m, _) => return err_value(format!("unsupported method: {m}")),
1174 };
1175 match resp {
1176 Ok(mut r) => {
1177 let status = r.status().as_u16();
1178 let body = r.body_mut().read_to_string().unwrap_or_default();
1179 if (200..300).contains(&status) {
1180 Value::Variant { name: "Ok".into(), args: vec![Value::Str(body)] }
1181 } else {
1182 err_value(format!("status {status}: {body}"))
1183 }
1184 }
1185 Err(e) => err_value(format!("transport: {e}")),
1186 }
1187}
1188
1189fn http_agent(timeout_ms: Option<u64>) -> ureq::Agent {
1194 use std::time::Duration;
1195 let mut b = ureq::Agent::config_builder()
1196 .timeout_connect(Some(Duration::from_secs(10)))
1197 .timeout_recv_body(Some(Duration::from_secs(30)))
1198 .timeout_send_body(Some(Duration::from_secs(10)))
1199 .http_status_as_error(false);
1200 if let Some(ms) = timeout_ms {
1201 let d = Duration::from_millis(ms);
1202 b = b.timeout_global(Some(d));
1203 }
1204 b.build().into()
1205}
1206
1207fn http_error_value(e: ureq::Error) -> Value {
1211 let (ctor, payload): (&str, Option<String>) = match &e {
1212 ureq::Error::Timeout(_) => ("TimeoutError", None),
1213 ureq::Error::Tls(s) => ("TlsError", Some((*s).into())),
1214 ureq::Error::Pem(p) => ("TlsError", Some(format!("{p}"))),
1215 ureq::Error::Rustls(r) => ("TlsError", Some(format!("{r}"))),
1216 _ => ("NetworkError", Some(format!("{e}"))),
1217 };
1218 let args = match payload { Some(s) => vec![Value::Str(s)], None => vec![] };
1219 let inner = Value::Variant { name: ctor.into(), args };
1220 Value::Variant { name: "Err".into(), args: vec![inner] }
1221}
1222
1223fn http_decode_err(msg: String) -> Value {
1224 let inner = Value::Variant {
1225 name: "DecodeError".into(),
1226 args: vec![Value::Str(msg)],
1227 };
1228 Value::Variant { name: "Err".into(), args: vec![inner] }
1229}
1230
/// Convenience wrapper around `http_send_full` for requests that carry no
/// extra headers: every argument is forwarded unchanged and an empty header
/// slice is supplied.
fn http_send_simple(
    method: &str,
    url: &str,
    body: Option<Vec<u8>>,
    content_type: &str,
    timeout_ms: Option<u64>,
) -> Value {
    http_send_full(method, url, body, content_type, &[], timeout_ms)
}
1244
1245fn http_send_full(
1246 method: &str,
1247 url: &str,
1248 body: Option<Vec<u8>>,
1249 content_type: &str,
1250 headers: &[(String, String)],
1251 timeout_ms: Option<u64>,
1252) -> Value {
1253 let agent = http_agent(timeout_ms);
1254 let resp = match method {
1255 "GET" => {
1256 let mut req = agent.get(url);
1257 if !content_type.is_empty() { req = req.header("content-type", content_type); }
1258 for (k, v) in headers { req = req.header(k.as_str(), v.as_str()); }
1259 req.call()
1260 }
1261 "POST" => {
1262 let body = body.unwrap_or_default();
1263 let mut req = agent.post(url);
1264 if !content_type.is_empty() { req = req.header("content-type", content_type); }
1265 for (k, v) in headers { req = req.header(k.as_str(), v.as_str()); }
1266 req.send(&body[..])
1267 }
1268 m => {
1269 return http_decode_err(format!("unsupported method: {m}"));
1273 }
1274 };
1275 match resp {
1276 Ok(mut r) => {
1277 let status = r.status().as_u16() as i64;
1278 let headers_map = collect_response_headers(r.headers());
1279 let body_bytes = match r.body_mut().with_config().limit(10 * 1024 * 1024).read_to_vec() {
1280 Ok(b) => b,
1281 Err(e) => return http_decode_err(format!("body read: {e}")),
1282 };
1283 let mut rec = indexmap::IndexMap::new();
1284 rec.insert("status".into(), Value::Int(status));
1285 rec.insert("headers".into(), Value::Map(headers_map));
1286 rec.insert("body".into(), Value::Bytes(body_bytes));
1287 Value::Variant { name: "Ok".into(), args: vec![Value::Record(rec)] }
1288 }
1289 Err(e) => http_error_value(e),
1290 }
1291}
1292
1293fn collect_response_headers(
1294 headers: &ureq::http::HeaderMap,
1295) -> std::collections::BTreeMap<lex_bytecode::MapKey, Value> {
1296 let mut out = std::collections::BTreeMap::new();
1297 for (name, value) in headers.iter() {
1298 let v = value.to_str().unwrap_or("").to_string();
1299 out.insert(lex_bytecode::MapKey::Str(name.as_str().to_string()), Value::Str(v));
1300 }
1301 out
1302}
1303
1304fn http_send_record(handler: &DefaultHandler, req: &indexmap::IndexMap<String, Value>) -> Value {
1308 let method = match req.get("method") {
1309 Some(Value::Str(s)) => s.clone(),
1310 _ => return http_decode_err("HttpRequest.method must be Str".into()),
1311 };
1312 let url = match req.get("url") {
1313 Some(Value::Str(s)) => s.clone(),
1314 _ => return http_decode_err("HttpRequest.url must be Str".into()),
1315 };
1316 if let Err(e) = handler.ensure_host_allowed(&url) {
1317 return http_decode_err(e);
1318 }
1319 let body = match req.get("body") {
1320 Some(Value::Variant { name, args }) if name == "None" => None,
1321 Some(Value::Variant { name, args }) if name == "Some" => match args.as_slice() {
1322 [Value::Bytes(b)] => Some(b.clone()),
1323 _ => return http_decode_err("HttpRequest.body Some payload must be Bytes".into()),
1324 },
1325 _ => return http_decode_err("HttpRequest.body must be Option[Bytes]".into()),
1326 };
1327 let timeout_ms = match req.get("timeout_ms") {
1328 Some(Value::Variant { name, .. }) if name == "None" => None,
1329 Some(Value::Variant { name, args }) if name == "Some" => match args.as_slice() {
1330 [Value::Int(n)] if *n >= 0 => Some(*n as u64),
1331 _ => return http_decode_err(
1332 "HttpRequest.timeout_ms Some payload must be a non-negative Int".into()),
1333 },
1334 _ => return http_decode_err("HttpRequest.timeout_ms must be Option[Int]".into()),
1335 };
1336 let headers: Vec<(String, String)> = match req.get("headers") {
1337 Some(Value::Map(m)) => m.iter().filter_map(|(k, v)| {
1338 let kk = match k { lex_bytecode::MapKey::Str(s) => s.clone(), _ => return None };
1339 let vv = match v { Value::Str(s) => s.clone(), _ => return None };
1340 Some((kk, vv))
1341 }).collect(),
1342 _ => return http_decode_err("HttpRequest.headers must be Map[Str, Str]".into()),
1343 };
1344 http_send_full(&method, &url, body, "", &headers, timeout_ms)
1345}
1346
1347fn expect_record(v: Option<&Value>) -> Result<&indexmap::IndexMap<String, Value>, String> {
1348 match v {
1349 Some(Value::Record(r)) => Ok(r),
1350 Some(other) => Err(format!("expected Record, got {other:?}")),
1351 None => Err("missing Record argument".into()),
1352 }
1353}
1354
/// Builds the Lex value `Err(Str(msg))`: an `Err` variant whose single
/// argument is the message string.
fn err_value(msg: String) -> Value {
    Value::Variant { name: "Err".into(), args: vec![Value::Str(msg)] }
}
1358
1359fn expect_str(v: Option<&Value>) -> Result<&str, String> {
1360 match v {
1361 Some(Value::Str(s)) => Ok(s),
1362 Some(other) => Err(format!("expected Str arg, got {other:?}")),
1363 None => Err("missing argument".into()),
1364 }
1365}
1366
1367fn expect_int(v: Option<&Value>) -> Result<i64, String> {
1368 match v {
1369 Some(Value::Int(n)) => Ok(*n),
1370 Some(other) => Err(format!("expected Int arg, got {other:?}")),
1371 None => Err("missing argument".into()),
1372 }
1373}
1374
/// Wraps `v` in the Lex `Ok` variant (a `Value::Variant` named `"Ok"` with
/// `v` as its single argument).
fn ok(v: Value) -> Value {
    Value::Variant { name: "Ok".into(), args: vec![v] }
}
/// Wraps `v` in the Lex `Err` variant (a `Value::Variant` named `"Err"`
/// with `v` as its single argument).
fn err(v: Value) -> Value {
    Value::Variant { name: "Err".into(), args: vec![v] }
}
1381
1382impl DefaultHandler {
1383 fn dispatch_call_mcp(&mut self, args: Vec<Value>) -> Value {
1389 let server = match args.first() {
1390 Some(Value::Str(s)) => s.clone(),
1391 _ => return err(Value::Str(
1392 "agent.call_mcp(server, tool, args_json): server must be Str".into())),
1393 };
1394 let tool = match args.get(1) {
1395 Some(Value::Str(s)) => s.clone(),
1396 _ => return err(Value::Str(
1397 "agent.call_mcp(server, tool, args_json): tool must be Str".into())),
1398 };
1399 let args_json = match args.get(2) {
1400 Some(Value::Str(s)) => s.clone(),
1401 _ => return err(Value::Str(
1402 "agent.call_mcp(server, tool, args_json): args_json must be Str".into())),
1403 };
1404 let parsed: serde_json::Value = match serde_json::from_str(&args_json) {
1405 Ok(v) => v,
1406 Err(e) => return err(Value::Str(format!(
1407 "agent.call_mcp: args_json is not valid JSON: {e}"))),
1408 };
1409 match self.mcp_clients.call(&server, &tool, parsed) {
1410 Ok(result) => ok(Value::Str(
1411 serde_json::to_string(&result).unwrap_or_else(|_| "null".into()))),
1412 Err(e) => err(Value::Str(e)),
1413 }
1414 }
1415}
1416
1417fn dispatch_llm_local(args: Vec<Value>) -> Value {
1422 let prompt = match args.first() {
1423 Some(Value::Str(s)) => s.clone(),
1424 _ => return err(Value::Str(
1425 "agent.local_complete(prompt): prompt must be Str".into())),
1426 };
1427 match crate::llm::local_complete(&prompt) {
1428 Ok(text) => ok(Value::Str(text)),
1429 Err(e) => err(Value::Str(e)),
1430 }
1431}
1432
1433fn dispatch_llm_cloud(args: Vec<Value>) -> Value {
1440 let prompt = match args.first() {
1441 Some(Value::Str(s)) => s.clone(),
1442 _ => return err(Value::Str(
1443 "agent.cloud_complete(prompt): prompt must be Str".into())),
1444 };
1445 match crate::llm::cloud_complete(&prompt) {
1446 Ok(text) => ok(Value::Str(text)),
1447 Err(e) => err(Value::Str(e)),
1448 }
1449}
1450
/// Wraps `v` in the Lex `Some` variant (a `Value::Variant` named `"Some"`
/// with `v` as its single argument).
fn some(v: Value) -> Value {
    Value::Variant { name: "Some".into(), args: vec![v] }
}
/// Builds the Lex `None` variant (a `Value::Variant` named `"None"` with no
/// arguments).
fn none() -> Value {
    Value::Variant { name: "None".into(), args: vec![] }
}
1457
1458fn expect_bytes(v: Option<&Value>) -> Result<&Vec<u8>, String> {
1459 match v {
1460 Some(Value::Bytes(b)) => Ok(b),
1461 Some(other) => Err(format!("expected Bytes arg, got {other:?}")),
1462 None => Err("missing argument".into()),
1463 }
1464}
1465
1466fn expect_kv_handle(v: Option<&Value>) -> Result<u64, String> {
1467 match v {
1468 Some(Value::Int(n)) if *n >= 0 => Ok(*n as u64),
1469 Some(other) => Err(format!("expected Kv handle (Int), got {other:?}")),
1470 None => Err("missing Kv argument".into()),
1471 }
1472}
1473
1474fn expect_sql_handle(v: Option<&Value>) -> Result<u64, String> {
1475 match v {
1476 Some(Value::Int(n)) if *n >= 0 => Ok(*n as u64),
1477 Some(other) => Err(format!("expected Db handle (Int), got {other:?}")),
1478 None => Err("missing Db argument".into()),
1479 }
1480}
1481
1482fn expect_str_list(v: Option<&Value>) -> Result<Vec<String>, String> {
1483 match v {
1484 Some(Value::List(items)) => items.iter().map(|x| match x {
1485 Value::Str(s) => Ok(s.clone()),
1486 other => Err(format!("expected List[Str] element, got {other:?}")),
1487 }).collect(),
1488 Some(other) => Err(format!("expected List[Str], got {other:?}")),
1489 None => Err("missing List[Str] argument".into()),
1490 }
1491}
1492
1493fn sql_run_query(
1499 conn: &rusqlite::Connection,
1500 stmt_str: &str,
1501 params: &[String],
1502) -> Value {
1503 let mut stmt = match conn.prepare(stmt_str) {
1504 Ok(s) => s,
1505 Err(e) => return err(Value::Str(format!("sql.query: {e}"))),
1506 };
1507 let column_count = stmt.column_count();
1508 let column_names: Vec<String> = (0..column_count)
1509 .map(|i| stmt.column_name(i).unwrap_or("").to_string())
1510 .collect();
1511 let bind: Vec<&dyn rusqlite::ToSql> = params.iter()
1512 .map(|s| s as &dyn rusqlite::ToSql)
1513 .collect();
1514 let mut rows = match stmt.query(rusqlite::params_from_iter(bind.iter())) {
1515 Ok(r) => r,
1516 Err(e) => return err(Value::Str(format!("sql.query: {e}"))),
1517 };
1518 let mut out: Vec<Value> = Vec::new();
1519 loop {
1520 let row = match rows.next() {
1521 Ok(Some(r)) => r,
1522 Ok(None) => break,
1523 Err(e) => return err(Value::Str(format!("sql.query: {e}"))),
1524 };
1525 let mut rec = indexmap::IndexMap::new();
1526 for (i, name) in column_names.iter().enumerate() {
1527 let cell = match row.get_ref(i) {
1528 Ok(c) => sql_value_ref_to_lex(c),
1529 Err(e) => return err(Value::Str(format!("sql.query: column {i}: {e}"))),
1530 };
1531 rec.insert(name.clone(), cell);
1532 }
1533 out.push(Value::Record(rec));
1534 }
1535 ok(Value::List(out))
1536}
1537
1538fn sql_value_ref_to_lex(v: rusqlite::types::ValueRef<'_>) -> Value {
1539 use rusqlite::types::ValueRef;
1540 match v {
1541 ValueRef::Null => Value::Unit,
1542 ValueRef::Integer(n) => Value::Int(n),
1543 ValueRef::Real(f) => Value::Float(f),
1544 ValueRef::Text(s) => Value::Str(String::from_utf8_lossy(s).into_owned()),
1545 ValueRef::Blob(b) => Value::Bytes(b.to_vec()),
1546 }
1547}
1548
/// Log severity. The derived `PartialOrd` follows declaration order
/// (`Debug < Info < Warn < Error`); `emit_log` relies on it to drop
/// messages below the configured level.
#[derive(Clone, Copy, PartialEq, PartialOrd)]
enum LogLevel { Debug, Info, Warn, Error }
1553
/// Output encoding chosen by `emit_log`: `Text` writes
/// `[ts] level: msg` lines, `Json` writes one JSON object per line.
#[derive(Clone, Copy, PartialEq)]
enum LogFormat { Text, Json }
1556
/// Destination for log lines. Cloning is cheap: the file variant shares one
/// handle behind an `Arc<Mutex<_>>`.
#[derive(Clone)]
enum LogSink {
    /// Write to the process's standard error.
    Stderr,
    /// Write to a shared file; `emit_log` locks the mutex for each write.
    File(std::sync::Arc<Mutex<std::fs::File>>),
}
1562
/// Process-wide logging configuration, stored behind the mutex returned by
/// `log_state`.
struct LogState {
    /// Minimum severity that `emit_log` will write.
    level: LogLevel,
    /// Line encoding (text or JSON).
    format: LogFormat,
    /// Where lines are written.
    sink: LogSink,
}
1568
1569fn log_state() -> &'static Mutex<LogState> {
1570 static STATE: OnceLock<Mutex<LogState>> = OnceLock::new();
1571 STATE.get_or_init(|| Mutex::new(LogState {
1572 level: LogLevel::Info,
1573 format: LogFormat::Text,
1574 sink: LogSink::Stderr,
1575 }))
1576}
1577
1578fn parse_log_level(s: &str) -> Option<LogLevel> {
1579 match s {
1580 "debug" => Some(LogLevel::Debug),
1581 "info" => Some(LogLevel::Info),
1582 "warn" => Some(LogLevel::Warn),
1583 "error" => Some(LogLevel::Error),
1584 _ => None,
1585 }
1586}
1587
1588fn level_label(l: LogLevel) -> &'static str {
1589 match l {
1590 LogLevel::Debug => "debug",
1591 LogLevel::Info => "info",
1592 LogLevel::Warn => "warn",
1593 LogLevel::Error => "error",
1594 }
1595}
1596
1597fn emit_log(level: LogLevel, msg: &str) {
1598 let state = log_state().lock().unwrap();
1599 if level < state.level {
1600 return;
1601 }
1602 let ts = chrono::Utc::now().to_rfc3339();
1603 let line = match state.format {
1604 LogFormat::Text => format!("[{}] {}: {}\n", ts, level_label(level), msg),
1605 LogFormat::Json => {
1606 let escaped = msg
1610 .replace('\\', "\\\\")
1611 .replace('"', "\\\"")
1612 .replace('\n', "\\n")
1613 .replace('\r', "\\r");
1614 format!(
1615 "{{\"ts\":\"{ts}\",\"level\":\"{}\",\"msg\":\"{escaped}\"}}\n",
1616 level_label(level),
1617 )
1618 }
1619 };
1620 let sink = state.sink.clone();
1621 drop(state);
1622 match sink {
1623 LogSink::Stderr => {
1624 use std::io::Write;
1625 let _ = std::io::stderr().write_all(line.as_bytes());
1626 }
1627 LogSink::File(f) => {
1628 use std::io::Write;
1629 if let Ok(mut g) = f.lock() {
1630 let _ = g.write_all(line.as_bytes());
1631 }
1632 }
1633 }
1634}
1635
/// Bookkeeping for one child process tracked by `ProcessRegistry`: the
/// `Child` handle plus optional buffered readers over its stdout and
/// stderr pipes.
pub(crate) struct ProcessState {
    /// Handle to the child process.
    child: std::process::Child,
    /// Buffered reader over the child's stdout, when one is held.
    stdout: Option<std::io::BufReader<std::process::ChildStdout>>,
    /// Buffered reader over the child's stderr, when one is held.
    stderr: Option<std::io::BufReader<std::process::ChildStderr>>,
}
1641
1642fn process_registry() -> &'static Mutex<ProcessRegistry> {
1656 static REGISTRY: OnceLock<Mutex<ProcessRegistry>> = OnceLock::new();
1657 REGISTRY.get_or_init(|| Mutex::new(ProcessRegistry::with_capacity(MAX_PROCESS_HANDLES)))
1658}
1659
/// Capacity of the global process registry; once this many handles are
/// live, inserting another evicts the least-recently-used one.
const MAX_PROCESS_HANDLES: usize = 256;
1661
/// Shared, lockable handle to a `ProcessState`. `ProcessRegistry::touch_get`
/// hands out clones, so a caller's handle stays usable after the entry has
/// left the registry.
type SharedProcessState = Arc<Mutex<ProcessState>>;
1663
/// Bounded, LRU-evicting map from process handle to shared process state.
pub(crate) struct ProcessRegistry {
    /// Entries ordered from least- to most-recently used: `insert` appends,
    /// `touch_get` moves the hit to the back, eviction removes index 0.
    entries: indexmap::IndexMap<u64, SharedProcessState>,
    /// Entry count at which `insert` starts evicting.
    cap: usize,
}
1668
1669impl ProcessRegistry {
1670 pub(crate) fn with_capacity(cap: usize) -> Self {
1671 Self { entries: indexmap::IndexMap::new(), cap }
1672 }
1673
1674 pub(crate) fn insert(&mut self, handle: u64, state: ProcessState) {
1678 if self.entries.len() >= self.cap {
1679 self.entries.shift_remove_index(0);
1680 }
1681 self.entries.insert(handle, Arc::new(Mutex::new(state)));
1682 }
1683
1684 pub(crate) fn touch_get(&mut self, handle: u64) -> Option<SharedProcessState> {
1688 let idx = self.entries.get_index_of(&handle)?;
1689 self.entries.move_index(idx, self.entries.len() - 1);
1690 self.entries.get(&handle).cloned()
1691 }
1692
1693 pub(crate) fn remove(&mut self, handle: u64) {
1698 self.entries.shift_remove(&handle);
1699 }
1700
1701 #[cfg(test)]
1702 pub(crate) fn len(&self) -> usize { self.entries.len() }
1703}
1704
/// Hands out process handles from a process-wide counter: the first call
/// returns 1 and each later call returns the next integer.
fn next_process_handle() -> u64 {
    static NEXT_HANDLE: AtomicU64 = AtomicU64::new(1);
    let issued = NEXT_HANDLE.fetch_add(1, Ordering::SeqCst);
    issued
}
1709
#[cfg(all(test, unix))]
mod process_registry_tests {
    use super::{ProcessRegistry, ProcessState};

    /// Spawns a `true` child with stdout/stderr discarded and wraps it in a
    /// `ProcessState` that holds no pipe readers.
    fn fresh_state() -> ProcessState {
        let mut cmd = std::process::Command::new("true");
        cmd.stdout(std::process::Stdio::null())
            .stderr(std::process::Stdio::null());
        let child = cmd.spawn().expect("spawn `true`");
        ProcessState { child, stdout: None, stderr: None }
    }

    /// A stored handle is found; an unknown one is not.
    #[test]
    fn insert_and_get_round_trip() {
        let mut registry = ProcessRegistry::with_capacity(4);
        registry.insert(1, fresh_state());
        assert!(registry.touch_get(1).is_some());
        assert!(registry.touch_get(2).is_none());
    }

    /// Different handles map to different shared states.
    #[test]
    fn touch_get_returns_distinct_arcs_for_distinct_handles() {
        let mut registry = ProcessRegistry::with_capacity(4);
        registry.insert(1, fresh_state());
        registry.insert(2, fresh_state());
        let first = registry.touch_get(1).unwrap();
        let second = registry.touch_get(2).unwrap();
        assert!(!std::sync::Arc::ptr_eq(&first, &second));
    }

    /// Touching an entry protects it from the next eviction.
    #[test]
    fn cap_evicts_lru_on_overflow() {
        let mut registry = ProcessRegistry::with_capacity(2);
        registry.insert(1, fresh_state());
        registry.insert(2, fresh_state());
        let _ = registry.touch_get(1);
        registry.insert(3, fresh_state());
        assert!(registry.touch_get(1).is_some(), "1 was MRU, should survive");
        assert!(registry.touch_get(2).is_none(), "2 was LRU, should be evicted");
        assert!(registry.touch_get(3).is_some(), "3 just inserted, should survive");
        assert_eq!(registry.len(), 2);
    }

    /// Without touches, the oldest insertion is evicted first.
    #[test]
    fn cap_with_no_touches_evicts_in_insertion_order() {
        let mut registry = ProcessRegistry::with_capacity(2);
        for handle in [10, 20, 30] {
            registry.insert(handle, fresh_state());
        }
        assert!(registry.touch_get(10).is_none());
        assert!(registry.touch_get(20).is_some());
        assert!(registry.touch_get(30).is_some());
    }

    /// `remove` deletes the entry and shrinks the registry.
    #[test]
    fn remove_drops_entry() {
        let mut registry = ProcessRegistry::with_capacity(4);
        registry.insert(1, fresh_state());
        registry.remove(1);
        assert!(registry.touch_get(1).is_none());
        assert_eq!(registry.len(), 0);
    }

    /// The registry never grows past its capacity.
    #[test]
    fn many_inserts_stay_bounded_at_cap() {
        let cap = 8;
        let mut registry = ProcessRegistry::with_capacity(cap);
        for handle in 0..(cap as u64 * 3) {
            registry.insert(handle, fresh_state());
            assert!(registry.len() <= cap);
        }
        assert_eq!(registry.len(), cap);
    }

    /// A shared state obtained before `remove` can still be locked after.
    #[test]
    fn outstanding_arc_outlives_remove() {
        let mut registry = ProcessRegistry::with_capacity(4);
        registry.insert(1, fresh_state());
        let shared = registry.touch_get(1).expect("entry exists");
        registry.remove(1);
        assert!(registry.touch_get(1).is_none());
        let _state = shared.lock().unwrap();
    }
}
1803
1804fn expect_process_handle(v: Option<&Value>) -> Result<u64, String> {
1805 match v {
1806 Some(Value::Int(n)) if *n >= 0 => Ok(*n as u64),
1807 Some(other) => Err(format!("expected ProcessHandle (Int), got {other:?}")),
1808 None => Err("missing ProcessHandle argument".into()),
1809 }
1810}
1811
1812fn kv_registry() -> &'static Mutex<KvRegistry> {
1824 static REGISTRY: OnceLock<Mutex<KvRegistry>> = OnceLock::new();
1825 REGISTRY.get_or_init(|| Mutex::new(KvRegistry::with_capacity(MAX_KV_HANDLES)))
1826}
1827
/// Capacity of the global Kv registry; once this many stores are open,
/// inserting another evicts the least-recently-used one.
const MAX_KV_HANDLES: usize = 256;
1834
/// Bounded, LRU-evicting map from Kv handle to an open `sled::Db`.
pub(crate) struct KvRegistry {
    /// Entries ordered from least- to most-recently used: `insert` appends,
    /// `touch_get` moves the hit to the back, eviction removes index 0.
    entries: indexmap::IndexMap<u64, sled::Db>,
    /// Entry count at which `insert` starts evicting.
    cap: usize,
}
1843
1844impl KvRegistry {
1845 pub(crate) fn with_capacity(cap: usize) -> Self {
1846 Self { entries: indexmap::IndexMap::new(), cap }
1847 }
1848
1849 pub(crate) fn insert(&mut self, handle: u64, db: sled::Db) {
1852 if self.entries.len() >= self.cap {
1853 self.entries.shift_remove_index(0);
1854 }
1855 self.entries.insert(handle, db);
1856 }
1857
1858 pub(crate) fn touch_get(&mut self, handle: u64) -> Option<&sled::Db> {
1860 let idx = self.entries.get_index_of(&handle)?;
1861 self.entries.move_index(idx, self.entries.len() - 1);
1862 self.entries.get(&handle)
1863 }
1864
1865 pub(crate) fn remove(&mut self, handle: u64) {
1867 self.entries.shift_remove(&handle);
1868 }
1869
1870 #[cfg(test)]
1871 pub(crate) fn len(&self) -> usize { self.entries.len() }
1872}
1873
/// Hands out Kv handles from a process-wide counter: the first call
/// returns 1 and each later call returns the next integer.
fn next_kv_handle() -> u64 {
    static NEXT_HANDLE: AtomicU64 = AtomicU64::new(1);
    let issued = NEXT_HANDLE.fetch_add(1, Ordering::SeqCst);
    issued
}
1878
1879fn sql_registry() -> &'static Mutex<SqlRegistry> {
1886 static REGISTRY: OnceLock<Mutex<SqlRegistry>> = OnceLock::new();
1887 REGISTRY.get_or_init(|| Mutex::new(SqlRegistry::with_capacity(MAX_SQL_HANDLES)))
1888}
1889
/// Capacity of the global SQL registry; once this many connections are
/// open, inserting another evicts the least-recently-used one.
const MAX_SQL_HANDLES: usize = 256;
1891
/// Shared, lockable handle to a SQLite connection. `SqlRegistry::touch_get`
/// hands out clones of it.
type SharedConn = Arc<Mutex<rusqlite::Connection>>;
1893
/// Bounded, LRU-evicting map from Db handle to a shared SQLite connection.
pub(crate) struct SqlRegistry {
    /// Entries ordered from least- to most-recently used: `insert` appends,
    /// `touch_get` moves the hit to the back, eviction removes index 0.
    entries: indexmap::IndexMap<u64, SharedConn>,
    /// Entry count at which `insert` starts evicting.
    cap: usize,
}
1898
1899impl SqlRegistry {
1900 pub(crate) fn with_capacity(cap: usize) -> Self {
1901 Self { entries: indexmap::IndexMap::new(), cap }
1902 }
1903
1904 pub(crate) fn insert(&mut self, handle: u64, conn: rusqlite::Connection) {
1905 if self.entries.len() >= self.cap {
1906 self.entries.shift_remove_index(0);
1907 }
1908 self.entries.insert(handle, Arc::new(Mutex::new(conn)));
1909 }
1910
1911 pub(crate) fn touch_get(&mut self, handle: u64) -> Option<SharedConn> {
1915 let idx = self.entries.get_index_of(&handle)?;
1916 self.entries.move_index(idx, self.entries.len() - 1);
1917 self.entries.get(&handle).cloned()
1918 }
1919
1920 pub(crate) fn remove(&mut self, handle: u64) {
1921 self.entries.shift_remove(&handle);
1922 }
1923
1924 #[cfg(test)]
1925 pub(crate) fn len(&self) -> usize { self.entries.len() }
1926}
1927
/// Hands out Db handles from a process-wide counter: the first call
/// returns 1 and each later call returns the next integer.
fn next_sql_handle() -> u64 {
    static NEXT_HANDLE: AtomicU64 = AtomicU64::new(1);
    let issued = NEXT_HANDLE.fetch_add(1, Ordering::SeqCst);
    issued
}
1932
#[cfg(test)]
mod sql_registry_tests {
    use super::SqlRegistry;

    /// Opens a throwaway in-memory SQLite connection.
    fn fresh() -> rusqlite::Connection {
        rusqlite::Connection::open_in_memory().expect("open in-memory sqlite")
    }

    /// A stored handle is found; an unknown one is not.
    #[test]
    fn insert_and_get_round_trip() {
        let mut registry = SqlRegistry::with_capacity(4);
        registry.insert(1, fresh());
        assert!(registry.touch_get(1).is_some());
        assert!(registry.touch_get(2).is_none());
    }

    /// Touching an entry protects it from the next eviction.
    #[test]
    fn cap_evicts_lru_on_overflow() {
        let mut registry = SqlRegistry::with_capacity(2);
        registry.insert(1, fresh());
        registry.insert(2, fresh());
        let _ = registry.touch_get(1);
        registry.insert(3, fresh());
        assert!(registry.touch_get(1).is_some(), "1 was MRU, should survive");
        assert!(registry.touch_get(2).is_none(), "2 was LRU, should be evicted");
        assert!(registry.touch_get(3).is_some(), "3 just inserted");
        assert_eq!(registry.len(), 2);
    }

    /// `remove` deletes the entry and shrinks the registry.
    #[test]
    fn remove_drops_entry() {
        let mut registry = SqlRegistry::with_capacity(4);
        registry.insert(1, fresh());
        registry.remove(1);
        assert!(registry.touch_get(1).is_none());
        assert_eq!(registry.len(), 0);
    }

    /// The registry never grows past its capacity.
    #[test]
    fn many_inserts_stay_bounded_at_cap() {
        let cap = 8;
        let mut registry = SqlRegistry::with_capacity(cap);
        for handle in 0..(cap as u64 * 3) {
            registry.insert(handle, fresh());
            assert!(registry.len() <= cap);
        }
        assert_eq!(registry.len(), cap);
    }
}
1982
#[cfg(test)]
mod kv_registry_tests {
    use super::KvRegistry;

    /// Opens a sled database in a temp directory whose name combines the
    /// process id, `tag` and the current time in nanoseconds.
    fn fresh_db(tag: &str) -> sled::Db {
        let pid = std::process::id();
        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        let dir = std::env::temp_dir().join(format!("lex-kv-reg-{}-{}-{}", pid, tag, nanos));
        sled::open(&dir).expect("sled open")
    }

    /// A stored handle is found; an unknown one is not.
    #[test]
    fn insert_and_get_round_trip() {
        let mut registry = KvRegistry::with_capacity(4);
        registry.insert(1, fresh_db("a"));
        assert!(registry.touch_get(1).is_some());
        assert!(registry.touch_get(2).is_none());
    }

    /// Touching an entry protects it from the next eviction.
    #[test]
    fn cap_evicts_lru_on_overflow() {
        let mut registry = KvRegistry::with_capacity(2);
        registry.insert(1, fresh_db("c1"));
        registry.insert(2, fresh_db("c2"));
        let _ = registry.touch_get(1);
        registry.insert(3, fresh_db("c3"));
        assert!(registry.touch_get(1).is_some(), "1 was MRU, should survive");
        assert!(registry.touch_get(2).is_none(), "2 was LRU, should be evicted");
        assert!(registry.touch_get(3).is_some(), "3 just inserted, should survive");
        assert_eq!(registry.len(), 2);
    }

    /// Without touches, the oldest insertion is evicted first.
    #[test]
    fn cap_with_no_touches_evicts_in_insertion_order() {
        let mut registry = KvRegistry::with_capacity(2);
        for (handle, tag) in [(10, "f1"), (20, "f2"), (30, "f3")] {
            registry.insert(handle, fresh_db(tag));
        }
        assert!(registry.touch_get(10).is_none());
        assert!(registry.touch_get(20).is_some());
        assert!(registry.touch_get(30).is_some());
    }

    /// `remove` deletes the entry and shrinks the registry.
    #[test]
    fn remove_drops_entry() {
        let mut registry = KvRegistry::with_capacity(4);
        registry.insert(1, fresh_db("r1"));
        registry.remove(1);
        assert!(registry.touch_get(1).is_none());
        assert_eq!(registry.len(), 0);
    }

    /// Removing a handle that was never inserted leaves other entries alone.
    #[test]
    fn remove_unknown_handle_is_noop() {
        let mut registry = KvRegistry::with_capacity(4);
        registry.insert(1, fresh_db("u1"));
        registry.remove(999);
        assert!(registry.touch_get(1).is_some());
    }

    /// The registry never grows past its capacity.
    #[test]
    fn many_inserts_stay_bounded_at_cap() {
        let cap = 8;
        let mut registry = KvRegistry::with_capacity(cap);
        for handle in 0..(cap as u64 * 3) {
            registry.insert(handle, fresh_db(&format!("b{handle}")));
            assert!(registry.len() <= cap);
        }
        assert_eq!(registry.len(), cap);
    }
}