1use lex_bytecode::vm::{EffectHandler, Vm};
8use lex_bytecode::{Program, Value};
9use std::path::PathBuf;
10use std::sync::atomic::{AtomicU64, Ordering};
11use std::sync::{Mutex, OnceLock};
12use std::sync::Arc;
13use std::time::{SystemTime, UNIX_EPOCH};
14
15use crate::builtins::try_pure_builtin;
16use crate::policy::Policy;
17
18pub trait IoSink: Send {
21 fn print_line(&mut self, s: &str);
22}
23
24pub struct StdoutSink;
25impl IoSink for StdoutSink {
26 fn print_line(&mut self, s: &str) {
27 println!("{s}");
28 }
29}
30
31#[derive(Default)]
32pub struct CapturedSink { pub lines: Vec<String> }
33impl IoSink for CapturedSink {
34 fn print_line(&mut self, s: &str) { self.lines.push(s.to_string()); }
35}
36
37pub type StreamRegistry =
40 std::collections::HashMap<String, Box<dyn Iterator<Item = String> + Send>>;
41
42pub struct DefaultHandler {
43 policy: Policy,
44 pub sink: Box<dyn IoSink>,
45 pub read_root: Option<PathBuf>,
48 pub budget_remaining: Arc<AtomicU64>,
55 pub budget_ceiling: Option<u64>,
59 pub program: Option<Arc<Program>>,
63 pub chat_registry: Option<Arc<crate::ws::ChatRegistry>>,
67 pub mcp_clients: crate::mcp_client::McpClientCache,
73 pub streams: Arc<std::sync::Mutex<StreamRegistry>>,
80 pub next_stream_id: Arc<std::sync::atomic::AtomicU64>,
82}
83
84impl DefaultHandler {
85 pub fn new(policy: Policy) -> Self {
86 let ceiling = policy.budget;
90 let initial = ceiling.unwrap_or(u64::MAX);
91 Self {
92 policy,
93 sink: Box::new(StdoutSink),
94 read_root: None,
95 budget_remaining: Arc::new(AtomicU64::new(initial)),
96 budget_ceiling: ceiling,
97 program: None,
98 chat_registry: None,
99 mcp_clients: crate::mcp_client::McpClientCache::with_capacity(16),
100 streams: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
101 next_stream_id: Arc::new(std::sync::atomic::AtomicU64::new(0)),
102 }
103 }
104
105 pub fn with_program(mut self, program: Arc<Program>) -> Self {
106 self.program = Some(program); self
107 }
108
109 pub fn with_chat_registry(mut self, registry: Arc<crate::ws::ChatRegistry>) -> Self {
110 self.chat_registry = Some(registry); self
111 }
112
113 pub fn with_sink(mut self, sink: Box<dyn IoSink>) -> Self {
114 self.sink = sink; self
115 }
116
117 pub fn with_read_root(mut self, root: PathBuf) -> Self {
118 self.read_root = Some(root); self
119 }
120
121 fn ensure_kind_allowed(&self, kind: &str) -> Result<(), String> {
122 if self.policy.allow_effects.contains(kind) {
123 Ok(())
124 } else {
125 Err(format!("effect `{kind}` not in --allow-effects"))
126 }
127 }
128
129 fn resolve_read_path(&self, p: &str) -> PathBuf {
130 match &self.read_root {
131 Some(root) => root.join(p.trim_start_matches('/')),
132 None => PathBuf::from(p),
133 }
134 }
135
136 fn dispatch_log(&mut self, op: &str, args: Vec<Value>) -> Result<Value, String> {
137 match op {
138 "debug" | "info" | "warn" | "error" => {
139 let msg = expect_str(args.first())?;
140 let level = match op {
141 "debug" => LogLevel::Debug,
142 "info" => LogLevel::Info,
143 "warn" => LogLevel::Warn,
144 _ => LogLevel::Error,
145 };
146 emit_log(level, msg);
147 Ok(Value::Unit)
148 }
149 "set_level" => {
150 let s = expect_str(args.first())?;
151 match parse_log_level(s) {
152 Some(l) => {
153 log_state().lock().unwrap().level = l;
154 Ok(ok(Value::Unit))
155 }
156 None => Ok(err(Value::Str(format!(
157 "log.set_level: unknown level `{s}`; expected debug|info|warn|error")))),
158 }
159 }
160 "set_format" => {
161 let s = expect_str(args.first())?;
162 let fmt = match s {
163 "text" => LogFormat::Text,
164 "json" => LogFormat::Json,
165 other => return Ok(err(Value::Str(format!(
166 "log.set_format: unknown format `{other}`; expected text|json")))),
167 };
168 log_state().lock().unwrap().format = fmt;
169 Ok(ok(Value::Unit))
170 }
171 "set_sink" => {
172 let path = expect_str(args.first())?;
173 if path == "-" {
174 log_state().lock().unwrap().sink = LogSink::Stderr;
175 return Ok(ok(Value::Unit));
176 }
177 if let Err(e) = self.ensure_fs_write_path(path) {
178 return Ok(err(Value::Str(e)));
179 }
180 match std::fs::OpenOptions::new()
181 .create(true).append(true).open(path)
182 {
183 Ok(f) => {
184 log_state().lock().unwrap().sink = LogSink::File(std::sync::Arc::new(Mutex::new(f)));
185 Ok(ok(Value::Unit))
186 }
187 Err(e) => Ok(err(Value::Str(format!("log.set_sink `{path}`: {e}")))),
188 }
189 }
190 other => Err(format!("unsupported log.{other}")),
191 }
192 }
193
194 fn dispatch_process(&mut self, op: &str, args: Vec<Value>) -> Result<Value, String> {
195 match op {
196 "spawn" => {
197 let cmd = expect_str(args.first())?.to_string();
198 let raw_args = match args.get(1) {
199 Some(Value::List(items)) => items.clone(),
200 _ => return Err("process.spawn: args must be List[Str]".into()),
201 };
202 let str_args: Result<Vec<String>, String> = raw_args.iter().map(|v| match v {
203 Value::Str(s) => Ok(s.clone()),
204 other => Err(format!("process.spawn: arg must be Str, got {other:?}")),
205 }).collect();
206 let str_args = str_args?;
207 let opts = match args.get(2) {
208 Some(Value::Record(r)) => r.clone(),
209 _ => return Err("process.spawn: missing or invalid opts record".into()),
210 };
211
212 if !self.policy.allow_proc.is_empty() {
214 let basename = std::path::Path::new(&cmd)
215 .file_name()
216 .and_then(|s| s.to_str())
217 .unwrap_or(&cmd);
218 if !self.policy.allow_proc.iter().any(|a| a == basename) {
219 return Ok(err(Value::Str(format!(
220 "process.spawn: `{cmd}` not in --allow-proc {:?}",
221 self.policy.allow_proc
222 ))));
223 }
224 }
225
226 let mut command = std::process::Command::new(&cmd);
227 command.args(&str_args);
228 command.stdin(std::process::Stdio::piped());
229 command.stdout(std::process::Stdio::piped());
230 command.stderr(std::process::Stdio::piped());
231
232 if let Some(Value::Variant { name, args: vargs }) = opts.get("cwd") {
233 if name == "Some" {
234 if let Some(Value::Str(s)) = vargs.first() {
235 command.current_dir(s);
236 }
237 }
238 }
239 if let Some(Value::Map(env)) = opts.get("env") {
240 for (k, v) in env {
241 if let (lex_bytecode::MapKey::Str(ks), Value::Str(vs)) = (k, v) {
242 command.env(ks, vs);
243 }
244 }
245 }
246
247 let stdin_payload: Option<Vec<u8>> = match opts.get("stdin") {
248 Some(Value::Variant { name, args: vargs }) if name == "Some" => {
249 match vargs.first() {
250 Some(Value::Bytes(b)) => Some(b.clone()),
251 _ => None,
252 }
253 }
254 _ => None,
255 };
256
257 let mut child = match command.spawn() {
258 Ok(c) => c,
259 Err(e) => return Ok(err(Value::Str(format!("process.spawn `{cmd}`: {e}")))),
260 };
261
262 if let Some(payload) = stdin_payload {
263 if let Some(mut stdin) = child.stdin.take() {
264 use std::io::Write;
265 let _ = stdin.write_all(&payload);
266 }
268 }
269
270 let stdout = child.stdout.take().map(std::io::BufReader::new);
271 let stderr = child.stderr.take().map(std::io::BufReader::new);
272 let handle = next_process_handle();
273 process_registry().lock().unwrap().insert(handle, ProcessState {
274 child,
275 stdout,
276 stderr,
277 });
278 Ok(ok(Value::Int(handle as i64)))
279 }
280 "read_stdout_line" => Self::read_line_op(args, true),
281 "read_stderr_line" => Self::read_line_op(args, false),
282 "wait" => {
283 let h = expect_process_handle(args.first())?;
284 let arc = process_registry().lock().unwrap()
288 .touch_get(h)
289 .ok_or_else(|| "process.wait: closed or unknown ProcessHandle".to_string())?;
290 let status = {
291 let mut state = arc.lock().unwrap();
292 state.child.wait().map_err(|e| format!("process.wait: {e}"))?
293 };
294 process_registry().lock().unwrap().remove(h);
298 let mut rec = indexmap::IndexMap::new();
299 rec.insert("code".into(), Value::Int(status.code().unwrap_or(-1) as i64));
300 #[cfg(unix)]
301 {
302 use std::os::unix::process::ExitStatusExt;
303 rec.insert("signaled".into(), Value::Bool(status.signal().is_some()));
304 }
305 #[cfg(not(unix))]
306 {
307 rec.insert("signaled".into(), Value::Bool(false));
308 }
309 Ok(Value::Record(rec))
310 }
311 "kill" => {
312 let h = expect_process_handle(args.first())?;
313 let _signal = expect_str(args.get(1))?;
314 let arc = process_registry().lock().unwrap()
315 .touch_get(h)
316 .ok_or_else(|| "process.kill: closed or unknown ProcessHandle".to_string())?;
317 let mut state = arc.lock().unwrap();
318 match state.child.kill() {
321 Ok(_) => Ok(ok(Value::Unit)),
322 Err(e) => Ok(err(Value::Str(format!("process.kill: {e}")))),
323 }
324 }
325 "run" => {
326 let cmd = expect_str(args.first())?.to_string();
327 let raw_args = match args.get(1) {
328 Some(Value::List(items)) => items.clone(),
329 _ => return Err("process.run: args must be List[Str]".into()),
330 };
331 let str_args: Result<Vec<String>, String> = raw_args.iter().map(|v| match v {
332 Value::Str(s) => Ok(s.clone()),
333 other => Err(format!("process.run: arg must be Str, got {other:?}")),
334 }).collect();
335 let str_args = str_args?;
336 if !self.policy.allow_proc.is_empty() {
337 let basename = std::path::Path::new(&cmd)
338 .file_name()
339 .and_then(|s| s.to_str())
340 .unwrap_or(&cmd);
341 if !self.policy.allow_proc.iter().any(|a| a == basename) {
342 return Ok(err(Value::Str(format!(
343 "process.run: `{cmd}` not in --allow-proc {:?}",
344 self.policy.allow_proc
345 ))));
346 }
347 }
348 match std::process::Command::new(&cmd).args(&str_args).output() {
349 Ok(o) => {
350 let mut rec = indexmap::IndexMap::new();
351 rec.insert("stdout".into(), Value::Str(
352 String::from_utf8_lossy(&o.stdout).to_string()));
353 rec.insert("stderr".into(), Value::Str(
354 String::from_utf8_lossy(&o.stderr).to_string()));
355 rec.insert("exit_code".into(), Value::Int(
356 o.status.code().unwrap_or(-1) as i64));
357 Ok(ok(Value::Record(rec)))
358 }
359 Err(e) => Ok(err(Value::Str(format!("process.run `{cmd}`: {e}")))),
360 }
361 }
362 other => Err(format!("unsupported process.{other}")),
363 }
364 }
365
366 fn read_line_op(args: Vec<Value>, is_stdout: bool) -> Result<Value, String> {
372 let h = expect_process_handle(args.first())?;
373 let arc = process_registry().lock().unwrap()
374 .touch_get(h)
375 .ok_or_else(|| format!(
376 "process.read_{}_line: closed or unknown ProcessHandle",
377 if is_stdout { "stdout" } else { "stderr" }))?;
378 let mut state = arc.lock().unwrap();
379 let reader_opt = if is_stdout {
380 state.stdout.as_mut().map(|r| -> &mut dyn std::io::BufRead { r })
381 } else {
382 state.stderr.as_mut().map(|r| -> &mut dyn std::io::BufRead { r })
383 };
384 let reader = match reader_opt {
385 Some(r) => r,
386 None => return Ok(none()),
387 };
388 let mut line = String::new();
389 match reader.read_line(&mut line) {
390 Ok(0) => Ok(none()),
391 Ok(_) => {
392 if line.ends_with('\n') { line.pop(); }
393 if line.ends_with('\r') { line.pop(); }
394 Ok(some(Value::Str(line)))
395 }
396 Err(e) => Err(format!("process.read_*_line: {e}")),
397 }
398 }
399
400 fn dispatch_fs(&mut self, op: &str, args: Vec<Value>) -> Result<Value, String> {
401 match op {
402 "exists" => {
403 let path = expect_str(args.first())?.to_string();
404 if let Err(e) = self.ensure_fs_walk_path(&path) {
405 return Ok(err(Value::Str(e)));
406 }
407 Ok(Value::Bool(std::path::Path::new(&path).exists()))
408 }
409 "is_file" => {
410 let path = expect_str(args.first())?.to_string();
411 if let Err(e) = self.ensure_fs_walk_path(&path) {
412 return Ok(err(Value::Str(e)));
413 }
414 Ok(Value::Bool(std::path::Path::new(&path).is_file()))
415 }
416 "is_dir" => {
417 let path = expect_str(args.first())?.to_string();
418 if let Err(e) = self.ensure_fs_walk_path(&path) {
419 return Ok(err(Value::Str(e)));
420 }
421 Ok(Value::Bool(std::path::Path::new(&path).is_dir()))
422 }
423 "stat" => {
424 let path = expect_str(args.first())?.to_string();
425 if let Err(e) = self.ensure_fs_walk_path(&path) {
426 return Ok(err(Value::Str(e)));
427 }
428 match std::fs::metadata(&path) {
429 Ok(md) => {
430 let mtime = md.modified()
431 .ok()
432 .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
433 .map(|d| d.as_secs() as i64)
434 .unwrap_or(0);
435 let mut rec = indexmap::IndexMap::new();
436 rec.insert("size".into(), Value::Int(md.len() as i64));
437 rec.insert("mtime".into(), Value::Int(mtime));
438 rec.insert("is_dir".into(), Value::Bool(md.is_dir()));
439 rec.insert("is_file".into(), Value::Bool(md.is_file()));
440 Ok(ok(Value::Record(rec)))
441 }
442 Err(e) => Ok(err(Value::Str(format!("fs.stat `{path}`: {e}")))),
443 }
444 }
445 "list_dir" => {
446 let path = expect_str(args.first())?.to_string();
447 if let Err(e) = self.ensure_fs_walk_path(&path) {
448 return Ok(err(Value::Str(e)));
449 }
450 match std::fs::read_dir(&path) {
451 Ok(rd) => {
452 let mut entries: Vec<Value> = Vec::new();
453 for ent in rd {
454 match ent {
455 Ok(e) => {
456 let p = e.path();
457 entries.push(Value::Str(p.to_string_lossy().into_owned()));
458 }
459 Err(e) => return Ok(err(Value::Str(format!("fs.list_dir: {e}")))),
460 }
461 }
462 Ok(ok(Value::List(entries)))
463 }
464 Err(e) => Ok(err(Value::Str(format!("fs.list_dir `{path}`: {e}")))),
465 }
466 }
467 "walk" => {
468 let path = expect_str(args.first())?.to_string();
469 if let Err(e) = self.ensure_fs_walk_path(&path) {
470 return Ok(err(Value::Str(e)));
471 }
472 let mut paths: Vec<Value> = Vec::new();
473 for ent in walkdir::WalkDir::new(&path) {
474 match ent {
475 Ok(e) => paths.push(Value::Str(
476 e.path().to_string_lossy().into_owned())),
477 Err(e) => return Ok(err(Value::Str(format!("fs.walk: {e}")))),
478 }
479 }
480 Ok(ok(Value::List(paths)))
481 }
482 "glob" => {
483 let pattern = expect_str(args.first())?.to_string();
484 let entries = match glob::glob(&pattern) {
489 Ok(e) => e,
490 Err(e) => return Ok(err(Value::Str(format!("fs.glob: {e}")))),
491 };
492 let mut paths: Vec<Value> = Vec::new();
493 for ent in entries {
494 match ent {
495 Ok(p) => {
496 let s = p.to_string_lossy().into_owned();
497 if self.policy.allow_fs_read.is_empty()
498 || self.policy.allow_fs_read.iter().any(|root| p.starts_with(root))
499 {
500 paths.push(Value::Str(s));
501 }
502 }
503 Err(e) => return Ok(err(Value::Str(format!("fs.glob: {e}")))),
504 }
505 }
506 Ok(ok(Value::List(paths)))
507 }
508 "mkdir_p" => {
509 let path = expect_str(args.first())?.to_string();
510 if let Err(e) = self.ensure_fs_write_path(&path) {
511 return Ok(err(Value::Str(e)));
512 }
513 match std::fs::create_dir_all(&path) {
514 Ok(_) => Ok(ok(Value::Unit)),
515 Err(e) => Ok(err(Value::Str(format!("fs.mkdir_p `{path}`: {e}")))),
516 }
517 }
518 "remove" => {
519 let path = expect_str(args.first())?.to_string();
520 if let Err(e) = self.ensure_fs_write_path(&path) {
521 return Ok(err(Value::Str(e)));
522 }
523 let p = std::path::Path::new(&path);
524 let result = if p.is_dir() {
525 std::fs::remove_dir_all(p)
526 } else {
527 std::fs::remove_file(p)
528 };
529 match result {
530 Ok(_) => Ok(ok(Value::Unit)),
531 Err(e) => Ok(err(Value::Str(format!("fs.remove `{path}`: {e}")))),
532 }
533 }
534 "copy" => {
535 let src = expect_str(args.first())?.to_string();
536 let dst = expect_str(args.get(1))?.to_string();
537 if let Err(e) = self.ensure_fs_walk_path(&src) {
538 return Ok(err(Value::Str(e)));
539 }
540 if let Err(e) = self.ensure_fs_write_path(&dst) {
541 return Ok(err(Value::Str(e)));
542 }
543 match std::fs::copy(&src, &dst) {
544 Ok(_) => Ok(ok(Value::Unit)),
545 Err(e) => Ok(err(Value::Str(format!("fs.copy {src} -> {dst}: {e}")))),
546 }
547 }
548 other => Err(format!("unsupported fs.{other}")),
549 }
550 }
551
552 fn ensure_fs_walk_path(&self, path: &str) -> Result<(), String> {
557 if self.policy.allow_fs_read.is_empty() {
558 return Ok(());
559 }
560 let p = std::path::Path::new(path);
561 if self.policy.allow_fs_read.iter().any(|a| p.starts_with(a)) {
562 Ok(())
563 } else {
564 Err(format!("fs path `{path}` outside --allow-fs-read"))
565 }
566 }
567
568 fn ensure_fs_write_path(&self, path: &str) -> Result<(), String> {
571 if self.policy.allow_fs_write.is_empty() {
572 return Ok(());
573 }
574 let p = std::path::Path::new(path);
575 if self.policy.allow_fs_write.iter().any(|a| p.starts_with(a)) {
576 Ok(())
577 } else {
578 Err(format!("fs path `{path}` outside --allow-fs-write"))
579 }
580 }
581
582 fn ensure_host_allowed(&self, url: &str) -> Result<(), String> {
586 if self.policy.allow_net_host.is_empty() { return Ok(()); }
587 let host = extract_host(url).unwrap_or("");
588 if self.policy.allow_net_host.iter().any(|h| host == h) {
589 Ok(())
590 } else {
591 Err(format!(
592 "net call to host `{host}` not in --allow-net-host {:?}",
593 self.policy.allow_net_host,
594 ))
595 }
596 }
597}
598
599fn extract_host(url: &str) -> Option<&str> {
600 let rest = url.strip_prefix("http://").or_else(|| url.strip_prefix("https://"))?;
601 let host_port = match rest.find('/') {
602 Some(i) => &rest[..i],
603 None => rest,
604 };
605 Some(match host_port.rsplit_once(':') {
606 Some((h, _)) => h,
607 None => host_port,
608 })
609}
610
611impl EffectHandler for DefaultHandler {
612 fn note_call_budget(&mut self, cost: u64) -> Result<(), String> {
617 let Some(ceiling) = self.budget_ceiling else { return Ok(()); };
620 loop {
626 let cur = self.budget_remaining.load(Ordering::SeqCst);
627 if cost > cur {
628 let used = ceiling.saturating_sub(cur);
629 return Err(format!(
630 "budget exceeded: requested {cost}, used so far {used}, ceiling {ceiling}"));
631 }
632 let next = cur - cost;
633 if self.budget_remaining.compare_exchange(cur, next,
636 Ordering::SeqCst, Ordering::SeqCst).is_ok() {
637 return Ok(());
638 }
639 }
640 }
641
642 fn dispatch(&mut self, kind: &str, op: &str, args: Vec<Value>) -> Result<Value, String> {
643 if let Some(r) = try_pure_builtin(kind, op, &args) {
647 return r;
648 }
649 if kind == "process" {
653 self.ensure_kind_allowed("proc")?;
654 return self.dispatch_process(op, args);
655 }
656 if kind == "log" {
657 let effect_kind = match op {
660 "debug" | "info" | "warn" | "error" => "log",
661 "set_level" | "set_format" => "io",
662 "set_sink" => {
663 self.ensure_kind_allowed("io")?;
664 self.ensure_kind_allowed("fs_write")?;
665 return self.dispatch_log(op, args);
666 }
667 other => return Err(format!("unsupported log.{other}")),
668 };
669 self.ensure_kind_allowed(effect_kind)?;
670 return self.dispatch_log(op, args);
671 }
672 if kind == "fs" {
673 let effect_kind = match op {
674 "exists" | "is_file" | "is_dir" | "stat"
675 | "list_dir" | "walk" | "glob" => "fs_walk",
676 "mkdir_p" | "remove" => "fs_write",
677 "copy" => {
678 self.ensure_kind_allowed("fs_walk")?;
679 self.ensure_kind_allowed("fs_write")?;
680 return self.dispatch_fs(op, args);
681 }
682 other => return Err(format!("unsupported fs.{other}")),
683 };
684 self.ensure_kind_allowed(effect_kind)?;
685 return self.dispatch_fs(op, args);
686 }
687 if kind == "datetime" && op == "now" {
694 self.ensure_kind_allowed("time")?;
695 if let Ok(s) = std::env::var("LEX_TEST_NOW") {
697 if let Ok(secs) = s.trim().parse::<i64>() {
698 return Ok(Value::Int(secs.saturating_mul(1_000_000_000)));
699 }
700 }
701 let now = chrono::Utc::now();
702 let nanos = now.timestamp_nanos_opt().unwrap_or(i64::MAX);
703 return Ok(Value::Int(nanos));
704 }
705 if kind == "crypto" && op == "random" {
706 self.ensure_kind_allowed("random")?;
707 let n = expect_int(args.first())?;
708 if !(0..=1_048_576).contains(&n) {
709 return Err("crypto.random: n must be in 0..=1048576".into());
710 }
711 use rand::{rngs::SysRng, TryRng};
712 let mut buf = vec![0u8; n as usize];
713 SysRng.try_fill_bytes(&mut buf)
714 .map_err(|e| format!("crypto.random: OS RNG: {e}"))?;
715 return Ok(Value::Bytes(buf));
716 }
717 if kind == "agent" {
730 let effect_kind = match op {
731 "local_complete" => "llm_local",
732 "cloud_complete" => "llm_cloud",
733 "cloud_stream" => "llm_cloud",
734 "send_a2a" => "a2a",
735 "call_mcp" => "mcp",
736 other => return Err(format!("unsupported agent.{other}")),
737 };
738 self.ensure_kind_allowed(effect_kind)?;
739 return match op {
747 "call_mcp" => Ok(self.dispatch_call_mcp(args)),
748 "local_complete" => Ok(dispatch_llm_local(args)),
749 "cloud_complete" => Ok(dispatch_llm_cloud(args)),
750 "cloud_stream" => Ok(self.dispatch_cloud_stream(args)),
751 _ => Ok(ok(Value::Str(format!("<{effect_kind} stub>")))),
752 };
753 }
754 if kind == "stream" {
755 self.ensure_kind_allowed("stream")?;
762 return match op {
763 "next" => Ok(self.dispatch_stream_next(args)),
764 "collect" => Ok(self.dispatch_stream_collect(args)),
765 other => Err(format!("unsupported stream.{other}")),
766 };
767 }
768 if kind == "http" && matches!(op, "send" | "get" | "post") {
769 self.ensure_kind_allowed("net")?;
770 return match op {
771 "send" => {
772 let req = expect_record(args.first())?;
773 Ok(http_send_record(self, req))
774 }
775 "get" => {
776 let url = expect_str(args.first())?.to_string();
777 self.ensure_host_allowed(&url)?;
778 Ok(http_send_simple("GET", &url, None, "", None))
779 }
780 "post" => {
781 let url = expect_str(args.first())?.to_string();
782 let body = expect_bytes(args.get(1))?.clone();
783 let content_type = expect_str(args.get(2))?.to_string();
784 self.ensure_host_allowed(&url)?;
785 Ok(http_send_simple("POST", &url, Some(body), &content_type, None))
786 }
787 _ => unreachable!(),
788 };
789 }
790 self.ensure_kind_allowed(kind)?;
791 match (kind, op) {
792 ("io", "print") => {
793 let line = expect_str(args.first())?;
794 self.sink.print_line(line);
795 Ok(Value::Unit)
796 }
797 ("io", "read") => {
798 let path = expect_str(args.first())?.to_string();
799 let resolved = self.resolve_read_path(&path);
800 if !self.policy.allow_fs_read.is_empty()
807 && !self.policy.allow_fs_read.iter().any(|a| resolved.starts_with(a))
808 {
809 return Err(format!("read of `{path}` outside --allow-fs-read"));
810 }
811 match std::fs::read_to_string(&resolved) {
812 Ok(s) => Ok(ok(Value::Str(s))),
813 Err(e) => Ok(err(Value::Str(format!("{e}")))),
814 }
815 }
816 ("io", "write") => {
817 let path = expect_str(args.first())?.to_string();
818 let contents = expect_str(args.get(1))?.to_string();
819 if !self.policy.allow_fs_write.is_empty() {
821 let p = std::path::Path::new(&path);
822 if !self.policy.allow_fs_write.iter().any(|a| p.starts_with(a)) {
823 return Err(format!("write to `{path}` outside --allow-fs-write"));
824 }
825 }
826 match std::fs::write(&path, contents) {
827 Ok(_) => Ok(ok(Value::Unit)),
828 Err(e) => Ok(err(Value::Str(format!("{e}")))),
829 }
830 }
831 ("time", "now") => {
832 let secs = SystemTime::now().duration_since(UNIX_EPOCH)
833 .map_err(|e| format!("time: {e}"))?.as_secs();
834 Ok(Value::Int(secs as i64))
835 }
836 ("time", "sleep_ms") => {
837 let n = expect_int(args.first())?;
845 if n > 0 {
846 let ms = (n as u64).min(60_000);
847 std::thread::sleep(std::time::Duration::from_millis(ms));
848 }
849 Ok(Value::Unit)
850 }
851 ("rand", "int_in") => {
852 let lo = expect_int(args.first())?;
854 let hi = expect_int(args.get(1))?;
855 Ok(Value::Int((lo + hi) / 2))
856 }
857 ("env", "get") => {
862 let name = expect_str(args.first())?;
863 Ok(match std::env::var(name) {
864 Ok(v) => Value::Variant {
865 name: "Some".into(),
866 args: vec![Value::Str(v)],
867 },
868 Err(_) => Value::Variant { name: "None".into(), args: Vec::new() },
869 })
870 }
871 ("budget", _) => {
872 Ok(Value::Unit)
875 }
876 ("net", "get") => {
877 let url = expect_str(args.first())?.to_string();
878 self.ensure_host_allowed(&url)?;
879 Ok(http_request("GET", &url, None))
880 }
881 ("net", "post") => {
882 let url = expect_str(args.first())?.to_string();
883 let body = expect_str(args.get(1))?.to_string();
884 self.ensure_host_allowed(&url)?;
885 Ok(http_request("POST", &url, Some(&body)))
886 }
887 ("net", "serve") => {
888 let port = match args.first() {
889 Some(Value::Int(n)) if (0..=65535).contains(n) => *n as u16,
890 _ => return Err("net.serve(port, handler): port must be Int 0..=65535".into()),
891 };
892 let handler_name = expect_str(args.get(1))?.to_string();
893 let program = self.program.clone()
894 .ok_or_else(|| "net.serve requires a Program reference; use DefaultHandler::with_program".to_string())?;
895 let policy = self.policy.clone();
896 serve_http(port, handler_name, program, policy, None)
897 }
898 ("net", "serve_fn") => {
899 let port = match args.first() {
900 Some(Value::Int(n)) if (0..=65535).contains(n) => *n as u16,
901 _ => return Err("net.serve_fn(port, handler): port must be Int 0..=65535".into()),
902 };
903 let closure = match args.into_iter().nth(1) {
904 Some(c @ Value::Closure { .. }) => c,
905 _ => return Err("net.serve_fn(port, handler): handler must be a closure".into()),
906 };
907 let program = self.program.clone()
908 .ok_or_else(|| "net.serve_fn requires a Program reference; use DefaultHandler::with_program".to_string())?;
909 let policy = self.policy.clone();
910 serve_http_fn(port, closure, program, policy)
911 }
912 ("net", "serve_tls") => {
913 let port = match args.first() {
914 Some(Value::Int(n)) if (0..=65535).contains(n) => *n as u16,
915 _ => return Err("net.serve_tls(port, cert, key, handler): port must be Int 0..=65535".into()),
916 };
917 let cert_path = expect_str(args.get(1))?.to_string();
918 let key_path = expect_str(args.get(2))?.to_string();
919 let handler_name = expect_str(args.get(3))?.to_string();
920 let program = self.program.clone()
921 .ok_or_else(|| "net.serve_tls requires a Program reference".to_string())?;
922 let policy = self.policy.clone();
923 let cert = std::fs::read(&cert_path)
924 .map_err(|e| format!("net.serve_tls: read cert {cert_path}: {e}"))?;
925 let key = std::fs::read(&key_path)
926 .map_err(|e| format!("net.serve_tls: read key {key_path}: {e}"))?;
927 serve_http(port, handler_name, program, policy, Some(TlsConfig { cert, key }))
928 }
929 ("net", "serve_ws") => {
930 let port = match args.first() {
931 Some(Value::Int(n)) if (0..=65535).contains(n) => *n as u16,
932 _ => return Err("net.serve_ws(port, on_message): port must be Int 0..=65535".into()),
933 };
934 let handler_name = expect_str(args.get(1))?.to_string();
935 let program = self.program.clone()
936 .ok_or_else(|| "net.serve_ws requires a Program reference".to_string())?;
937 let policy = self.policy.clone();
938 let registry = Arc::new(crate::ws::ChatRegistry::default());
939 crate::ws::serve_ws(port, handler_name, program, policy, registry)
940 }
941 ("net", "serve_ws_fn") => {
942 let port = match args.first() {
943 Some(Value::Int(n)) if (0..=65535).contains(n) => *n as u16,
944 _ => return Err("net.serve_ws_fn(port, subprotocol, handler): port must be Int 0..=65535".into()),
945 };
946 let subprotocol = expect_str(args.get(1))?.to_string();
947 let closure = match args.into_iter().nth(2) {
948 Some(c @ Value::Closure { .. }) => c,
949 _ => return Err("net.serve_ws_fn(port, subprotocol, handler): handler must be a closure".into()),
950 };
951 let program = self.program.clone()
952 .ok_or_else(|| "net.serve_ws_fn requires a Program reference; use DefaultHandler::with_program".to_string())?;
953 let policy = self.policy.clone();
954 let registry = Arc::new(crate::ws::ChatRegistry::default());
955 crate::ws::serve_ws_fn(port, subprotocol, closure, program, policy, registry)
956 }
957 ("chat", "broadcast") => {
958 let registry = self.chat_registry.as_ref()
959 .ok_or_else(|| "chat.broadcast called outside a net.serve_ws handler".to_string())?;
960 let room = expect_str(args.first())?;
961 let body = expect_str(args.get(1))?;
962 crate::ws::chat_broadcast(registry, room, body);
963 Ok(Value::Unit)
964 }
965 ("chat", "send") => {
966 let registry = self.chat_registry.as_ref()
967 .ok_or_else(|| "chat.send called outside a net.serve_ws handler".to_string())?;
968 let conn_id = match args.first() {
969 Some(Value::Int(n)) if *n >= 0 => *n as u64,
970 _ => return Err("chat.send: conn_id must be non-negative Int".into()),
971 };
972 let body = expect_str(args.get(1))?;
973 Ok(Value::Bool(crate::ws::chat_send(registry, conn_id, body)))
974 }
975 ("kv", "open") => {
976 let path = expect_str(args.first())?.to_string();
977 if !self.policy.allow_fs_write.is_empty() {
981 let p = std::path::Path::new(&path);
982 if !self.policy.allow_fs_write.iter().any(|a| p.starts_with(a)) {
983 return Ok(err(Value::Str(format!(
984 "kv.open: `{path}` outside --allow-fs-write"))));
985 }
986 }
987 match sled::open(&path) {
988 Ok(db) => {
989 let handle = next_kv_handle();
990 kv_registry().lock().unwrap().insert(handle, db);
991 Ok(ok(Value::Int(handle as i64)))
992 }
993 Err(e) => Ok(err(Value::Str(format!("kv.open: {e}")))),
994 }
995 }
996 ("kv", "close") => {
997 let h = expect_kv_handle(args.first())?;
998 kv_registry().lock().unwrap().remove(h);
999 Ok(Value::Unit)
1000 }
1001 ("kv", "get") => {
1002 let h = expect_kv_handle(args.first())?;
1003 let key = expect_str(args.get(1))?;
1004 let mut reg = kv_registry().lock().unwrap();
1005 let db = reg.touch_get(h).ok_or_else(|| "kv.get: closed or unknown Kv handle".to_string())?;
1006 match db.get(key.as_bytes()) {
1007 Ok(Some(ivec)) => Ok(some(Value::Bytes(ivec.to_vec()))),
1008 Ok(None) => Ok(none()),
1009 Err(e) => Err(format!("kv.get: {e}")),
1010 }
1011 }
1012 ("kv", "put") => {
1013 let h = expect_kv_handle(args.first())?;
1014 let key = expect_str(args.get(1))?.to_string();
1015 let val = expect_bytes(args.get(2))?.clone();
1016 let mut reg = kv_registry().lock().unwrap();
1017 let db = reg.touch_get(h).ok_or_else(|| "kv.put: closed or unknown Kv handle".to_string())?;
1018 match db.insert(key.as_bytes(), val) {
1019 Ok(_) => Ok(ok(Value::Unit)),
1020 Err(e) => Ok(err(Value::Str(format!("kv.put: {e}")))),
1021 }
1022 }
1023 ("kv", "delete") => {
1024 let h = expect_kv_handle(args.first())?;
1025 let key = expect_str(args.get(1))?;
1026 let mut reg = kv_registry().lock().unwrap();
1027 let db = reg.touch_get(h).ok_or_else(|| "kv.delete: closed or unknown Kv handle".to_string())?;
1028 match db.remove(key.as_bytes()) {
1029 Ok(_) => Ok(ok(Value::Unit)),
1030 Err(e) => Ok(err(Value::Str(format!("kv.delete: {e}")))),
1031 }
1032 }
1033 ("kv", "contains") => {
1034 let h = expect_kv_handle(args.first())?;
1035 let key = expect_str(args.get(1))?;
1036 let mut reg = kv_registry().lock().unwrap();
1037 let db = reg.touch_get(h).ok_or_else(|| "kv.contains: closed or unknown Kv handle".to_string())?;
1038 match db.contains_key(key.as_bytes()) {
1039 Ok(present) => Ok(Value::Bool(present)),
1040 Err(e) => Err(format!("kv.contains: {e}")),
1041 }
1042 }
1043 ("kv", "list_prefix") => {
1044 let h = expect_kv_handle(args.first())?;
1045 let prefix = expect_str(args.get(1))?;
1046 let mut reg = kv_registry().lock().unwrap();
1047 let db = reg.touch_get(h).ok_or_else(|| "kv.list_prefix: closed or unknown Kv handle".to_string())?;
1048 let mut keys: Vec<Value> = Vec::new();
1049 for kv in db.scan_prefix(prefix.as_bytes()) {
1050 let (k, _) = kv.map_err(|e| format!("kv.list_prefix: {e}"))?;
1051 let s = String::from_utf8_lossy(&k).to_string();
1052 keys.push(Value::Str(s));
1053 }
1054 Ok(Value::List(keys))
1055 }
1056 ("sql", "open") => {
1057 let path = expect_str(args.first())?.to_string();
1058 if path.starts_with("postgres://") || path.starts_with("postgresql://") {
1059 match postgres::Client::connect(&path, postgres::NoTls) {
1061 Ok(client) => {
1062 let handle = next_sql_handle();
1063 sql_registry().lock().unwrap().insert(handle, SqlConn::Postgres(client));
1064 Ok(ok(Value::Int(handle as i64)))
1065 }
1066 Err(e) => Ok(err(Value::Str(format!("sql.open: {e}")))),
1067 }
1068 } else {
1069 if path != ":memory:" && !self.policy.allow_fs_write.is_empty() {
1072 let p = std::path::Path::new(&path);
1073 if !self.policy.allow_fs_write.iter().any(|a| p.starts_with(a)) {
1074 return Ok(err(Value::Str(format!(
1075 "sql.open: `{path}` outside --allow-fs-write"))));
1076 }
1077 }
1078 match rusqlite::Connection::open(&path) {
1079 Ok(conn) => {
1080 let handle = next_sql_handle();
1081 sql_registry().lock().unwrap().insert(handle, SqlConn::Sqlite(conn));
1082 Ok(ok(Value::Int(handle as i64)))
1083 }
1084 Err(e) => Ok(err(Value::Str(format!("sql.open: {e}")))),
1085 }
1086 }
1087 }
1088 ("sql", "close") => {
1089 let h = expect_sql_handle(args.first())?;
1090 sql_registry().lock().unwrap().remove(h);
1091 Ok(Value::Unit)
1092 }
1093 ("sql", "exec") => {
1094 let h = expect_sql_handle(args.first())?;
1095 let stmt = expect_str(args.get(1))?.to_string();
1096 let params = expect_sql_params(args.get(2))?;
1097 let arc = sql_registry().lock().unwrap()
1098 .touch_get(h)
1099 .ok_or_else(|| "sql.exec: closed or unknown Db handle".to_string())?;
1100 let mut conn = arc.lock().unwrap();
1101 match &mut *conn {
1102 SqlConn::Sqlite(c) => {
1103 let bound = sqlite_params(¶ms);
1104 let bind: Vec<&dyn rusqlite::ToSql> =
1105 bound.iter().map(|p| p as &dyn rusqlite::ToSql).collect();
1106 match c.execute(&stmt, rusqlite::params_from_iter(bind.iter())) {
1107 Ok(n) => Ok(ok(Value::Int(n as i64))),
1108 Err(e) => Ok(err(Value::Str(format!("sql.exec: {e}")))),
1109 }
1110 }
1111 SqlConn::Postgres(c) => {
1112 let pg = pg_param_refs(¶ms);
1113 let refs: Vec<&(dyn postgres::types::ToSql + Sync)> =
1114 pg.iter().map(|b| b.as_ref()).collect();
1115 match c.execute(stmt.as_str(), &refs) {
1116 Ok(n) => Ok(ok(Value::Int(n as i64))),
1117 Err(e) => Ok(err(Value::Str(format!("sql.exec: {e}")))),
1118 }
1119 }
1120 }
1121 }
1122 ("sql", "query") => {
1123 let h = expect_sql_handle(args.first())?;
1124 let stmt_str = expect_str(args.get(1))?.to_string();
1125 let params = expect_sql_params(args.get(2))?;
1126 let arc = sql_registry().lock().unwrap()
1127 .touch_get(h)
1128 .ok_or_else(|| "sql.query: closed or unknown Db handle".to_string())?;
1129 let mut conn = arc.lock().unwrap();
1130 Ok(match &mut *conn {
1131 SqlConn::Sqlite(c) => sql_run_query_sqlite(c, &stmt_str, ¶ms),
1132 SqlConn::Postgres(c) => sql_run_query_pg(c, &stmt_str, ¶ms),
1133 })
1134 }
1135 ("sql", "begin") => {
1140 let h = expect_sql_handle(args.first())?;
1141 let arc = sql_registry().lock().unwrap()
1142 .touch_get(h)
1143 .ok_or_else(|| "sql.begin: closed or unknown Db handle".to_string())?;
1144 let mut conn = arc.lock().unwrap();
1145 let res = match &mut *conn {
1146 SqlConn::Sqlite(c) => c.execute_batch("BEGIN").map_err(|e| e.to_string()),
1147 SqlConn::Postgres(c) => c.batch_execute("BEGIN").map_err(|e| e.to_string()),
1148 };
1149 match res {
1150 Ok(()) => Ok(ok(Value::Int(h as i64))),
1151 Err(e) => Ok(err(Value::Str(format!("sql.begin: {e}")))),
1152 }
1153 }
1154 ("sql", "commit") => {
1155 let h = expect_sql_handle(args.first())?;
1156 let arc = sql_registry().lock().unwrap()
1157 .touch_get(h)
1158 .ok_or_else(|| "sql.commit: closed or unknown SqlTx handle".to_string())?;
1159 let mut conn = arc.lock().unwrap();
1160 let res = match &mut *conn {
1161 SqlConn::Sqlite(c) => c.execute_batch("COMMIT").map_err(|e| e.to_string()),
1162 SqlConn::Postgres(c) => c.batch_execute("COMMIT").map_err(|e| e.to_string()),
1163 };
1164 match res {
1165 Ok(()) => Ok(ok(Value::Unit)),
1166 Err(e) => Ok(err(Value::Str(format!("sql.commit: {e}")))),
1167 }
1168 }
1169 ("sql", "rollback") => {
1170 let h = expect_sql_handle(args.first())?;
1171 let arc = sql_registry().lock().unwrap()
1172 .touch_get(h)
1173 .ok_or_else(|| "sql.rollback: closed or unknown SqlTx handle".to_string())?;
1174 let mut conn = arc.lock().unwrap();
1175 let res = match &mut *conn {
1176 SqlConn::Sqlite(c) => c.execute_batch("ROLLBACK").map_err(|e| e.to_string()),
1177 SqlConn::Postgres(c) => c.batch_execute("ROLLBACK").map_err(|e| e.to_string()),
1178 };
1179 match res {
1180 Ok(()) => Ok(ok(Value::Unit)),
1181 Err(e) => Ok(err(Value::Str(format!("sql.rollback: {e}")))),
1182 }
1183 }
1184 ("sql", "exec_tx") => {
1185 let h = expect_sql_handle(args.first())?;
1186 let stmt = expect_str(args.get(1))?.to_string();
1187 let params = expect_sql_params(args.get(2))?;
1188 let arc = sql_registry().lock().unwrap()
1189 .touch_get(h)
1190 .ok_or_else(|| "sql.exec_tx: closed or unknown SqlTx handle".to_string())?;
1191 let mut conn = arc.lock().unwrap();
1192 match &mut *conn {
1193 SqlConn::Sqlite(c) => {
1194 let bound = sqlite_params(¶ms);
1195 let bind: Vec<&dyn rusqlite::ToSql> =
1196 bound.iter().map(|p| p as &dyn rusqlite::ToSql).collect();
1197 match c.execute(&stmt, rusqlite::params_from_iter(bind.iter())) {
1198 Ok(n) => Ok(ok(Value::Int(n as i64))),
1199 Err(e) => Ok(err(Value::Str(format!("sql.exec_tx: {e}")))),
1200 }
1201 }
1202 SqlConn::Postgres(c) => {
1203 let pg = pg_param_refs(¶ms);
1204 let refs: Vec<&(dyn postgres::types::ToSql + Sync)> =
1205 pg.iter().map(|b| b.as_ref()).collect();
1206 match c.execute(stmt.as_str(), &refs) {
1207 Ok(n) => Ok(ok(Value::Int(n as i64))),
1208 Err(e) => Ok(err(Value::Str(format!("sql.exec_tx: {e}")))),
1209 }
1210 }
1211 }
1212 }
1213 ("sql", "query_tx") => {
1214 let h = expect_sql_handle(args.first())?;
1215 let stmt_str = expect_str(args.get(1))?.to_string();
1216 let params = expect_sql_params(args.get(2))?;
1217 let arc = sql_registry().lock().unwrap()
1218 .touch_get(h)
1219 .ok_or_else(|| "sql.query_tx: closed or unknown SqlTx handle".to_string())?;
1220 let mut conn = arc.lock().unwrap();
1221 Ok(match &mut *conn {
1222 SqlConn::Sqlite(c) => sql_run_query_sqlite(c, &stmt_str, ¶ms),
1223 SqlConn::Postgres(c) => sql_run_query_pg(c, &stmt_str, ¶ms),
1224 })
1225 }
1226 ("sql", "get_str") => Ok(sql_get_col(&args, |v| match v {
1227 Value::Str(s) => Some(Value::Str(s.clone())),
1228 Value::Int(n) => Some(Value::Str(n.to_string())),
1229 _ => None,
1230 })?),
1231 ("sql", "get_int") => Ok(sql_get_col(&args, |v| match v {
1232 Value::Int(n) => Some(Value::Int(*n)),
1233 Value::Float(f) => Some(Value::Int(*f as i64)),
1234 _ => None,
1235 })?),
1236 ("sql", "get_float") => Ok(sql_get_col(&args, |v| match v {
1237 Value::Float(f) => Some(Value::Float(*f)),
1238 Value::Int(n) => Some(Value::Float(*n as f64)),
1239 _ => None,
1240 })?),
1241 ("sql", "get_bool") => Ok(sql_get_col(&args, |v| match v {
1242 Value::Bool(b) => Some(Value::Bool(*b)),
1243 Value::Int(n) => Some(Value::Bool(*n != 0)),
1244 _ => None,
1245 })?),
1246 ("proc", "spawn") => {
1247 let cmd = expect_str(args.first())?.to_string();
1261 let raw_args = match args.get(1) {
1262 Some(Value::List(items)) => items,
1263 Some(other) => return Err(format!(
1264 "proc.spawn: args must be List[Str], got {other:?}")),
1265 None => return Err("proc.spawn: missing args list".into()),
1266 };
1267 let str_args: Vec<String> = raw_args.iter().map(|v| match v {
1268 Value::Str(s) => Ok(s.clone()),
1269 other => Err(format!("proc.spawn: arg must be Str, got {other:?}")),
1270 }).collect::<Result<Vec<_>, _>>()?;
1271
1272 if !self.policy.allow_proc.is_empty() {
1276 let basename = std::path::Path::new(&cmd)
1277 .file_name()
1278 .and_then(|s| s.to_str())
1279 .unwrap_or(&cmd);
1280 if !self.policy.allow_proc.iter().any(|a| a == basename) {
1281 return Ok(err(Value::Str(format!(
1282 "proc.spawn: `{cmd}` not in --allow-proc {:?}",
1283 self.policy.allow_proc
1284 ))));
1285 }
1286 }
1287
1288 if str_args.len() > 1024 {
1291 return Ok(err(Value::Str(
1292 "proc.spawn: arg-count exceeds 1024".into())));
1293 }
1294 if str_args.iter().any(|a| a.len() > 65_536) {
1295 return Ok(err(Value::Str(
1296 "proc.spawn: per-arg length exceeds 64 KiB".into())));
1297 }
1298
1299 let output = std::process::Command::new(&cmd)
1300 .args(&str_args)
1301 .output();
1302 match output {
1303 Ok(o) => {
1304 let mut rec = indexmap::IndexMap::new();
1305 rec.insert("stdout".into(), Value::Str(
1306 String::from_utf8_lossy(&o.stdout).to_string()));
1307 rec.insert("stderr".into(), Value::Str(
1308 String::from_utf8_lossy(&o.stderr).to_string()));
1309 rec.insert("exit_code".into(), Value::Int(
1310 o.status.code().unwrap_or(-1) as i64));
1311 Ok(ok(Value::Record(rec)))
1312 }
1313 Err(e) => Ok(err(Value::Str(format!("spawn `{cmd}`: {e}")))),
1314 }
1315 }
1316 other => Err(format!("unsupported effect {}.{}", other.0, other.1)),
1317 }
1318 }
1319
1320 fn spawn_for_worker(&self) -> Option<Box<dyn lex_bytecode::vm::EffectHandler + Send>> {
1343 let mut fresh = DefaultHandler::new(self.policy.clone());
1344 fresh.budget_remaining = std::sync::Arc::clone(&self.budget_remaining);
1347 fresh.budget_ceiling = self.budget_ceiling;
1348 fresh.read_root = self.read_root.clone();
1349 fresh.program = self.program.clone();
1350 fresh.chat_registry = self.chat_registry.clone();
1351 fresh.streams = std::sync::Arc::clone(&self.streams);
1356 fresh.next_stream_id = std::sync::Arc::clone(&self.next_stream_id);
1357 Some(Box::new(fresh))
1358 }
1359}
1360
1361pub struct TlsConfig {
1371 pub cert: Vec<u8>,
1372 pub key: Vec<u8>,
1373}
1374
1375fn serve_http(
1376 port: u16,
1377 handler_name: String,
1378 program: Arc<Program>,
1379 policy: Policy,
1380 tls: Option<TlsConfig>,
1381) -> Result<Value, String> {
1382 let (server, scheme) = match tls {
1383 None => (
1384 tiny_http::Server::http(("127.0.0.1", port))
1385 .map_err(|e| format!("net.serve bind {port}: {e}"))?,
1386 "http",
1387 ),
1388 Some(cfg) => {
1389 let ssl = tiny_http::SslConfig {
1390 certificate: cfg.cert,
1391 private_key: cfg.key,
1392 };
1393 (
1394 tiny_http::Server::https(("127.0.0.1", port), ssl)
1395 .map_err(|e| format!("net.serve_tls bind {port}: {e}"))?,
1396 "https",
1397 )
1398 }
1399 };
1400 eprintln!("net.serve: listening on {scheme}://127.0.0.1:{port}");
1401 for req in server.incoming_requests() {
1407 let program = Arc::clone(&program);
1408 let policy = policy.clone();
1409 let handler_name = handler_name.clone();
1410 std::thread::spawn(move || handle_request(req, program, policy, handler_name));
1411 }
1412 Ok(Value::Unit)
1413}
1414
1415fn handle_request(
1416 mut req: tiny_http::Request,
1417 program: Arc<Program>,
1418 policy: Policy,
1419 handler_name: String,
1420) {
1421 let lex_req = build_request_value(&mut req);
1422 let handler = DefaultHandler::new(policy).with_program(Arc::clone(&program));
1423 let mut vm = Vm::with_handler(&program, Box::new(handler));
1424 match vm.call(&handler_name, vec![lex_req]) {
1425 Ok(resp) => {
1426 let (status, body, headers) = unpack_response(&resp);
1427 let mut response = tiny_http::Response::from_string(body).with_status_code(status);
1428 for h in headers {
1429 response.add_header(h);
1430 }
1431 let _ = req.respond(response);
1432 }
1433 Err(e) => {
1434 let response = tiny_http::Response::from_string(format!("internal error: {e}"))
1435 .with_status_code(500);
1436 let _ = req.respond(response);
1437 }
1438 }
1439}
1440
1441fn serve_http_fn(
1442 port: u16,
1443 closure: Value,
1444 program: Arc<Program>,
1445 policy: Policy,
1446) -> Result<Value, String> {
1447 let server = tiny_http::Server::http(("127.0.0.1", port))
1448 .map_err(|e| format!("net.serve_fn bind {port}: {e}"))?;
1449 eprintln!("net.serve_fn: listening on http://127.0.0.1:{port}");
1450 for req in server.incoming_requests() {
1451 let program = Arc::clone(&program);
1452 let policy = policy.clone();
1453 let closure = closure.clone();
1454 std::thread::spawn(move || handle_request_fn(req, program, policy, closure));
1455 }
1456 Ok(Value::Unit)
1457}
1458
1459fn handle_request_fn(
1460 mut req: tiny_http::Request,
1461 program: Arc<Program>,
1462 policy: Policy,
1463 closure: Value,
1464) {
1465 let lex_req = build_request_value(&mut req);
1466 let handler = DefaultHandler::new(policy).with_program(Arc::clone(&program));
1467 let mut vm = Vm::with_handler(&program, Box::new(handler));
1468 match vm.invoke_closure_value(closure, vec![lex_req]) {
1469 Ok(resp) => {
1470 let (status, body, headers) = unpack_response(&resp);
1471 let mut response = tiny_http::Response::from_string(body).with_status_code(status);
1472 for h in headers {
1473 response.add_header(h);
1474 }
1475 let _ = req.respond(response);
1476 }
1477 Err(e) => {
1478 let response = tiny_http::Response::from_string(format!("internal error: {e}"))
1479 .with_status_code(500);
1480 let _ = req.respond(response);
1481 }
1482 }
1483}
1484
1485fn build_request_value(req: &mut tiny_http::Request) -> Value {
1486 let method = format!("{:?}", req.method()).to_uppercase();
1487 let url = req.url().to_string();
1488 let (path, query) = match url.split_once('?') {
1489 Some((p, q)) => (p.to_string(), q.to_string()),
1490 None => (url, String::new()),
1491 };
1492 let mut headers_map = std::collections::BTreeMap::new();
1493 for h in req.headers() {
1494 headers_map.insert(
1495 lex_bytecode::MapKey::Str(h.field.as_str().as_str().to_ascii_lowercase()),
1496 Value::Str(h.value.as_str().to_string()),
1497 );
1498 }
1499 let mut body = String::new();
1500 let _ = req.as_reader().read_to_string(&mut body);
1501 let mut rec = indexmap::IndexMap::new();
1502 rec.insert("method".into(), Value::Str(method));
1503 rec.insert("path".into(), Value::Str(path));
1504 rec.insert("query".into(), Value::Str(query));
1505 rec.insert("body".into(), Value::Str(body));
1506 rec.insert("headers".into(), Value::Map(headers_map));
1507 Value::Record(rec)
1508}
1509
1510fn unpack_response(v: &Value) -> (u16, String, Vec<tiny_http::Header>) {
1511 if let Value::Record(rec) = v {
1512 let status = rec.get("status").and_then(|s| match s {
1513 Value::Int(n) => Some(*n as u16),
1514 _ => None,
1515 }).unwrap_or(200);
1516 let body = rec.get("body").and_then(|b| match b {
1517 Value::Str(s) => Some(s.clone()),
1518 _ => None,
1519 }).unwrap_or_default();
1520 let headers = if let Some(Value::Map(hmap)) = rec.get("headers") {
1521 hmap.iter().filter_map(|(k, v)| {
1522 if let (lex_bytecode::MapKey::Str(name), Value::Str(val)) = (k, v) {
1523 format!("{name}: {val}").parse::<tiny_http::Header>().ok()
1524 } else {
1525 None
1526 }
1527 }).collect()
1528 } else {
1529 vec![]
1530 };
1531 return (status, body, headers);
1532 }
1533 (500, format!("handler returned non-record: {v:?}"), vec![])
1534}
1535
1536fn http_request(method: &str, url: &str, body: Option<&str>) -> Value {
1542 use std::time::Duration;
1543 let agent: ureq::Agent = ureq::Agent::config_builder()
1548 .timeout_connect(Some(Duration::from_secs(10)))
1549 .timeout_recv_body(Some(Duration::from_secs(30)))
1550 .timeout_send_body(Some(Duration::from_secs(10)))
1551 .http_status_as_error(false)
1552 .build()
1553 .into();
1554 let resp = match (method, body) {
1555 ("GET", _) => agent.get(url).call(),
1556 ("POST", Some(b)) => agent.post(url).send(b),
1557 ("POST", None) => agent.post(url).send(""),
1558 (m, _) => return err_value(format!("unsupported method: {m}")),
1559 };
1560 match resp {
1561 Ok(mut r) => {
1562 let status = r.status().as_u16();
1563 let body = r.body_mut().read_to_string().unwrap_or_default();
1564 if (200..300).contains(&status) {
1565 Value::Variant { name: "Ok".into(), args: vec![Value::Str(body)] }
1566 } else {
1567 err_value(format!("status {status}: {body}"))
1568 }
1569 }
1570 Err(e) => err_value(format!("transport: {e}")),
1571 }
1572}
1573
1574fn http_agent(timeout_ms: Option<u64>) -> ureq::Agent {
1579 use std::time::Duration;
1580 let mut b = ureq::Agent::config_builder()
1581 .timeout_connect(Some(Duration::from_secs(10)))
1582 .timeout_recv_body(Some(Duration::from_secs(30)))
1583 .timeout_send_body(Some(Duration::from_secs(10)))
1584 .http_status_as_error(false);
1585 if let Some(ms) = timeout_ms {
1586 let d = Duration::from_millis(ms);
1587 b = b.timeout_global(Some(d));
1588 }
1589 b.build().into()
1590}
1591
1592fn http_error_value(e: ureq::Error) -> Value {
1596 let (ctor, payload): (&str, Option<String>) = match &e {
1597 ureq::Error::Timeout(_) => ("TimeoutError", None),
1598 ureq::Error::Tls(s) => ("TlsError", Some((*s).into())),
1599 ureq::Error::Pem(p) => ("TlsError", Some(format!("{p}"))),
1600 ureq::Error::Rustls(r) => ("TlsError", Some(format!("{r}"))),
1601 _ => ("NetworkError", Some(format!("{e}"))),
1602 };
1603 let args = match payload { Some(s) => vec![Value::Str(s)], None => vec![] };
1604 let inner = Value::Variant { name: ctor.into(), args };
1605 Value::Variant { name: "Err".into(), args: vec![inner] }
1606}
1607
1608fn http_decode_err(msg: String) -> Value {
1609 let inner = Value::Variant {
1610 name: "DecodeError".into(),
1611 args: vec![Value::Str(msg)],
1612 };
1613 Value::Variant { name: "Err".into(), args: vec![inner] }
1614}
1615
1616fn http_send_simple(
1621 method: &str,
1622 url: &str,
1623 body: Option<Vec<u8>>,
1624 content_type: &str,
1625 timeout_ms: Option<u64>,
1626) -> Value {
1627 http_send_full(method, url, body, content_type, &[], timeout_ms)
1628}
1629
1630fn http_send_full(
1631 method: &str,
1632 url: &str,
1633 body: Option<Vec<u8>>,
1634 content_type: &str,
1635 headers: &[(String, String)],
1636 timeout_ms: Option<u64>,
1637) -> Value {
1638 let agent = http_agent(timeout_ms);
1639 let resp = match method {
1640 "GET" => {
1641 let mut req = agent.get(url);
1642 if !content_type.is_empty() { req = req.header("content-type", content_type); }
1643 for (k, v) in headers { req = req.header(k.as_str(), v.as_str()); }
1644 req.call()
1645 }
1646 "POST" => {
1647 let body = body.unwrap_or_default();
1648 let mut req = agent.post(url);
1649 if !content_type.is_empty() { req = req.header("content-type", content_type); }
1650 for (k, v) in headers { req = req.header(k.as_str(), v.as_str()); }
1651 req.send(&body[..])
1652 }
1653 m => {
1654 return http_decode_err(format!("unsupported method: {m}"));
1658 }
1659 };
1660 match resp {
1661 Ok(mut r) => {
1662 let status = r.status().as_u16() as i64;
1663 let headers_map = collect_response_headers(r.headers());
1664 let body_bytes = match r.body_mut().with_config().limit(10 * 1024 * 1024).read_to_vec() {
1665 Ok(b) => b,
1666 Err(e) => return http_decode_err(format!("body read: {e}")),
1667 };
1668 let mut rec = indexmap::IndexMap::new();
1669 rec.insert("status".into(), Value::Int(status));
1670 rec.insert("headers".into(), Value::Map(headers_map));
1671 rec.insert("body".into(), Value::Bytes(body_bytes));
1672 Value::Variant { name: "Ok".into(), args: vec![Value::Record(rec)] }
1673 }
1674 Err(e) => http_error_value(e),
1675 }
1676}
1677
1678fn collect_response_headers(
1679 headers: &ureq::http::HeaderMap,
1680) -> std::collections::BTreeMap<lex_bytecode::MapKey, Value> {
1681 let mut out = std::collections::BTreeMap::new();
1682 for (name, value) in headers.iter() {
1683 let v = value.to_str().unwrap_or("").to_string();
1684 out.insert(lex_bytecode::MapKey::Str(name.as_str().to_string()), Value::Str(v));
1685 }
1686 out
1687}
1688
1689fn http_send_record(handler: &DefaultHandler, req: &indexmap::IndexMap<String, Value>) -> Value {
1693 let method = match req.get("method") {
1694 Some(Value::Str(s)) => s.clone(),
1695 _ => return http_decode_err("HttpRequest.method must be Str".into()),
1696 };
1697 let url = match req.get("url") {
1698 Some(Value::Str(s)) => s.clone(),
1699 _ => return http_decode_err("HttpRequest.url must be Str".into()),
1700 };
1701 if let Err(e) = handler.ensure_host_allowed(&url) {
1702 return http_decode_err(e);
1703 }
1704 let body = match req.get("body") {
1705 Some(Value::Variant { name, args }) if name == "None" => None,
1706 Some(Value::Variant { name, args }) if name == "Some" => match args.as_slice() {
1707 [Value::Bytes(b)] => Some(b.clone()),
1708 _ => return http_decode_err("HttpRequest.body Some payload must be Bytes".into()),
1709 },
1710 _ => return http_decode_err("HttpRequest.body must be Option[Bytes]".into()),
1711 };
1712 let timeout_ms = match req.get("timeout_ms") {
1713 Some(Value::Variant { name, .. }) if name == "None" => None,
1714 Some(Value::Variant { name, args }) if name == "Some" => match args.as_slice() {
1715 [Value::Int(n)] if *n >= 0 => Some(*n as u64),
1716 _ => return http_decode_err(
1717 "HttpRequest.timeout_ms Some payload must be a non-negative Int".into()),
1718 },
1719 _ => return http_decode_err("HttpRequest.timeout_ms must be Option[Int]".into()),
1720 };
1721 let headers: Vec<(String, String)> = match req.get("headers") {
1722 Some(Value::Map(m)) => m.iter().filter_map(|(k, v)| {
1723 let kk = match k { lex_bytecode::MapKey::Str(s) => s.clone(), _ => return None };
1724 let vv = match v { Value::Str(s) => s.clone(), _ => return None };
1725 Some((kk, vv))
1726 }).collect(),
1727 _ => return http_decode_err("HttpRequest.headers must be Map[Str, Str]".into()),
1728 };
1729 http_send_full(&method, &url, body, "", &headers, timeout_ms)
1730}
1731
1732fn expect_record(v: Option<&Value>) -> Result<&indexmap::IndexMap<String, Value>, String> {
1733 match v {
1734 Some(Value::Record(r)) => Ok(r),
1735 Some(other) => Err(format!("expected Record, got {other:?}")),
1736 None => Err("missing Record argument".into()),
1737 }
1738}
1739
1740fn err_value(msg: String) -> Value {
1741 Value::Variant { name: "Err".into(), args: vec![Value::Str(msg)] }
1742}
1743
1744fn expect_str(v: Option<&Value>) -> Result<&str, String> {
1745 match v {
1746 Some(Value::Str(s)) => Ok(s),
1747 Some(other) => Err(format!("expected Str arg, got {other:?}")),
1748 None => Err("missing argument".into()),
1749 }
1750}
1751
1752fn expect_int(v: Option<&Value>) -> Result<i64, String> {
1753 match v {
1754 Some(Value::Int(n)) => Ok(*n),
1755 Some(other) => Err(format!("expected Int arg, got {other:?}")),
1756 None => Err("missing argument".into()),
1757 }
1758}
1759
1760fn ok(v: Value) -> Value {
1761 Value::Variant { name: "Ok".into(), args: vec![v] }
1762}
1763fn err(v: Value) -> Value {
1764 Value::Variant { name: "Err".into(), args: vec![v] }
1765}
1766
1767impl DefaultHandler {
1768 fn dispatch_call_mcp(&mut self, args: Vec<Value>) -> Value {
1774 let server = match args.first() {
1775 Some(Value::Str(s)) => s.clone(),
1776 _ => return err(Value::Str(
1777 "agent.call_mcp(server, tool, args_json): server must be Str".into())),
1778 };
1779 let tool = match args.get(1) {
1780 Some(Value::Str(s)) => s.clone(),
1781 _ => return err(Value::Str(
1782 "agent.call_mcp(server, tool, args_json): tool must be Str".into())),
1783 };
1784 let args_json = match args.get(2) {
1785 Some(Value::Str(s)) => s.clone(),
1786 _ => return err(Value::Str(
1787 "agent.call_mcp(server, tool, args_json): args_json must be Str".into())),
1788 };
1789 let parsed: serde_json::Value = match serde_json::from_str(&args_json) {
1790 Ok(v) => v,
1791 Err(e) => return err(Value::Str(format!(
1792 "agent.call_mcp: args_json is not valid JSON: {e}"))),
1793 };
1794 match self.mcp_clients.call(&server, &tool, parsed) {
1795 Ok(result) => ok(Value::Str(
1796 serde_json::to_string(&result).unwrap_or_else(|_| "null".into()))),
1797 Err(e) => err(Value::Str(e)),
1798 }
1799 }
1800
1801 fn dispatch_cloud_stream(&mut self, args: Vec<Value>) -> Value {
1807 let _prompt = match args.first() {
1808 Some(Value::Str(s)) => s.clone(),
1809 _ => return err(Value::Str(
1810 "agent.cloud_stream(prompt): prompt must be Str".into())),
1811 };
1812 let chunks: Vec<String> = match std::env::var("LEX_LLM_STREAM_FIXTURE") {
1813 Ok(v) => v.split('|').map(|s| s.to_string()).collect(),
1814 Err(_) => return err(Value::Str(
1815 "agent.cloud_stream: live streaming not yet implemented; \
1816 set LEX_LLM_STREAM_FIXTURE='chunk1|chunk2|…' for tests".into())),
1817 };
1818 let handle = self.register_stream(chunks.into_iter());
1819 ok(stream_handle_value(handle))
1820 }
1821
1822 fn dispatch_stream_next(&mut self, args: Vec<Value>) -> Value {
1828 let handle = match args.first().and_then(stream_handle_id) {
1829 Some(h) => h,
1830 None => return Value::Variant { name: "None".into(), args: vec![] },
1831 };
1832 let mut streams = match self.streams.lock() {
1833 Ok(g) => g,
1834 Err(_) => return Value::Variant { name: "None".into(), args: vec![] },
1835 };
1836 match streams.get_mut(&handle).and_then(|it| it.next()) {
1837 Some(chunk) => some(Value::Str(chunk)),
1838 None => {
1839 streams.remove(&handle);
1840 Value::Variant { name: "None".into(), args: vec![] }
1841 }
1842 }
1843 }
1844
1845 fn dispatch_stream_collect(&mut self, args: Vec<Value>) -> Value {
1850 let handle = match args.first().and_then(stream_handle_id) {
1851 Some(h) => h,
1852 None => return Value::List(Vec::new()),
1853 };
1854 let mut iter = {
1855 let mut streams = match self.streams.lock() {
1856 Ok(g) => g,
1857 Err(_) => return Value::List(Vec::new()),
1858 };
1859 match streams.remove(&handle) {
1860 Some(it) => it,
1861 None => return Value::List(Vec::new()),
1862 }
1863 };
1864 let mut out: Vec<Value> = Vec::new();
1865 for chunk in iter.by_ref() {
1866 out.push(Value::Str(chunk));
1867 }
1868 Value::List(out)
1869 }
1870
1871 fn register_stream<I>(&self, iter: I) -> String
1875 where
1876 I: Iterator<Item = String> + Send + 'static,
1877 {
1878 let id = self
1879 .next_stream_id
1880 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
1881 let handle = format!("stream_{id}");
1882 if let Ok(mut streams) = self.streams.lock() {
1883 streams.insert(handle.clone(), Box::new(iter));
1884 }
1885 handle
1886 }
1887}
1888
1889fn stream_handle_value(handle: String) -> Value {
1894 Value::Variant {
1895 name: "__StreamHandle".into(),
1896 args: vec![Value::Str(handle)],
1897 }
1898}
1899
1900fn stream_handle_id(v: &Value) -> Option<String> {
1904 match v {
1905 Value::Variant { name, args } if name == "__StreamHandle" => match args.first() {
1906 Some(Value::Str(h)) => Some(h.clone()),
1907 _ => None,
1908 },
1909 _ => None,
1910 }
1911}
1912
1913fn dispatch_llm_local(args: Vec<Value>) -> Value {
1918 let prompt = match args.first() {
1919 Some(Value::Str(s)) => s.clone(),
1920 _ => return err(Value::Str(
1921 "agent.local_complete(prompt): prompt must be Str".into())),
1922 };
1923 match crate::llm::local_complete(&prompt) {
1924 Ok(text) => ok(Value::Str(text)),
1925 Err(e) => err(Value::Str(e)),
1926 }
1927}
1928
1929fn dispatch_llm_cloud(args: Vec<Value>) -> Value {
1936 let prompt = match args.first() {
1937 Some(Value::Str(s)) => s.clone(),
1938 _ => return err(Value::Str(
1939 "agent.cloud_complete(prompt): prompt must be Str".into())),
1940 };
1941 match crate::llm::cloud_complete(&prompt) {
1942 Ok(text) => ok(Value::Str(text)),
1943 Err(e) => err(Value::Str(e)),
1944 }
1945}
1946
1947fn some(v: Value) -> Value {
1948 Value::Variant { name: "Some".into(), args: vec![v] }
1949}
1950fn none() -> Value {
1951 Value::Variant { name: "None".into(), args: vec![] }
1952}
1953
1954fn expect_bytes(v: Option<&Value>) -> Result<&Vec<u8>, String> {
1955 match v {
1956 Some(Value::Bytes(b)) => Ok(b),
1957 Some(other) => Err(format!("expected Bytes arg, got {other:?}")),
1958 None => Err("missing argument".into()),
1959 }
1960}
1961
1962fn expect_kv_handle(v: Option<&Value>) -> Result<u64, String> {
1963 match v {
1964 Some(Value::Int(n)) if *n >= 0 => Ok(*n as u64),
1965 Some(other) => Err(format!("expected Kv handle (Int), got {other:?}")),
1966 None => Err("missing Kv argument".into()),
1967 }
1968}
1969
1970fn expect_sql_handle(v: Option<&Value>) -> Result<u64, String> {
1971 match v {
1972 Some(Value::Int(n)) if *n >= 0 => Ok(*n as u64),
1973 Some(other) => Err(format!("expected Db handle (Int), got {other:?}")),
1974 None => Err("missing Db argument".into()),
1975 }
1976}
1977
1978#[allow(dead_code)]
1979fn expect_str_list(v: Option<&Value>) -> Result<Vec<String>, String> {
1980 match v {
1981 Some(Value::List(items)) => items.iter().map(|x| match x {
1982 Value::Str(s) => Ok(s.clone()),
1983 other => Err(format!("expected List[Str] element, got {other:?}")),
1984 }).collect(),
1985 Some(other) => Err(format!("expected List[Str], got {other:?}")),
1986 None => Err("missing List[Str] argument".into()),
1987 }
1988}
1989
1990fn expect_sql_params(v: Option<&Value>) -> Result<Vec<SqlParamValue>, String> {
1993 let items = match v {
1994 Some(Value::List(xs)) => xs,
1995 Some(other) => return Err(format!("expected List[SqlParam], got {other:?}")),
1996 None => return Err("missing params argument".into()),
1997 };
1998 items.iter().map(|item| {
1999 match item {
2000 Value::Variant { name, args } => match name.as_str() {
2001 "PStr" => match args.first() {
2002 Some(Value::Str(s)) => Ok(SqlParamValue::Text(s.clone())),
2003 _ => Err("PStr requires a Str argument".into()),
2004 },
2005 "PInt" => match args.first() {
2006 Some(Value::Int(n)) => Ok(SqlParamValue::Integer(*n)),
2007 _ => Err("PInt requires an Int argument".into()),
2008 },
2009 "PFloat" => match args.first() {
2010 Some(Value::Float(f)) => Ok(SqlParamValue::Real(*f)),
2011 _ => Err("PFloat requires a Float argument".into()),
2012 },
2013 "PBool" => match args.first() {
2014 Some(Value::Bool(b)) => Ok(SqlParamValue::Bool(*b)),
2015 _ => Err("PBool requires a Bool argument".into()),
2016 },
2017 "PNull" => Ok(SqlParamValue::Null),
2018 other => Err(format!("unknown SqlParam constructor `{other}`")),
2019 },
2020 Value::Str(s) => Ok(SqlParamValue::Text(s.clone())),
2022 other => Err(format!("expected SqlParam variant, got {other:?}")),
2023 }
2024 }).collect()
2025}
2026
2027fn sqlite_params(params: &[SqlParamValue]) -> Vec<rusqlite::types::Value> {
2029 params.iter().map(|p| match p {
2030 SqlParamValue::Text(s) => rusqlite::types::Value::Text(s.clone()),
2031 SqlParamValue::Integer(n) => rusqlite::types::Value::Integer(*n),
2032 SqlParamValue::Real(f) => rusqlite::types::Value::Real(*f),
2033 SqlParamValue::Bool(b) => rusqlite::types::Value::Integer(*b as i64),
2034 SqlParamValue::Null => rusqlite::types::Value::Null,
2035 }).collect()
2036}
2037
2038fn pg_param_refs(params: &[SqlParamValue]) -> Vec<Box<dyn postgres::types::ToSql + Sync>> {
2040 params.iter().map(|p| -> Box<dyn postgres::types::ToSql + Sync> {
2041 match p {
2042 SqlParamValue::Text(s) => Box::new(s.clone()),
2043 SqlParamValue::Integer(n) => Box::new(*n),
2044 SqlParamValue::Real(f) => Box::new(*f),
2045 SqlParamValue::Bool(b) => Box::new(*b),
2046 SqlParamValue::Null => Box::new(Option::<String>::None),
2047 }
2048 }).collect()
2049}
2050
2051fn sql_run_query_sqlite(
2053 conn: &rusqlite::Connection,
2054 stmt_str: &str,
2055 params: &[SqlParamValue],
2056) -> Value {
2057 let mut stmt = match conn.prepare(stmt_str) {
2058 Ok(s) => s,
2059 Err(e) => return err(Value::Str(format!("sql.query: {e}"))),
2060 };
2061 let column_count = stmt.column_count();
2062 let column_names: Vec<String> = (0..column_count)
2063 .map(|i| stmt.column_name(i).unwrap_or("").to_string())
2064 .collect();
2065 let bound = sqlite_params(params);
2066 let bind: Vec<&dyn rusqlite::ToSql> = bound.iter()
2067 .map(|p| p as &dyn rusqlite::ToSql)
2068 .collect();
2069 let mut rows = match stmt.query(rusqlite::params_from_iter(bind.iter())) {
2070 Ok(r) => r,
2071 Err(e) => return err(Value::Str(format!("sql.query: {e}"))),
2072 };
2073 let mut out: Vec<Value> = Vec::new();
2074 loop {
2075 let row = match rows.next() {
2076 Ok(Some(r)) => r,
2077 Ok(None) => break,
2078 Err(e) => return err(Value::Str(format!("sql.query: {e}"))),
2079 };
2080 let mut rec = indexmap::IndexMap::new();
2081 for (i, name) in column_names.iter().enumerate() {
2082 let cell = match row.get_ref(i) {
2083 Ok(c) => sql_value_ref_to_lex(c),
2084 Err(e) => return err(Value::Str(format!("sql.query: column {i}: {e}"))),
2085 };
2086 rec.insert(name.clone(), cell);
2087 }
2088 out.push(Value::Record(rec));
2089 }
2090 ok(Value::List(out))
2091}
2092
2093fn sql_run_query_pg(
2095 client: &mut postgres::Client,
2096 stmt_str: &str,
2097 params: &[SqlParamValue],
2098) -> Value {
2099 let pg = pg_param_refs(params);
2100 let refs: Vec<&(dyn postgres::types::ToSql + Sync)> =
2101 pg.iter().map(|b| b.as_ref()).collect();
2102 let rows = match client.query(stmt_str, &refs) {
2103 Ok(r) => r,
2104 Err(e) => return err(Value::Str(format!("sql.query: {e}"))),
2105 };
2106 let out: Vec<Value> = rows.iter().map(|row| {
2107 Value::Record(pg_row_to_lex_record(row))
2108 }).collect();
2109 ok(Value::List(out))
2110}
2111
2112fn pg_row_to_lex_record(row: &postgres::Row) -> indexmap::IndexMap<String, Value> {
2114 use postgres::types::Type;
2115 let mut rec = indexmap::IndexMap::new();
2116 for (i, col) in row.columns().iter().enumerate() {
2117 let ty = col.type_();
2118 let val = if *ty == Type::INT2 || *ty == Type::INT4 || *ty == Type::INT8 {
2119 row.get::<_, Option<i64>>(i).map(Value::Int).unwrap_or(Value::Unit)
2120 } else if *ty == Type::FLOAT4 || *ty == Type::FLOAT8 {
2121 row.get::<_, Option<f64>>(i).map(Value::Float).unwrap_or(Value::Unit)
2122 } else if *ty == Type::BOOL {
2123 row.get::<_, Option<bool>>(i).map(Value::Bool).unwrap_or(Value::Unit)
2124 } else if *ty == Type::BYTEA {
2125 row.get::<_, Option<Vec<u8>>>(i).map(Value::Bytes).unwrap_or(Value::Unit)
2126 } else {
2127 row.get::<_, Option<String>>(i).map(Value::Str).unwrap_or(Value::Unit)
2128 };
2129 rec.insert(col.name().to_string(), val);
2130 }
2131 rec
2132}
2133
2134fn sql_get_col<F>(args: &[Value], convert: F) -> Result<Value, String>
2136where
2137 F: Fn(&Value) -> Option<Value>,
2138{
2139 let row = args.first().ok_or("sql.get_*: missing row argument")?;
2140 let col = match args.get(1) {
2141 Some(Value::Str(s)) => s.as_str(),
2142 Some(other) => return Err(format!("sql.get_*: column name must be Str, got {other:?}")),
2143 None => return Err("sql.get_*: missing column name argument".into()),
2144 };
2145 let cell = match row {
2146 Value::Record(rec) => rec.get(col).cloned(),
2147 other => return Err(format!("sql.get_*: row must be a Record, got {other:?}")),
2148 };
2149 Ok(match cell.and_then(|v| convert(&v)) {
2150 Some(v) => Value::Variant { name: "Some".into(), args: vec![v] },
2151 None => Value::Variant { name: "None".into(), args: vec![] },
2152 })
2153}
2154
2155fn sql_value_ref_to_lex(v: rusqlite::types::ValueRef<'_>) -> Value {
2156 use rusqlite::types::ValueRef;
2157 match v {
2158 ValueRef::Null => Value::Unit,
2159 ValueRef::Integer(n) => Value::Int(n),
2160 ValueRef::Real(f) => Value::Float(f),
2161 ValueRef::Text(s) => Value::Str(String::from_utf8_lossy(s).into_owned()),
2162 ValueRef::Blob(b) => Value::Bytes(b.to_vec()),
2163 }
2164}
2165
2166#[derive(Clone, Copy, PartialEq, PartialOrd)]
2169enum LogLevel { Debug, Info, Warn, Error }
2170
2171#[derive(Clone, Copy, PartialEq)]
2172enum LogFormat { Text, Json }
2173
2174#[derive(Clone)]
2175enum LogSink {
2176 Stderr,
2177 File(std::sync::Arc<Mutex<std::fs::File>>),
2178}
2179
2180struct LogState {
2181 level: LogLevel,
2182 format: LogFormat,
2183 sink: LogSink,
2184}
2185
2186fn log_state() -> &'static Mutex<LogState> {
2187 static STATE: OnceLock<Mutex<LogState>> = OnceLock::new();
2188 STATE.get_or_init(|| Mutex::new(LogState {
2189 level: LogLevel::Info,
2190 format: LogFormat::Text,
2191 sink: LogSink::Stderr,
2192 }))
2193}
2194
2195fn parse_log_level(s: &str) -> Option<LogLevel> {
2196 match s {
2197 "debug" => Some(LogLevel::Debug),
2198 "info" => Some(LogLevel::Info),
2199 "warn" => Some(LogLevel::Warn),
2200 "error" => Some(LogLevel::Error),
2201 _ => None,
2202 }
2203}
2204
2205fn level_label(l: LogLevel) -> &'static str {
2206 match l {
2207 LogLevel::Debug => "debug",
2208 LogLevel::Info => "info",
2209 LogLevel::Warn => "warn",
2210 LogLevel::Error => "error",
2211 }
2212}
2213
2214fn emit_log(level: LogLevel, msg: &str) {
2215 let state = log_state().lock().unwrap();
2216 if level < state.level {
2217 return;
2218 }
2219 let ts = chrono::Utc::now().to_rfc3339();
2220 let line = match state.format {
2221 LogFormat::Text => format!("[{}] {}: {}\n", ts, level_label(level), msg),
2222 LogFormat::Json => {
2223 let escaped = msg
2227 .replace('\\', "\\\\")
2228 .replace('"', "\\\"")
2229 .replace('\n', "\\n")
2230 .replace('\r', "\\r");
2231 format!(
2232 "{{\"ts\":\"{ts}\",\"level\":\"{}\",\"msg\":\"{escaped}\"}}\n",
2233 level_label(level),
2234 )
2235 }
2236 };
2237 let sink = state.sink.clone();
2238 drop(state);
2239 match sink {
2240 LogSink::Stderr => {
2241 use std::io::Write;
2242 let _ = std::io::stderr().write_all(line.as_bytes());
2243 }
2244 LogSink::File(f) => {
2245 use std::io::Write;
2246 if let Ok(mut g) = f.lock() {
2247 let _ = g.write_all(line.as_bytes());
2248 }
2249 }
2250 }
2251}
2252
2253pub(crate) struct ProcessState {
2254 child: std::process::Child,
2255 stdout: Option<std::io::BufReader<std::process::ChildStdout>>,
2256 stderr: Option<std::io::BufReader<std::process::ChildStderr>>,
2257}
2258
2259fn process_registry() -> &'static Mutex<ProcessRegistry> {
2273 static REGISTRY: OnceLock<Mutex<ProcessRegistry>> = OnceLock::new();
2274 REGISTRY.get_or_init(|| Mutex::new(ProcessRegistry::with_capacity(MAX_PROCESS_HANDLES)))
2275}
2276
2277const MAX_PROCESS_HANDLES: usize = 256;
2278
2279type SharedProcessState = Arc<Mutex<ProcessState>>;
2280
2281pub(crate) struct ProcessRegistry {
2282 entries: indexmap::IndexMap<u64, SharedProcessState>,
2283 cap: usize,
2284}
2285
2286impl ProcessRegistry {
2287 pub(crate) fn with_capacity(cap: usize) -> Self {
2288 Self { entries: indexmap::IndexMap::new(), cap }
2289 }
2290
2291 pub(crate) fn insert(&mut self, handle: u64, state: ProcessState) {
2295 if self.entries.len() >= self.cap {
2296 self.entries.shift_remove_index(0);
2297 }
2298 self.entries.insert(handle, Arc::new(Mutex::new(state)));
2299 }
2300
2301 pub(crate) fn touch_get(&mut self, handle: u64) -> Option<SharedProcessState> {
2305 let idx = self.entries.get_index_of(&handle)?;
2306 self.entries.move_index(idx, self.entries.len() - 1);
2307 self.entries.get(&handle).cloned()
2308 }
2309
2310 pub(crate) fn remove(&mut self, handle: u64) {
2315 self.entries.shift_remove(&handle);
2316 }
2317
2318 #[cfg(test)]
2319 pub(crate) fn len(&self) -> usize { self.entries.len() }
2320}
2321
2322fn next_process_handle() -> u64 {
2323 static COUNTER: AtomicU64 = AtomicU64::new(1);
2324 COUNTER.fetch_add(1, Ordering::SeqCst)
2325}
2326
2327#[cfg(all(test, unix))]
2328mod process_registry_tests {
2329 use super::{ProcessRegistry, ProcessState};
2330
2331 fn fresh_state() -> ProcessState {
2335 let child = std::process::Command::new("true")
2336 .stdout(std::process::Stdio::null())
2337 .stderr(std::process::Stdio::null())
2338 .spawn()
2339 .expect("spawn `true`");
2340 ProcessState { child, stdout: None, stderr: None }
2341 }
2342
2343 #[test]
2344 fn insert_and_get_round_trip() {
2345 let mut r = ProcessRegistry::with_capacity(4);
2346 r.insert(1, fresh_state());
2347 assert!(r.touch_get(1).is_some());
2348 assert!(r.touch_get(2).is_none());
2349 }
2350
2351 #[test]
2352 fn touch_get_returns_distinct_arcs_for_distinct_handles() {
2353 let mut r = ProcessRegistry::with_capacity(4);
2354 r.insert(1, fresh_state());
2355 r.insert(2, fresh_state());
2356 let a = r.touch_get(1).unwrap();
2357 let b = r.touch_get(2).unwrap();
2358 assert!(!std::sync::Arc::ptr_eq(&a, &b));
2360 }
2361
2362 #[test]
2363 fn cap_evicts_lru_on_overflow() {
2364 let mut r = ProcessRegistry::with_capacity(2);
2365 r.insert(1, fresh_state());
2366 r.insert(2, fresh_state());
2367 let _ = r.touch_get(1);
2368 r.insert(3, fresh_state());
2369 assert!(r.touch_get(1).is_some(), "1 was MRU, should survive");
2370 assert!(r.touch_get(2).is_none(), "2 was LRU, should be evicted");
2371 assert!(r.touch_get(3).is_some(), "3 just inserted, should survive");
2372 assert_eq!(r.len(), 2);
2373 }
2374
2375 #[test]
2376 fn cap_with_no_touches_evicts_in_insertion_order() {
2377 let mut r = ProcessRegistry::with_capacity(2);
2378 r.insert(10, fresh_state());
2379 r.insert(20, fresh_state());
2380 r.insert(30, fresh_state());
2381 assert!(r.touch_get(10).is_none());
2382 assert!(r.touch_get(20).is_some());
2383 assert!(r.touch_get(30).is_some());
2384 }
2385
2386 #[test]
2387 fn remove_drops_entry() {
2388 let mut r = ProcessRegistry::with_capacity(4);
2389 r.insert(1, fresh_state());
2390 r.remove(1);
2391 assert!(r.touch_get(1).is_none());
2392 assert_eq!(r.len(), 0);
2393 }
2394
2395 #[test]
2396 fn many_inserts_stay_bounded_at_cap() {
2397 let cap = 8;
2398 let mut r = ProcessRegistry::with_capacity(cap);
2399 for i in 0..(cap as u64 * 3) {
2400 r.insert(i, fresh_state());
2401 assert!(r.len() <= cap);
2402 }
2403 assert_eq!(r.len(), cap);
2404 }
2405
2406 #[test]
2407 fn outstanding_arc_outlives_remove() {
2408 let mut r = ProcessRegistry::with_capacity(4);
2412 r.insert(1, fresh_state());
2413 let arc = r.touch_get(1).expect("entry exists");
2414 r.remove(1);
2415 assert!(r.touch_get(1).is_none());
2417 let _state = arc.lock().unwrap();
2418 }
2419}
2420
2421fn expect_process_handle(v: Option<&Value>) -> Result<u64, String> {
2422 match v {
2423 Some(Value::Int(n)) if *n >= 0 => Ok(*n as u64),
2424 Some(other) => Err(format!("expected ProcessHandle (Int), got {other:?}")),
2425 None => Err("missing ProcessHandle argument".into()),
2426 }
2427}
2428
2429fn kv_registry() -> &'static Mutex<KvRegistry> {
2441 static REGISTRY: OnceLock<Mutex<KvRegistry>> = OnceLock::new();
2442 REGISTRY.get_or_init(|| Mutex::new(KvRegistry::with_capacity(MAX_KV_HANDLES)))
2443}
2444
2445const MAX_KV_HANDLES: usize = 256;
2451
2452pub(crate) struct KvRegistry {
2457 entries: indexmap::IndexMap<u64, sled::Db>,
2458 cap: usize,
2459}
2460
2461impl KvRegistry {
2462 pub(crate) fn with_capacity(cap: usize) -> Self {
2463 Self { entries: indexmap::IndexMap::new(), cap }
2464 }
2465
2466 pub(crate) fn insert(&mut self, handle: u64, db: sled::Db) {
2469 if self.entries.len() >= self.cap {
2470 self.entries.shift_remove_index(0);
2471 }
2472 self.entries.insert(handle, db);
2473 }
2474
2475 pub(crate) fn touch_get(&mut self, handle: u64) -> Option<&sled::Db> {
2477 let idx = self.entries.get_index_of(&handle)?;
2478 self.entries.move_index(idx, self.entries.len() - 1);
2479 self.entries.get(&handle)
2480 }
2481
2482 pub(crate) fn remove(&mut self, handle: u64) {
2484 self.entries.shift_remove(&handle);
2485 }
2486
2487 #[cfg(test)]
2488 pub(crate) fn len(&self) -> usize { self.entries.len() }
2489}
2490
2491fn next_kv_handle() -> u64 {
2492 static COUNTER: AtomicU64 = AtomicU64::new(1);
2493 COUNTER.fetch_add(1, Ordering::SeqCst)
2494}
2495
2496fn sql_registry() -> &'static Mutex<SqlRegistry> {
2503 static REGISTRY: OnceLock<Mutex<SqlRegistry>> = OnceLock::new();
2504 REGISTRY.get_or_init(|| Mutex::new(SqlRegistry::with_capacity(MAX_SQL_HANDLES)))
2505}
2506
2507const MAX_SQL_HANDLES: usize = 256;
2508
2509#[derive(Debug, Clone)]
2511enum SqlParamValue {
2512 Text(String),
2513 Integer(i64),
2514 Real(f64),
2515 Bool(bool),
2516 Null,
2517}
2518
2519pub(crate) enum SqlConn {
2521 Sqlite(rusqlite::Connection),
2522 Postgres(postgres::Client),
2523}
2524
2525type SharedConn = Arc<Mutex<SqlConn>>;
2526
2527pub(crate) struct SqlRegistry {
2528 entries: indexmap::IndexMap<u64, SharedConn>,
2529 cap: usize,
2530}
2531
2532impl SqlRegistry {
2533 pub(crate) fn with_capacity(cap: usize) -> Self {
2534 Self { entries: indexmap::IndexMap::new(), cap }
2535 }
2536
2537 pub(crate) fn insert(&mut self, handle: u64, conn: SqlConn) {
2538 if self.entries.len() >= self.cap {
2539 self.entries.shift_remove_index(0);
2540 }
2541 self.entries.insert(handle, Arc::new(Mutex::new(conn)));
2542 }
2543
2544 pub(crate) fn touch_get(&mut self, handle: u64) -> Option<SharedConn> {
2548 let idx = self.entries.get_index_of(&handle)?;
2549 self.entries.move_index(idx, self.entries.len() - 1);
2550 self.entries.get(&handle).cloned()
2551 }
2552
2553 pub(crate) fn remove(&mut self, handle: u64) {
2554 self.entries.shift_remove(&handle);
2555 }
2556
2557 #[cfg(test)]
2558 pub(crate) fn len(&self) -> usize { self.entries.len() }
2559}
2560
2561fn next_sql_handle() -> u64 {
2562 static COUNTER: AtomicU64 = AtomicU64::new(1);
2563 COUNTER.fetch_add(1, Ordering::SeqCst)
2564}
2565
2566#[cfg(test)]
2567mod sql_registry_tests {
2568 use super::{SqlConn, SqlRegistry};
2569
2570 fn fresh() -> SqlConn {
2571 SqlConn::Sqlite(rusqlite::Connection::open_in_memory().expect("open in-memory sqlite"))
2572 }
2573
2574 #[test]
2575 fn insert_and_get_round_trip() {
2576 let mut r = SqlRegistry::with_capacity(4);
2577 r.insert(1, fresh());
2578 assert!(r.touch_get(1).is_some());
2579 assert!(r.touch_get(2).is_none());
2580 }
2581
2582 #[test]
2583 fn cap_evicts_lru_on_overflow() {
2584 let mut r = SqlRegistry::with_capacity(2);
2585 r.insert(1, fresh());
2586 r.insert(2, fresh());
2587 let _ = r.touch_get(1);
2588 r.insert(3, fresh());
2589 assert!(r.touch_get(1).is_some(), "1 was MRU, should survive");
2590 assert!(r.touch_get(2).is_none(), "2 was LRU, should be evicted");
2591 assert!(r.touch_get(3).is_some(), "3 just inserted");
2592 assert_eq!(r.len(), 2);
2593 }
2594
2595 #[test]
2596 fn remove_drops_entry() {
2597 let mut r = SqlRegistry::with_capacity(4);
2598 r.insert(1, fresh());
2599 r.remove(1);
2600 assert!(r.touch_get(1).is_none());
2601 assert_eq!(r.len(), 0);
2602 }
2603
2604 #[test]
2605 fn many_inserts_stay_bounded_at_cap() {
2606 let cap = 8;
2607 let mut r = SqlRegistry::with_capacity(cap);
2608 for i in 0..(cap as u64 * 3) {
2609 r.insert(i, fresh());
2610 assert!(r.len() <= cap);
2611 }
2612 assert_eq!(r.len(), cap);
2613 }
2614}
2615
2616#[cfg(test)]
2617mod kv_registry_tests {
2618 use super::KvRegistry;
2619
2620 fn fresh_db(tag: &str) -> sled::Db {
2623 let dir = std::env::temp_dir().join(format!(
2624 "lex-kv-reg-{}-{}-{}",
2625 std::process::id(),
2626 tag,
2627 std::time::SystemTime::now()
2628 .duration_since(std::time::UNIX_EPOCH)
2629 .unwrap()
2630 .as_nanos()
2631 ));
2632 sled::open(&dir).expect("sled open")
2633 }
2634
2635 #[test]
2636 fn insert_and_get_round_trip() {
2637 let mut r = KvRegistry::with_capacity(4);
2638 r.insert(1, fresh_db("a"));
2639 assert!(r.touch_get(1).is_some());
2640 assert!(r.touch_get(2).is_none());
2641 }
2642
2643 #[test]
2644 fn cap_evicts_lru_on_overflow() {
2645 let mut r = KvRegistry::with_capacity(2);
2647 r.insert(1, fresh_db("c1"));
2648 r.insert(2, fresh_db("c2"));
2649 let _ = r.touch_get(1);
2650 r.insert(3, fresh_db("c3"));
2651 assert!(r.touch_get(1).is_some(), "1 was MRU, should survive");
2652 assert!(r.touch_get(2).is_none(), "2 was LRU, should be evicted");
2653 assert!(r.touch_get(3).is_some(), "3 just inserted, should survive");
2654 assert_eq!(r.len(), 2);
2655 }
2656
2657 #[test]
2658 fn cap_with_no_touches_evicts_in_insertion_order() {
2659 let mut r = KvRegistry::with_capacity(2);
2661 r.insert(10, fresh_db("f1"));
2662 r.insert(20, fresh_db("f2"));
2663 r.insert(30, fresh_db("f3"));
2664 assert!(r.touch_get(10).is_none());
2665 assert!(r.touch_get(20).is_some());
2666 assert!(r.touch_get(30).is_some());
2667 }
2668
2669 #[test]
2670 fn remove_drops_entry() {
2671 let mut r = KvRegistry::with_capacity(4);
2672 r.insert(1, fresh_db("r1"));
2673 r.remove(1);
2674 assert!(r.touch_get(1).is_none());
2675 assert_eq!(r.len(), 0);
2676 }
2677
2678 #[test]
2679 fn remove_unknown_handle_is_noop() {
2680 let mut r = KvRegistry::with_capacity(4);
2681 r.insert(1, fresh_db("u1"));
2682 r.remove(999);
2683 assert!(r.touch_get(1).is_some());
2684 }
2685
2686 #[test]
2687 fn many_inserts_stay_bounded_at_cap() {
2688 let cap = 8;
2691 let mut r = KvRegistry::with_capacity(cap);
2692 for i in 0..(cap as u64 * 3) {
2693 r.insert(i, fresh_db(&format!("b{i}")));
2694 assert!(r.len() <= cap);
2695 }
2696 assert_eq!(r.len(), cap);
2697 }
2698}