1use lex_bytecode::vm::{EffectHandler, Vm};
8use lex_bytecode::{Program, Value};
9use std::path::PathBuf;
10use std::sync::atomic::{AtomicU64, Ordering};
11use std::sync::{Mutex, OnceLock};
12use std::sync::Arc;
13use std::time::{SystemTime, UNIX_EPOCH};
14
15use crate::builtins::try_pure_builtin;
16use crate::policy::Policy;
17
18pub trait IoSink: Send {
21 fn print_line(&mut self, s: &str);
22}
23
24pub struct StdoutSink;
25impl IoSink for StdoutSink {
26 fn print_line(&mut self, s: &str) {
27 println!("{s}");
28 }
29}
30
31#[derive(Default)]
32pub struct CapturedSink { pub lines: Vec<String> }
33impl IoSink for CapturedSink {
34 fn print_line(&mut self, s: &str) { self.lines.push(s.to_string()); }
35}
36
37pub type StreamRegistry =
40 std::collections::HashMap<String, Box<dyn Iterator<Item = String> + Send>>;
41
42pub struct DefaultHandler {
43 policy: Policy,
44 pub sink: Box<dyn IoSink>,
45 pub read_root: Option<PathBuf>,
48 pub budget_remaining: Arc<AtomicU64>,
55 pub budget_ceiling: Option<u64>,
59 pub program: Option<Arc<Program>>,
63 pub chat_registry: Option<Arc<crate::ws::ChatRegistry>>,
67 pub mcp_clients: crate::mcp_client::McpClientCache,
73 pub streams: Arc<std::sync::Mutex<StreamRegistry>>,
80 pub next_stream_id: Arc<std::sync::atomic::AtomicU64>,
82}
83
84impl DefaultHandler {
85 pub fn new(policy: Policy) -> Self {
86 let ceiling = policy.budget;
90 let initial = ceiling.unwrap_or(u64::MAX);
91 Self {
92 policy,
93 sink: Box::new(StdoutSink),
94 read_root: None,
95 budget_remaining: Arc::new(AtomicU64::new(initial)),
96 budget_ceiling: ceiling,
97 program: None,
98 chat_registry: None,
99 mcp_clients: crate::mcp_client::McpClientCache::with_capacity(16),
100 streams: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
101 next_stream_id: Arc::new(std::sync::atomic::AtomicU64::new(0)),
102 }
103 }
104
105 pub fn with_program(mut self, program: Arc<Program>) -> Self {
106 self.program = Some(program); self
107 }
108
109 pub fn with_chat_registry(mut self, registry: Arc<crate::ws::ChatRegistry>) -> Self {
110 self.chat_registry = Some(registry); self
111 }
112
113 pub fn with_sink(mut self, sink: Box<dyn IoSink>) -> Self {
114 self.sink = sink; self
115 }
116
117 pub fn with_read_root(mut self, root: PathBuf) -> Self {
118 self.read_root = Some(root); self
119 }
120
121 fn ensure_kind_allowed(&self, kind: &str) -> Result<(), String> {
122 if self.policy.allow_effects.contains(kind) {
123 Ok(())
124 } else {
125 Err(format!("effect `{kind}` not in --allow-effects"))
126 }
127 }
128
129 fn resolve_read_path(&self, p: &str) -> PathBuf {
130 match &self.read_root {
131 Some(root) => root.join(p.trim_start_matches('/')),
132 None => PathBuf::from(p),
133 }
134 }
135
136 fn dispatch_log(&mut self, op: &str, args: Vec<Value>) -> Result<Value, String> {
137 match op {
138 "debug" | "info" | "warn" | "error" => {
139 let msg = expect_str(args.first())?;
140 let level = match op {
141 "debug" => LogLevel::Debug,
142 "info" => LogLevel::Info,
143 "warn" => LogLevel::Warn,
144 _ => LogLevel::Error,
145 };
146 emit_log(level, msg);
147 Ok(Value::Unit)
148 }
149 "set_level" => {
150 let s = expect_str(args.first())?;
151 match parse_log_level(s) {
152 Some(l) => {
153 log_state().lock().unwrap().level = l;
154 Ok(ok(Value::Unit))
155 }
156 None => Ok(err(Value::Str(format!(
157 "log.set_level: unknown level `{s}`; expected debug|info|warn|error")))),
158 }
159 }
160 "set_format" => {
161 let s = expect_str(args.first())?;
162 let fmt = match s {
163 "text" => LogFormat::Text,
164 "json" => LogFormat::Json,
165 other => return Ok(err(Value::Str(format!(
166 "log.set_format: unknown format `{other}`; expected text|json")))),
167 };
168 log_state().lock().unwrap().format = fmt;
169 Ok(ok(Value::Unit))
170 }
171 "set_sink" => {
172 let path = expect_str(args.first())?;
173 if path == "-" {
174 log_state().lock().unwrap().sink = LogSink::Stderr;
175 return Ok(ok(Value::Unit));
176 }
177 if let Err(e) = self.ensure_fs_write_path(path) {
178 return Ok(err(Value::Str(e)));
179 }
180 match std::fs::OpenOptions::new()
181 .create(true).append(true).open(path)
182 {
183 Ok(f) => {
184 log_state().lock().unwrap().sink = LogSink::File(std::sync::Arc::new(Mutex::new(f)));
185 Ok(ok(Value::Unit))
186 }
187 Err(e) => Ok(err(Value::Str(format!("log.set_sink `{path}`: {e}")))),
188 }
189 }
190 other => Err(format!("unsupported log.{other}")),
191 }
192 }
193
194 fn dispatch_process(&mut self, op: &str, args: Vec<Value>) -> Result<Value, String> {
195 match op {
196 "spawn" => {
197 let cmd = expect_str(args.first())?.to_string();
198 let raw_args = match args.get(1) {
199 Some(Value::List(items)) => items.clone(),
200 _ => return Err("process.spawn: args must be List[Str]".into()),
201 };
202 let str_args: Result<Vec<String>, String> = raw_args.iter().map(|v| match v {
203 Value::Str(s) => Ok(s.clone()),
204 other => Err(format!("process.spawn: arg must be Str, got {other:?}")),
205 }).collect();
206 let str_args = str_args?;
207 let opts = match args.get(2) {
208 Some(Value::Record(r)) => r.clone(),
209 _ => return Err("process.spawn: missing or invalid opts record".into()),
210 };
211
212 if !self.policy.allow_proc.is_empty() {
214 let basename = std::path::Path::new(&cmd)
215 .file_name()
216 .and_then(|s| s.to_str())
217 .unwrap_or(&cmd);
218 if !self.policy.allow_proc.iter().any(|a| a == basename) {
219 return Ok(err(Value::Str(format!(
220 "process.spawn: `{cmd}` not in --allow-proc {:?}",
221 self.policy.allow_proc
222 ))));
223 }
224 }
225
226 let mut command = std::process::Command::new(&cmd);
227 command.args(&str_args);
228 command.stdin(std::process::Stdio::piped());
229 command.stdout(std::process::Stdio::piped());
230 command.stderr(std::process::Stdio::piped());
231
232 if let Some(Value::Variant { name, args: vargs }) = opts.get("cwd") {
233 if name == "Some" {
234 if let Some(Value::Str(s)) = vargs.first() {
235 command.current_dir(s);
236 }
237 }
238 }
239 if let Some(Value::Map(env)) = opts.get("env") {
240 for (k, v) in env {
241 if let (lex_bytecode::MapKey::Str(ks), Value::Str(vs)) = (k, v) {
242 command.env(ks, vs);
243 }
244 }
245 }
246
247 let stdin_payload: Option<Vec<u8>> = match opts.get("stdin") {
248 Some(Value::Variant { name, args: vargs }) if name == "Some" => {
249 match vargs.first() {
250 Some(Value::Bytes(b)) => Some(b.clone()),
251 _ => None,
252 }
253 }
254 _ => None,
255 };
256
257 let mut child = match command.spawn() {
258 Ok(c) => c,
259 Err(e) => return Ok(err(Value::Str(format!("process.spawn `{cmd}`: {e}")))),
260 };
261
262 if let Some(payload) = stdin_payload {
263 if let Some(mut stdin) = child.stdin.take() {
264 use std::io::Write;
265 let _ = stdin.write_all(&payload);
266 }
268 }
269
270 let stdout = child.stdout.take().map(std::io::BufReader::new);
271 let stderr = child.stderr.take().map(std::io::BufReader::new);
272 let handle = next_process_handle();
273 process_registry().lock().unwrap().insert(handle, ProcessState {
274 child,
275 stdout,
276 stderr,
277 });
278 Ok(ok(Value::Int(handle as i64)))
279 }
280 "read_stdout_line" => Self::read_line_op(args, true),
281 "read_stderr_line" => Self::read_line_op(args, false),
282 "wait" => {
283 let h = expect_process_handle(args.first())?;
284 let arc = process_registry().lock().unwrap()
288 .touch_get(h)
289 .ok_or_else(|| "process.wait: closed or unknown ProcessHandle".to_string())?;
290 let status = {
291 let mut state = arc.lock().unwrap();
292 state.child.wait().map_err(|e| format!("process.wait: {e}"))?
293 };
294 process_registry().lock().unwrap().remove(h);
298 let mut rec = indexmap::IndexMap::new();
299 rec.insert("code".into(), Value::Int(status.code().unwrap_or(-1) as i64));
300 #[cfg(unix)]
301 {
302 use std::os::unix::process::ExitStatusExt;
303 rec.insert("signaled".into(), Value::Bool(status.signal().is_some()));
304 }
305 #[cfg(not(unix))]
306 {
307 rec.insert("signaled".into(), Value::Bool(false));
308 }
309 Ok(Value::Record(rec))
310 }
311 "kill" => {
312 let h = expect_process_handle(args.first())?;
313 let _signal = expect_str(args.get(1))?;
314 let arc = process_registry().lock().unwrap()
315 .touch_get(h)
316 .ok_or_else(|| "process.kill: closed or unknown ProcessHandle".to_string())?;
317 let mut state = arc.lock().unwrap();
318 match state.child.kill() {
321 Ok(_) => Ok(ok(Value::Unit)),
322 Err(e) => Ok(err(Value::Str(format!("process.kill: {e}")))),
323 }
324 }
325 "run" => {
326 let cmd = expect_str(args.first())?.to_string();
327 let raw_args = match args.get(1) {
328 Some(Value::List(items)) => items.clone(),
329 _ => return Err("process.run: args must be List[Str]".into()),
330 };
331 let str_args: Result<Vec<String>, String> = raw_args.iter().map(|v| match v {
332 Value::Str(s) => Ok(s.clone()),
333 other => Err(format!("process.run: arg must be Str, got {other:?}")),
334 }).collect();
335 let str_args = str_args?;
336 if !self.policy.allow_proc.is_empty() {
337 let basename = std::path::Path::new(&cmd)
338 .file_name()
339 .and_then(|s| s.to_str())
340 .unwrap_or(&cmd);
341 if !self.policy.allow_proc.iter().any(|a| a == basename) {
342 return Ok(err(Value::Str(format!(
343 "process.run: `{cmd}` not in --allow-proc {:?}",
344 self.policy.allow_proc
345 ))));
346 }
347 }
348 match std::process::Command::new(&cmd).args(&str_args).output() {
349 Ok(o) => {
350 let mut rec = indexmap::IndexMap::new();
351 rec.insert("stdout".into(), Value::Str(
352 String::from_utf8_lossy(&o.stdout).to_string()));
353 rec.insert("stderr".into(), Value::Str(
354 String::from_utf8_lossy(&o.stderr).to_string()));
355 rec.insert("exit_code".into(), Value::Int(
356 o.status.code().unwrap_or(-1) as i64));
357 Ok(ok(Value::Record(rec)))
358 }
359 Err(e) => Ok(err(Value::Str(format!("process.run `{cmd}`: {e}")))),
360 }
361 }
362 other => Err(format!("unsupported process.{other}")),
363 }
364 }
365
366 fn read_line_op(args: Vec<Value>, is_stdout: bool) -> Result<Value, String> {
372 let h = expect_process_handle(args.first())?;
373 let arc = process_registry().lock().unwrap()
374 .touch_get(h)
375 .ok_or_else(|| format!(
376 "process.read_{}_line: closed or unknown ProcessHandle",
377 if is_stdout { "stdout" } else { "stderr" }))?;
378 let mut state = arc.lock().unwrap();
379 let reader_opt = if is_stdout {
380 state.stdout.as_mut().map(|r| -> &mut dyn std::io::BufRead { r })
381 } else {
382 state.stderr.as_mut().map(|r| -> &mut dyn std::io::BufRead { r })
383 };
384 let reader = match reader_opt {
385 Some(r) => r,
386 None => return Ok(none()),
387 };
388 let mut line = String::new();
389 match reader.read_line(&mut line) {
390 Ok(0) => Ok(none()),
391 Ok(_) => {
392 if line.ends_with('\n') { line.pop(); }
393 if line.ends_with('\r') { line.pop(); }
394 Ok(some(Value::Str(line)))
395 }
396 Err(e) => Err(format!("process.read_*_line: {e}")),
397 }
398 }
399
400 fn dispatch_fs(&mut self, op: &str, args: Vec<Value>) -> Result<Value, String> {
401 match op {
402 "exists" => {
403 let path = expect_str(args.first())?.to_string();
404 if let Err(e) = self.ensure_fs_walk_path(&path) {
405 return Ok(err(Value::Str(e)));
406 }
407 Ok(Value::Bool(std::path::Path::new(&path).exists()))
408 }
409 "is_file" => {
410 let path = expect_str(args.first())?.to_string();
411 if let Err(e) = self.ensure_fs_walk_path(&path) {
412 return Ok(err(Value::Str(e)));
413 }
414 Ok(Value::Bool(std::path::Path::new(&path).is_file()))
415 }
416 "is_dir" => {
417 let path = expect_str(args.first())?.to_string();
418 if let Err(e) = self.ensure_fs_walk_path(&path) {
419 return Ok(err(Value::Str(e)));
420 }
421 Ok(Value::Bool(std::path::Path::new(&path).is_dir()))
422 }
423 "stat" => {
424 let path = expect_str(args.first())?.to_string();
425 if let Err(e) = self.ensure_fs_walk_path(&path) {
426 return Ok(err(Value::Str(e)));
427 }
428 match std::fs::metadata(&path) {
429 Ok(md) => {
430 let mtime = md.modified()
431 .ok()
432 .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
433 .map(|d| d.as_secs() as i64)
434 .unwrap_or(0);
435 let mut rec = indexmap::IndexMap::new();
436 rec.insert("size".into(), Value::Int(md.len() as i64));
437 rec.insert("mtime".into(), Value::Int(mtime));
438 rec.insert("is_dir".into(), Value::Bool(md.is_dir()));
439 rec.insert("is_file".into(), Value::Bool(md.is_file()));
440 Ok(ok(Value::Record(rec)))
441 }
442 Err(e) => Ok(err(Value::Str(format!("fs.stat `{path}`: {e}")))),
443 }
444 }
445 "list_dir" => {
446 let path = expect_str(args.first())?.to_string();
447 if let Err(e) = self.ensure_fs_walk_path(&path) {
448 return Ok(err(Value::Str(e)));
449 }
450 match std::fs::read_dir(&path) {
451 Ok(rd) => {
452 let mut entries: Vec<Value> = Vec::new();
453 for ent in rd {
454 match ent {
455 Ok(e) => {
456 let p = e.path();
457 entries.push(Value::Str(p.to_string_lossy().into_owned()));
458 }
459 Err(e) => return Ok(err(Value::Str(format!("fs.list_dir: {e}")))),
460 }
461 }
462 Ok(ok(Value::List(entries)))
463 }
464 Err(e) => Ok(err(Value::Str(format!("fs.list_dir `{path}`: {e}")))),
465 }
466 }
467 "walk" => {
468 let path = expect_str(args.first())?.to_string();
469 if let Err(e) = self.ensure_fs_walk_path(&path) {
470 return Ok(err(Value::Str(e)));
471 }
472 let mut paths: Vec<Value> = Vec::new();
473 for ent in walkdir::WalkDir::new(&path) {
474 match ent {
475 Ok(e) => paths.push(Value::Str(
476 e.path().to_string_lossy().into_owned())),
477 Err(e) => return Ok(err(Value::Str(format!("fs.walk: {e}")))),
478 }
479 }
480 Ok(ok(Value::List(paths)))
481 }
482 "glob" => {
483 let pattern = expect_str(args.first())?.to_string();
484 let entries = match glob::glob(&pattern) {
489 Ok(e) => e,
490 Err(e) => return Ok(err(Value::Str(format!("fs.glob: {e}")))),
491 };
492 let mut paths: Vec<Value> = Vec::new();
493 for ent in entries {
494 match ent {
495 Ok(p) => {
496 let s = p.to_string_lossy().into_owned();
497 if self.policy.allow_fs_read.is_empty()
498 || self.policy.allow_fs_read.iter().any(|root| p.starts_with(root))
499 {
500 paths.push(Value::Str(s));
501 }
502 }
503 Err(e) => return Ok(err(Value::Str(format!("fs.glob: {e}")))),
504 }
505 }
506 Ok(ok(Value::List(paths)))
507 }
508 "mkdir_p" => {
509 let path = expect_str(args.first())?.to_string();
510 if let Err(e) = self.ensure_fs_write_path(&path) {
511 return Ok(err(Value::Str(e)));
512 }
513 match std::fs::create_dir_all(&path) {
514 Ok(_) => Ok(ok(Value::Unit)),
515 Err(e) => Ok(err(Value::Str(format!("fs.mkdir_p `{path}`: {e}")))),
516 }
517 }
518 "remove" => {
519 let path = expect_str(args.first())?.to_string();
520 if let Err(e) = self.ensure_fs_write_path(&path) {
521 return Ok(err(Value::Str(e)));
522 }
523 let p = std::path::Path::new(&path);
524 let result = if p.is_dir() {
525 std::fs::remove_dir_all(p)
526 } else {
527 std::fs::remove_file(p)
528 };
529 match result {
530 Ok(_) => Ok(ok(Value::Unit)),
531 Err(e) => Ok(err(Value::Str(format!("fs.remove `{path}`: {e}")))),
532 }
533 }
534 "copy" => {
535 let src = expect_str(args.first())?.to_string();
536 let dst = expect_str(args.get(1))?.to_string();
537 if let Err(e) = self.ensure_fs_walk_path(&src) {
538 return Ok(err(Value::Str(e)));
539 }
540 if let Err(e) = self.ensure_fs_write_path(&dst) {
541 return Ok(err(Value::Str(e)));
542 }
543 match std::fs::copy(&src, &dst) {
544 Ok(_) => Ok(ok(Value::Unit)),
545 Err(e) => Ok(err(Value::Str(format!("fs.copy {src} -> {dst}: {e}")))),
546 }
547 }
548 other => Err(format!("unsupported fs.{other}")),
549 }
550 }
551
552 fn ensure_fs_walk_path(&self, path: &str) -> Result<(), String> {
557 if self.policy.allow_fs_read.is_empty() {
558 return Ok(());
559 }
560 let p = std::path::Path::new(path);
561 if self.policy.allow_fs_read.iter().any(|a| p.starts_with(a)) {
562 Ok(())
563 } else {
564 Err(format!("fs path `{path}` outside --allow-fs-read"))
565 }
566 }
567
568 fn ensure_fs_write_path(&self, path: &str) -> Result<(), String> {
571 if self.policy.allow_fs_write.is_empty() {
572 return Ok(());
573 }
574 let p = std::path::Path::new(path);
575 if self.policy.allow_fs_write.iter().any(|a| p.starts_with(a)) {
576 Ok(())
577 } else {
578 Err(format!("fs path `{path}` outside --allow-fs-write"))
579 }
580 }
581
582 fn ensure_host_allowed(&self, url: &str) -> Result<(), String> {
586 if self.policy.allow_net_host.is_empty() { return Ok(()); }
587 let host = extract_host(url).unwrap_or("");
588 if self.policy.allow_net_host.iter().any(|h| host == h) {
589 Ok(())
590 } else {
591 Err(format!(
592 "net call to host `{host}` not in --allow-net-host {:?}",
593 self.policy.allow_net_host,
594 ))
595 }
596 }
597}
598
599fn extract_host(url: &str) -> Option<&str> {
600 let rest = url.strip_prefix("http://").or_else(|| url.strip_prefix("https://"))?;
601 let host_port = match rest.find('/') {
602 Some(i) => &rest[..i],
603 None => rest,
604 };
605 Some(match host_port.rsplit_once(':') {
606 Some((h, _)) => h,
607 None => host_port,
608 })
609}
610
611impl EffectHandler for DefaultHandler {
612 fn note_call_budget(&mut self, cost: u64) -> Result<(), String> {
617 let Some(ceiling) = self.budget_ceiling else { return Ok(()); };
620 loop {
626 let cur = self.budget_remaining.load(Ordering::SeqCst);
627 if cost > cur {
628 let used = ceiling.saturating_sub(cur);
629 return Err(format!(
630 "budget exceeded: requested {cost}, used so far {used}, ceiling {ceiling}"));
631 }
632 let next = cur - cost;
633 if self.budget_remaining.compare_exchange(cur, next,
636 Ordering::SeqCst, Ordering::SeqCst).is_ok() {
637 return Ok(());
638 }
639 }
640 }
641
642 fn dispatch(&mut self, kind: &str, op: &str, args: Vec<Value>) -> Result<Value, String> {
643 if let Some(r) = try_pure_builtin(kind, op, &args) {
647 return r;
648 }
649 if kind == "process" {
653 self.ensure_kind_allowed("proc")?;
654 return self.dispatch_process(op, args);
655 }
656 if kind == "log" {
657 let effect_kind = match op {
660 "debug" | "info" | "warn" | "error" => "log",
661 "set_level" | "set_format" => "io",
662 "set_sink" => {
663 self.ensure_kind_allowed("io")?;
664 self.ensure_kind_allowed("fs_write")?;
665 return self.dispatch_log(op, args);
666 }
667 other => return Err(format!("unsupported log.{other}")),
668 };
669 self.ensure_kind_allowed(effect_kind)?;
670 return self.dispatch_log(op, args);
671 }
672 if kind == "fs" {
673 let effect_kind = match op {
674 "exists" | "is_file" | "is_dir" | "stat"
675 | "list_dir" | "walk" | "glob" => "fs_walk",
676 "mkdir_p" | "remove" => "fs_write",
677 "copy" => {
678 self.ensure_kind_allowed("fs_walk")?;
679 self.ensure_kind_allowed("fs_write")?;
680 return self.dispatch_fs(op, args);
681 }
682 other => return Err(format!("unsupported fs.{other}")),
683 };
684 self.ensure_kind_allowed(effect_kind)?;
685 return self.dispatch_fs(op, args);
686 }
687 if kind == "datetime" && op == "now" {
694 self.ensure_kind_allowed("time")?;
695 let now = chrono::Utc::now();
696 let nanos = now.timestamp_nanos_opt().unwrap_or(i64::MAX);
697 return Ok(Value::Int(nanos));
698 }
699 if kind == "crypto" && op == "random" {
700 self.ensure_kind_allowed("random")?;
701 let n = expect_int(args.first())?;
702 if !(0..=1_048_576).contains(&n) {
703 return Err("crypto.random: n must be in 0..=1048576".into());
704 }
705 use rand::{rngs::SysRng, TryRng};
706 let mut buf = vec![0u8; n as usize];
707 SysRng.try_fill_bytes(&mut buf)
708 .map_err(|e| format!("crypto.random: OS RNG: {e}"))?;
709 return Ok(Value::Bytes(buf));
710 }
711 if kind == "agent" {
724 let effect_kind = match op {
725 "local_complete" => "llm_local",
726 "cloud_complete" => "llm_cloud",
727 "cloud_stream" => "llm_cloud",
728 "send_a2a" => "a2a",
729 "call_mcp" => "mcp",
730 other => return Err(format!("unsupported agent.{other}")),
731 };
732 self.ensure_kind_allowed(effect_kind)?;
733 return match op {
741 "call_mcp" => Ok(self.dispatch_call_mcp(args)),
742 "local_complete" => Ok(dispatch_llm_local(args)),
743 "cloud_complete" => Ok(dispatch_llm_cloud(args)),
744 "cloud_stream" => Ok(self.dispatch_cloud_stream(args)),
745 _ => Ok(ok(Value::Str(format!("<{effect_kind} stub>")))),
746 };
747 }
748 if kind == "stream" {
749 self.ensure_kind_allowed("stream")?;
756 return match op {
757 "next" => Ok(self.dispatch_stream_next(args)),
758 "collect" => Ok(self.dispatch_stream_collect(args)),
759 other => Err(format!("unsupported stream.{other}")),
760 };
761 }
762 if kind == "http" && matches!(op, "send" | "get" | "post") {
763 self.ensure_kind_allowed("net")?;
764 return match op {
765 "send" => {
766 let req = expect_record(args.first())?;
767 Ok(http_send_record(self, req))
768 }
769 "get" => {
770 let url = expect_str(args.first())?.to_string();
771 self.ensure_host_allowed(&url)?;
772 Ok(http_send_simple("GET", &url, None, "", None))
773 }
774 "post" => {
775 let url = expect_str(args.first())?.to_string();
776 let body = expect_bytes(args.get(1))?.clone();
777 let content_type = expect_str(args.get(2))?.to_string();
778 self.ensure_host_allowed(&url)?;
779 Ok(http_send_simple("POST", &url, Some(body), &content_type, None))
780 }
781 _ => unreachable!(),
782 };
783 }
784 self.ensure_kind_allowed(kind)?;
785 match (kind, op) {
786 ("io", "print") => {
787 let line = expect_str(args.first())?;
788 self.sink.print_line(line);
789 Ok(Value::Unit)
790 }
791 ("io", "read") => {
792 let path = expect_str(args.first())?.to_string();
793 let resolved = self.resolve_read_path(&path);
794 if !self.policy.allow_fs_read.is_empty()
801 && !self.policy.allow_fs_read.iter().any(|a| resolved.starts_with(a))
802 {
803 return Err(format!("read of `{path}` outside --allow-fs-read"));
804 }
805 match std::fs::read_to_string(&resolved) {
806 Ok(s) => Ok(ok(Value::Str(s))),
807 Err(e) => Ok(err(Value::Str(format!("{e}")))),
808 }
809 }
810 ("io", "write") => {
811 let path = expect_str(args.first())?.to_string();
812 let contents = expect_str(args.get(1))?.to_string();
813 if !self.policy.allow_fs_write.is_empty() {
815 let p = std::path::Path::new(&path);
816 if !self.policy.allow_fs_write.iter().any(|a| p.starts_with(a)) {
817 return Err(format!("write to `{path}` outside --allow-fs-write"));
818 }
819 }
820 match std::fs::write(&path, contents) {
821 Ok(_) => Ok(ok(Value::Unit)),
822 Err(e) => Ok(err(Value::Str(format!("{e}")))),
823 }
824 }
825 ("time", "now") => {
826 let secs = SystemTime::now().duration_since(UNIX_EPOCH)
827 .map_err(|e| format!("time: {e}"))?.as_secs();
828 Ok(Value::Int(secs as i64))
829 }
830 ("time", "sleep_ms") => {
831 let n = expect_int(args.first())?;
839 if n > 0 {
840 let ms = (n as u64).min(60_000);
841 std::thread::sleep(std::time::Duration::from_millis(ms));
842 }
843 Ok(Value::Unit)
844 }
845 ("rand", "int_in") => {
846 let lo = expect_int(args.first())?;
848 let hi = expect_int(args.get(1))?;
849 Ok(Value::Int((lo + hi) / 2))
850 }
851 ("env", "get") => {
856 let name = expect_str(args.first())?;
857 Ok(match std::env::var(name) {
858 Ok(v) => Value::Variant {
859 name: "Some".into(),
860 args: vec![Value::Str(v)],
861 },
862 Err(_) => Value::Variant { name: "None".into(), args: Vec::new() },
863 })
864 }
865 ("budget", _) => {
866 Ok(Value::Unit)
869 }
870 ("net", "get") => {
871 let url = expect_str(args.first())?.to_string();
872 self.ensure_host_allowed(&url)?;
873 Ok(http_request("GET", &url, None))
874 }
875 ("net", "post") => {
876 let url = expect_str(args.first())?.to_string();
877 let body = expect_str(args.get(1))?.to_string();
878 self.ensure_host_allowed(&url)?;
879 Ok(http_request("POST", &url, Some(&body)))
880 }
881 ("net", "serve") => {
882 let port = match args.first() {
883 Some(Value::Int(n)) if (0..=65535).contains(n) => *n as u16,
884 _ => return Err("net.serve(port, handler): port must be Int 0..=65535".into()),
885 };
886 let handler_name = expect_str(args.get(1))?.to_string();
887 let program = self.program.clone()
888 .ok_or_else(|| "net.serve requires a Program reference; use DefaultHandler::with_program".to_string())?;
889 let policy = self.policy.clone();
890 serve_http(port, handler_name, program, policy, None)
891 }
892 ("net", "serve_tls") => {
893 let port = match args.first() {
894 Some(Value::Int(n)) if (0..=65535).contains(n) => *n as u16,
895 _ => return Err("net.serve_tls(port, cert, key, handler): port must be Int 0..=65535".into()),
896 };
897 let cert_path = expect_str(args.get(1))?.to_string();
898 let key_path = expect_str(args.get(2))?.to_string();
899 let handler_name = expect_str(args.get(3))?.to_string();
900 let program = self.program.clone()
901 .ok_or_else(|| "net.serve_tls requires a Program reference".to_string())?;
902 let policy = self.policy.clone();
903 let cert = std::fs::read(&cert_path)
904 .map_err(|e| format!("net.serve_tls: read cert {cert_path}: {e}"))?;
905 let key = std::fs::read(&key_path)
906 .map_err(|e| format!("net.serve_tls: read key {key_path}: {e}"))?;
907 serve_http(port, handler_name, program, policy, Some(TlsConfig { cert, key }))
908 }
909 ("net", "serve_ws") => {
910 let port = match args.first() {
911 Some(Value::Int(n)) if (0..=65535).contains(n) => *n as u16,
912 _ => return Err("net.serve_ws(port, on_message): port must be Int 0..=65535".into()),
913 };
914 let handler_name = expect_str(args.get(1))?.to_string();
915 let program = self.program.clone()
916 .ok_or_else(|| "net.serve_ws requires a Program reference".to_string())?;
917 let policy = self.policy.clone();
918 let registry = Arc::new(crate::ws::ChatRegistry::default());
919 crate::ws::serve_ws(port, handler_name, program, policy, registry)
920 }
921 ("chat", "broadcast") => {
922 let registry = self.chat_registry.as_ref()
923 .ok_or_else(|| "chat.broadcast called outside a net.serve_ws handler".to_string())?;
924 let room = expect_str(args.first())?;
925 let body = expect_str(args.get(1))?;
926 crate::ws::chat_broadcast(registry, room, body);
927 Ok(Value::Unit)
928 }
929 ("chat", "send") => {
930 let registry = self.chat_registry.as_ref()
931 .ok_or_else(|| "chat.send called outside a net.serve_ws handler".to_string())?;
932 let conn_id = match args.first() {
933 Some(Value::Int(n)) if *n >= 0 => *n as u64,
934 _ => return Err("chat.send: conn_id must be non-negative Int".into()),
935 };
936 let body = expect_str(args.get(1))?;
937 Ok(Value::Bool(crate::ws::chat_send(registry, conn_id, body)))
938 }
939 ("kv", "open") => {
940 let path = expect_str(args.first())?.to_string();
941 if !self.policy.allow_fs_write.is_empty() {
945 let p = std::path::Path::new(&path);
946 if !self.policy.allow_fs_write.iter().any(|a| p.starts_with(a)) {
947 return Ok(err(Value::Str(format!(
948 "kv.open: `{path}` outside --allow-fs-write"))));
949 }
950 }
951 match sled::open(&path) {
952 Ok(db) => {
953 let handle = next_kv_handle();
954 kv_registry().lock().unwrap().insert(handle, db);
955 Ok(ok(Value::Int(handle as i64)))
956 }
957 Err(e) => Ok(err(Value::Str(format!("kv.open: {e}")))),
958 }
959 }
960 ("kv", "close") => {
961 let h = expect_kv_handle(args.first())?;
962 kv_registry().lock().unwrap().remove(h);
963 Ok(Value::Unit)
964 }
965 ("kv", "get") => {
966 let h = expect_kv_handle(args.first())?;
967 let key = expect_str(args.get(1))?;
968 let mut reg = kv_registry().lock().unwrap();
969 let db = reg.touch_get(h).ok_or_else(|| "kv.get: closed or unknown Kv handle".to_string())?;
970 match db.get(key.as_bytes()) {
971 Ok(Some(ivec)) => Ok(some(Value::Bytes(ivec.to_vec()))),
972 Ok(None) => Ok(none()),
973 Err(e) => Err(format!("kv.get: {e}")),
974 }
975 }
976 ("kv", "put") => {
977 let h = expect_kv_handle(args.first())?;
978 let key = expect_str(args.get(1))?.to_string();
979 let val = expect_bytes(args.get(2))?.clone();
980 let mut reg = kv_registry().lock().unwrap();
981 let db = reg.touch_get(h).ok_or_else(|| "kv.put: closed or unknown Kv handle".to_string())?;
982 match db.insert(key.as_bytes(), val) {
983 Ok(_) => Ok(ok(Value::Unit)),
984 Err(e) => Ok(err(Value::Str(format!("kv.put: {e}")))),
985 }
986 }
987 ("kv", "delete") => {
988 let h = expect_kv_handle(args.first())?;
989 let key = expect_str(args.get(1))?;
990 let mut reg = kv_registry().lock().unwrap();
991 let db = reg.touch_get(h).ok_or_else(|| "kv.delete: closed or unknown Kv handle".to_string())?;
992 match db.remove(key.as_bytes()) {
993 Ok(_) => Ok(ok(Value::Unit)),
994 Err(e) => Ok(err(Value::Str(format!("kv.delete: {e}")))),
995 }
996 }
997 ("kv", "contains") => {
998 let h = expect_kv_handle(args.first())?;
999 let key = expect_str(args.get(1))?;
1000 let mut reg = kv_registry().lock().unwrap();
1001 let db = reg.touch_get(h).ok_or_else(|| "kv.contains: closed or unknown Kv handle".to_string())?;
1002 match db.contains_key(key.as_bytes()) {
1003 Ok(present) => Ok(Value::Bool(present)),
1004 Err(e) => Err(format!("kv.contains: {e}")),
1005 }
1006 }
1007 ("kv", "list_prefix") => {
1008 let h = expect_kv_handle(args.first())?;
1009 let prefix = expect_str(args.get(1))?;
1010 let mut reg = kv_registry().lock().unwrap();
1011 let db = reg.touch_get(h).ok_or_else(|| "kv.list_prefix: closed or unknown Kv handle".to_string())?;
1012 let mut keys: Vec<Value> = Vec::new();
1013 for kv in db.scan_prefix(prefix.as_bytes()) {
1014 let (k, _) = kv.map_err(|e| format!("kv.list_prefix: {e}"))?;
1015 let s = String::from_utf8_lossy(&k).to_string();
1016 keys.push(Value::Str(s));
1017 }
1018 Ok(Value::List(keys))
1019 }
1020 ("sql", "open") => {
1021 let path = expect_str(args.first())?.to_string();
1022 if path != ":memory:" && !self.policy.allow_fs_write.is_empty() {
1026 let p = std::path::Path::new(&path);
1027 if !self.policy.allow_fs_write.iter().any(|a| p.starts_with(a)) {
1028 return Ok(err(Value::Str(format!(
1029 "sql.open: `{path}` outside --allow-fs-write"))));
1030 }
1031 }
1032 match rusqlite::Connection::open(&path) {
1033 Ok(conn) => {
1034 let handle = next_sql_handle();
1035 sql_registry().lock().unwrap().insert(handle, conn);
1036 Ok(ok(Value::Int(handle as i64)))
1037 }
1038 Err(e) => Ok(err(Value::Str(format!("sql.open: {e}")))),
1039 }
1040 }
1041 ("sql", "close") => {
1042 let h = expect_sql_handle(args.first())?;
1043 sql_registry().lock().unwrap().remove(h);
1044 Ok(Value::Unit)
1045 }
1046 ("sql", "exec") => {
1047 let h = expect_sql_handle(args.first())?;
1048 let stmt = expect_str(args.get(1))?.to_string();
1049 let params = expect_str_list(args.get(2))?;
1050 let arc = sql_registry().lock().unwrap()
1051 .touch_get(h)
1052 .ok_or_else(|| "sql.exec: closed or unknown Db handle".to_string())?;
1053 let conn = arc.lock().unwrap();
1054 let bind: Vec<&dyn rusqlite::ToSql> = params.iter()
1055 .map(|s| s as &dyn rusqlite::ToSql)
1056 .collect();
1057 match conn.execute(&stmt, rusqlite::params_from_iter(bind.iter())) {
1058 Ok(n) => Ok(ok(Value::Int(n as i64))),
1059 Err(e) => Ok(err(Value::Str(format!("sql.exec: {e}")))),
1060 }
1061 }
1062 ("sql", "query") => {
1063 let h = expect_sql_handle(args.first())?;
1064 let stmt_str = expect_str(args.get(1))?.to_string();
1065 let params = expect_str_list(args.get(2))?;
1066 let arc = sql_registry().lock().unwrap()
1067 .touch_get(h)
1068 .ok_or_else(|| "sql.query: closed or unknown Db handle".to_string())?;
1069 let conn = arc.lock().unwrap();
1070 Ok(sql_run_query(&conn, &stmt_str, ¶ms))
1071 }
1072 ("proc", "spawn") => {
1073 let cmd = expect_str(args.first())?.to_string();
1087 let raw_args = match args.get(1) {
1088 Some(Value::List(items)) => items,
1089 Some(other) => return Err(format!(
1090 "proc.spawn: args must be List[Str], got {other:?}")),
1091 None => return Err("proc.spawn: missing args list".into()),
1092 };
1093 let str_args: Vec<String> = raw_args.iter().map(|v| match v {
1094 Value::Str(s) => Ok(s.clone()),
1095 other => Err(format!("proc.spawn: arg must be Str, got {other:?}")),
1096 }).collect::<Result<Vec<_>, _>>()?;
1097
1098 if !self.policy.allow_proc.is_empty() {
1102 let basename = std::path::Path::new(&cmd)
1103 .file_name()
1104 .and_then(|s| s.to_str())
1105 .unwrap_or(&cmd);
1106 if !self.policy.allow_proc.iter().any(|a| a == basename) {
1107 return Ok(err(Value::Str(format!(
1108 "proc.spawn: `{cmd}` not in --allow-proc {:?}",
1109 self.policy.allow_proc
1110 ))));
1111 }
1112 }
1113
1114 if str_args.len() > 1024 {
1117 return Ok(err(Value::Str(
1118 "proc.spawn: arg-count exceeds 1024".into())));
1119 }
1120 if str_args.iter().any(|a| a.len() > 65_536) {
1121 return Ok(err(Value::Str(
1122 "proc.spawn: per-arg length exceeds 64 KiB".into())));
1123 }
1124
1125 let output = std::process::Command::new(&cmd)
1126 .args(&str_args)
1127 .output();
1128 match output {
1129 Ok(o) => {
1130 let mut rec = indexmap::IndexMap::new();
1131 rec.insert("stdout".into(), Value::Str(
1132 String::from_utf8_lossy(&o.stdout).to_string()));
1133 rec.insert("stderr".into(), Value::Str(
1134 String::from_utf8_lossy(&o.stderr).to_string()));
1135 rec.insert("exit_code".into(), Value::Int(
1136 o.status.code().unwrap_or(-1) as i64));
1137 Ok(ok(Value::Record(rec)))
1138 }
1139 Err(e) => Ok(err(Value::Str(format!("spawn `{cmd}`: {e}")))),
1140 }
1141 }
1142 other => Err(format!("unsupported effect {}.{}", other.0, other.1)),
1143 }
1144 }
1145
1146 fn spawn_for_worker(&self) -> Option<Box<dyn lex_bytecode::vm::EffectHandler + Send>> {
1169 let mut fresh = DefaultHandler::new(self.policy.clone());
1170 fresh.budget_remaining = std::sync::Arc::clone(&self.budget_remaining);
1173 fresh.budget_ceiling = self.budget_ceiling;
1174 fresh.read_root = self.read_root.clone();
1175 fresh.program = self.program.clone();
1176 fresh.chat_registry = self.chat_registry.clone();
1177 fresh.streams = std::sync::Arc::clone(&self.streams);
1182 fresh.next_stream_id = std::sync::Arc::clone(&self.next_stream_id);
1183 Some(Box::new(fresh))
1184 }
1185}
1186
1187pub struct TlsConfig {
1197 pub cert: Vec<u8>,
1198 pub key: Vec<u8>,
1199}
1200
1201fn serve_http(
1202 port: u16,
1203 handler_name: String,
1204 program: Arc<Program>,
1205 policy: Policy,
1206 tls: Option<TlsConfig>,
1207) -> Result<Value, String> {
1208 let (server, scheme) = match tls {
1209 None => (
1210 tiny_http::Server::http(("127.0.0.1", port))
1211 .map_err(|e| format!("net.serve bind {port}: {e}"))?,
1212 "http",
1213 ),
1214 Some(cfg) => {
1215 let ssl = tiny_http::SslConfig {
1216 certificate: cfg.cert,
1217 private_key: cfg.key,
1218 };
1219 (
1220 tiny_http::Server::https(("127.0.0.1", port), ssl)
1221 .map_err(|e| format!("net.serve_tls bind {port}: {e}"))?,
1222 "https",
1223 )
1224 }
1225 };
1226 eprintln!("net.serve: listening on {scheme}://127.0.0.1:{port}");
1227 for req in server.incoming_requests() {
1233 let program = Arc::clone(&program);
1234 let policy = policy.clone();
1235 let handler_name = handler_name.clone();
1236 std::thread::spawn(move || handle_request(req, program, policy, handler_name));
1237 }
1238 Ok(Value::Unit)
1239}
1240
1241fn handle_request(
1242 mut req: tiny_http::Request,
1243 program: Arc<Program>,
1244 policy: Policy,
1245 handler_name: String,
1246) {
1247 let lex_req = build_request_value(&mut req);
1248 let handler = DefaultHandler::new(policy).with_program(Arc::clone(&program));
1249 let mut vm = Vm::with_handler(&program, Box::new(handler));
1250 match vm.call(&handler_name, vec![lex_req]) {
1251 Ok(resp) => {
1252 let (status, body) = unpack_response(&resp);
1253 let response = tiny_http::Response::from_string(body).with_status_code(status);
1254 let _ = req.respond(response);
1255 }
1256 Err(e) => {
1257 let response = tiny_http::Response::from_string(format!("internal error: {e}"))
1258 .with_status_code(500);
1259 let _ = req.respond(response);
1260 }
1261 }
1262}
1263
1264fn build_request_value(req: &mut tiny_http::Request) -> Value {
1265 let method = format!("{:?}", req.method()).to_uppercase();
1266 let url = req.url().to_string();
1267 let (path, query) = match url.split_once('?') {
1268 Some((p, q)) => (p.to_string(), q.to_string()),
1269 None => (url, String::new()),
1270 };
1271 let mut body = String::new();
1272 let _ = req.as_reader().read_to_string(&mut body);
1273 let mut rec = indexmap::IndexMap::new();
1274 rec.insert("method".into(), Value::Str(method));
1275 rec.insert("path".into(), Value::Str(path));
1276 rec.insert("query".into(), Value::Str(query));
1277 rec.insert("body".into(), Value::Str(body));
1278 Value::Record(rec)
1279}
1280
1281fn unpack_response(v: &Value) -> (u16, String) {
1282 if let Value::Record(rec) = v {
1283 let status = rec.get("status").and_then(|s| match s {
1284 Value::Int(n) => Some(*n as u16),
1285 _ => None,
1286 }).unwrap_or(200);
1287 let body = rec.get("body").and_then(|b| match b {
1288 Value::Str(s) => Some(s.clone()),
1289 _ => None,
1290 }).unwrap_or_default();
1291 return (status, body);
1292 }
1293 (500, format!("handler returned non-record: {v:?}"))
1294}
1295
1296fn http_request(method: &str, url: &str, body: Option<&str>) -> Value {
1302 use std::time::Duration;
1303 let agent: ureq::Agent = ureq::Agent::config_builder()
1308 .timeout_connect(Some(Duration::from_secs(10)))
1309 .timeout_recv_body(Some(Duration::from_secs(30)))
1310 .timeout_send_body(Some(Duration::from_secs(10)))
1311 .http_status_as_error(false)
1312 .build()
1313 .into();
1314 let resp = match (method, body) {
1315 ("GET", _) => agent.get(url).call(),
1316 ("POST", Some(b)) => agent.post(url).send(b),
1317 ("POST", None) => agent.post(url).send(""),
1318 (m, _) => return err_value(format!("unsupported method: {m}")),
1319 };
1320 match resp {
1321 Ok(mut r) => {
1322 let status = r.status().as_u16();
1323 let body = r.body_mut().read_to_string().unwrap_or_default();
1324 if (200..300).contains(&status) {
1325 Value::Variant { name: "Ok".into(), args: vec![Value::Str(body)] }
1326 } else {
1327 err_value(format!("status {status}: {body}"))
1328 }
1329 }
1330 Err(e) => err_value(format!("transport: {e}")),
1331 }
1332}
1333
1334fn http_agent(timeout_ms: Option<u64>) -> ureq::Agent {
1339 use std::time::Duration;
1340 let mut b = ureq::Agent::config_builder()
1341 .timeout_connect(Some(Duration::from_secs(10)))
1342 .timeout_recv_body(Some(Duration::from_secs(30)))
1343 .timeout_send_body(Some(Duration::from_secs(10)))
1344 .http_status_as_error(false);
1345 if let Some(ms) = timeout_ms {
1346 let d = Duration::from_millis(ms);
1347 b = b.timeout_global(Some(d));
1348 }
1349 b.build().into()
1350}
1351
1352fn http_error_value(e: ureq::Error) -> Value {
1356 let (ctor, payload): (&str, Option<String>) = match &e {
1357 ureq::Error::Timeout(_) => ("TimeoutError", None),
1358 ureq::Error::Tls(s) => ("TlsError", Some((*s).into())),
1359 ureq::Error::Pem(p) => ("TlsError", Some(format!("{p}"))),
1360 ureq::Error::Rustls(r) => ("TlsError", Some(format!("{r}"))),
1361 _ => ("NetworkError", Some(format!("{e}"))),
1362 };
1363 let args = match payload { Some(s) => vec![Value::Str(s)], None => vec![] };
1364 let inner = Value::Variant { name: ctor.into(), args };
1365 Value::Variant { name: "Err".into(), args: vec![inner] }
1366}
1367
1368fn http_decode_err(msg: String) -> Value {
1369 let inner = Value::Variant {
1370 name: "DecodeError".into(),
1371 args: vec![Value::Str(msg)],
1372 };
1373 Value::Variant { name: "Err".into(), args: vec![inner] }
1374}
1375
1376fn http_send_simple(
1381 method: &str,
1382 url: &str,
1383 body: Option<Vec<u8>>,
1384 content_type: &str,
1385 timeout_ms: Option<u64>,
1386) -> Value {
1387 http_send_full(method, url, body, content_type, &[], timeout_ms)
1388}
1389
1390fn http_send_full(
1391 method: &str,
1392 url: &str,
1393 body: Option<Vec<u8>>,
1394 content_type: &str,
1395 headers: &[(String, String)],
1396 timeout_ms: Option<u64>,
1397) -> Value {
1398 let agent = http_agent(timeout_ms);
1399 let resp = match method {
1400 "GET" => {
1401 let mut req = agent.get(url);
1402 if !content_type.is_empty() { req = req.header("content-type", content_type); }
1403 for (k, v) in headers { req = req.header(k.as_str(), v.as_str()); }
1404 req.call()
1405 }
1406 "POST" => {
1407 let body = body.unwrap_or_default();
1408 let mut req = agent.post(url);
1409 if !content_type.is_empty() { req = req.header("content-type", content_type); }
1410 for (k, v) in headers { req = req.header(k.as_str(), v.as_str()); }
1411 req.send(&body[..])
1412 }
1413 m => {
1414 return http_decode_err(format!("unsupported method: {m}"));
1418 }
1419 };
1420 match resp {
1421 Ok(mut r) => {
1422 let status = r.status().as_u16() as i64;
1423 let headers_map = collect_response_headers(r.headers());
1424 let body_bytes = match r.body_mut().with_config().limit(10 * 1024 * 1024).read_to_vec() {
1425 Ok(b) => b,
1426 Err(e) => return http_decode_err(format!("body read: {e}")),
1427 };
1428 let mut rec = indexmap::IndexMap::new();
1429 rec.insert("status".into(), Value::Int(status));
1430 rec.insert("headers".into(), Value::Map(headers_map));
1431 rec.insert("body".into(), Value::Bytes(body_bytes));
1432 Value::Variant { name: "Ok".into(), args: vec![Value::Record(rec)] }
1433 }
1434 Err(e) => http_error_value(e),
1435 }
1436}
1437
1438fn collect_response_headers(
1439 headers: &ureq::http::HeaderMap,
1440) -> std::collections::BTreeMap<lex_bytecode::MapKey, Value> {
1441 let mut out = std::collections::BTreeMap::new();
1442 for (name, value) in headers.iter() {
1443 let v = value.to_str().unwrap_or("").to_string();
1444 out.insert(lex_bytecode::MapKey::Str(name.as_str().to_string()), Value::Str(v));
1445 }
1446 out
1447}
1448
1449fn http_send_record(handler: &DefaultHandler, req: &indexmap::IndexMap<String, Value>) -> Value {
1453 let method = match req.get("method") {
1454 Some(Value::Str(s)) => s.clone(),
1455 _ => return http_decode_err("HttpRequest.method must be Str".into()),
1456 };
1457 let url = match req.get("url") {
1458 Some(Value::Str(s)) => s.clone(),
1459 _ => return http_decode_err("HttpRequest.url must be Str".into()),
1460 };
1461 if let Err(e) = handler.ensure_host_allowed(&url) {
1462 return http_decode_err(e);
1463 }
1464 let body = match req.get("body") {
1465 Some(Value::Variant { name, args }) if name == "None" => None,
1466 Some(Value::Variant { name, args }) if name == "Some" => match args.as_slice() {
1467 [Value::Bytes(b)] => Some(b.clone()),
1468 _ => return http_decode_err("HttpRequest.body Some payload must be Bytes".into()),
1469 },
1470 _ => return http_decode_err("HttpRequest.body must be Option[Bytes]".into()),
1471 };
1472 let timeout_ms = match req.get("timeout_ms") {
1473 Some(Value::Variant { name, .. }) if name == "None" => None,
1474 Some(Value::Variant { name, args }) if name == "Some" => match args.as_slice() {
1475 [Value::Int(n)] if *n >= 0 => Some(*n as u64),
1476 _ => return http_decode_err(
1477 "HttpRequest.timeout_ms Some payload must be a non-negative Int".into()),
1478 },
1479 _ => return http_decode_err("HttpRequest.timeout_ms must be Option[Int]".into()),
1480 };
1481 let headers: Vec<(String, String)> = match req.get("headers") {
1482 Some(Value::Map(m)) => m.iter().filter_map(|(k, v)| {
1483 let kk = match k { lex_bytecode::MapKey::Str(s) => s.clone(), _ => return None };
1484 let vv = match v { Value::Str(s) => s.clone(), _ => return None };
1485 Some((kk, vv))
1486 }).collect(),
1487 _ => return http_decode_err("HttpRequest.headers must be Map[Str, Str]".into()),
1488 };
1489 http_send_full(&method, &url, body, "", &headers, timeout_ms)
1490}
1491
1492fn expect_record(v: Option<&Value>) -> Result<&indexmap::IndexMap<String, Value>, String> {
1493 match v {
1494 Some(Value::Record(r)) => Ok(r),
1495 Some(other) => Err(format!("expected Record, got {other:?}")),
1496 None => Err("missing Record argument".into()),
1497 }
1498}
1499
1500fn err_value(msg: String) -> Value {
1501 Value::Variant { name: "Err".into(), args: vec![Value::Str(msg)] }
1502}
1503
1504fn expect_str(v: Option<&Value>) -> Result<&str, String> {
1505 match v {
1506 Some(Value::Str(s)) => Ok(s),
1507 Some(other) => Err(format!("expected Str arg, got {other:?}")),
1508 None => Err("missing argument".into()),
1509 }
1510}
1511
1512fn expect_int(v: Option<&Value>) -> Result<i64, String> {
1513 match v {
1514 Some(Value::Int(n)) => Ok(*n),
1515 Some(other) => Err(format!("expected Int arg, got {other:?}")),
1516 None => Err("missing argument".into()),
1517 }
1518}
1519
1520fn ok(v: Value) -> Value {
1521 Value::Variant { name: "Ok".into(), args: vec![v] }
1522}
1523fn err(v: Value) -> Value {
1524 Value::Variant { name: "Err".into(), args: vec![v] }
1525}
1526
1527impl DefaultHandler {
1528 fn dispatch_call_mcp(&mut self, args: Vec<Value>) -> Value {
1534 let server = match args.first() {
1535 Some(Value::Str(s)) => s.clone(),
1536 _ => return err(Value::Str(
1537 "agent.call_mcp(server, tool, args_json): server must be Str".into())),
1538 };
1539 let tool = match args.get(1) {
1540 Some(Value::Str(s)) => s.clone(),
1541 _ => return err(Value::Str(
1542 "agent.call_mcp(server, tool, args_json): tool must be Str".into())),
1543 };
1544 let args_json = match args.get(2) {
1545 Some(Value::Str(s)) => s.clone(),
1546 _ => return err(Value::Str(
1547 "agent.call_mcp(server, tool, args_json): args_json must be Str".into())),
1548 };
1549 let parsed: serde_json::Value = match serde_json::from_str(&args_json) {
1550 Ok(v) => v,
1551 Err(e) => return err(Value::Str(format!(
1552 "agent.call_mcp: args_json is not valid JSON: {e}"))),
1553 };
1554 match self.mcp_clients.call(&server, &tool, parsed) {
1555 Ok(result) => ok(Value::Str(
1556 serde_json::to_string(&result).unwrap_or_else(|_| "null".into()))),
1557 Err(e) => err(Value::Str(e)),
1558 }
1559 }
1560
1561 fn dispatch_cloud_stream(&mut self, args: Vec<Value>) -> Value {
1567 let _prompt = match args.first() {
1568 Some(Value::Str(s)) => s.clone(),
1569 _ => return err(Value::Str(
1570 "agent.cloud_stream(prompt): prompt must be Str".into())),
1571 };
1572 let chunks: Vec<String> = match std::env::var("LEX_LLM_STREAM_FIXTURE") {
1573 Ok(v) => v.split('|').map(|s| s.to_string()).collect(),
1574 Err(_) => return err(Value::Str(
1575 "agent.cloud_stream: live streaming not yet implemented; \
1576 set LEX_LLM_STREAM_FIXTURE='chunk1|chunk2|…' for tests".into())),
1577 };
1578 let handle = self.register_stream(chunks.into_iter());
1579 ok(stream_handle_value(handle))
1580 }
1581
1582 fn dispatch_stream_next(&mut self, args: Vec<Value>) -> Value {
1588 let handle = match args.first().and_then(stream_handle_id) {
1589 Some(h) => h,
1590 None => return Value::Variant { name: "None".into(), args: vec![] },
1591 };
1592 let mut streams = match self.streams.lock() {
1593 Ok(g) => g,
1594 Err(_) => return Value::Variant { name: "None".into(), args: vec![] },
1595 };
1596 match streams.get_mut(&handle).and_then(|it| it.next()) {
1597 Some(chunk) => some(Value::Str(chunk)),
1598 None => {
1599 streams.remove(&handle);
1600 Value::Variant { name: "None".into(), args: vec![] }
1601 }
1602 }
1603 }
1604
1605 fn dispatch_stream_collect(&mut self, args: Vec<Value>) -> Value {
1610 let handle = match args.first().and_then(stream_handle_id) {
1611 Some(h) => h,
1612 None => return Value::List(Vec::new()),
1613 };
1614 let mut iter = {
1615 let mut streams = match self.streams.lock() {
1616 Ok(g) => g,
1617 Err(_) => return Value::List(Vec::new()),
1618 };
1619 match streams.remove(&handle) {
1620 Some(it) => it,
1621 None => return Value::List(Vec::new()),
1622 }
1623 };
1624 let mut out: Vec<Value> = Vec::new();
1625 for chunk in iter.by_ref() {
1626 out.push(Value::Str(chunk));
1627 }
1628 Value::List(out)
1629 }
1630
1631 fn register_stream<I>(&self, iter: I) -> String
1635 where
1636 I: Iterator<Item = String> + Send + 'static,
1637 {
1638 let id = self
1639 .next_stream_id
1640 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
1641 let handle = format!("stream_{id}");
1642 if let Ok(mut streams) = self.streams.lock() {
1643 streams.insert(handle.clone(), Box::new(iter));
1644 }
1645 handle
1646 }
1647}
1648
1649fn stream_handle_value(handle: String) -> Value {
1654 Value::Variant {
1655 name: "__StreamHandle".into(),
1656 args: vec![Value::Str(handle)],
1657 }
1658}
1659
1660fn stream_handle_id(v: &Value) -> Option<String> {
1664 match v {
1665 Value::Variant { name, args } if name == "__StreamHandle" => match args.first() {
1666 Some(Value::Str(h)) => Some(h.clone()),
1667 _ => None,
1668 },
1669 _ => None,
1670 }
1671}
1672
1673fn dispatch_llm_local(args: Vec<Value>) -> Value {
1678 let prompt = match args.first() {
1679 Some(Value::Str(s)) => s.clone(),
1680 _ => return err(Value::Str(
1681 "agent.local_complete(prompt): prompt must be Str".into())),
1682 };
1683 match crate::llm::local_complete(&prompt) {
1684 Ok(text) => ok(Value::Str(text)),
1685 Err(e) => err(Value::Str(e)),
1686 }
1687}
1688
1689fn dispatch_llm_cloud(args: Vec<Value>) -> Value {
1696 let prompt = match args.first() {
1697 Some(Value::Str(s)) => s.clone(),
1698 _ => return err(Value::Str(
1699 "agent.cloud_complete(prompt): prompt must be Str".into())),
1700 };
1701 match crate::llm::cloud_complete(&prompt) {
1702 Ok(text) => ok(Value::Str(text)),
1703 Err(e) => err(Value::Str(e)),
1704 }
1705}
1706
1707fn some(v: Value) -> Value {
1708 Value::Variant { name: "Some".into(), args: vec![v] }
1709}
1710fn none() -> Value {
1711 Value::Variant { name: "None".into(), args: vec![] }
1712}
1713
1714fn expect_bytes(v: Option<&Value>) -> Result<&Vec<u8>, String> {
1715 match v {
1716 Some(Value::Bytes(b)) => Ok(b),
1717 Some(other) => Err(format!("expected Bytes arg, got {other:?}")),
1718 None => Err("missing argument".into()),
1719 }
1720}
1721
1722fn expect_kv_handle(v: Option<&Value>) -> Result<u64, String> {
1723 match v {
1724 Some(Value::Int(n)) if *n >= 0 => Ok(*n as u64),
1725 Some(other) => Err(format!("expected Kv handle (Int), got {other:?}")),
1726 None => Err("missing Kv argument".into()),
1727 }
1728}
1729
1730fn expect_sql_handle(v: Option<&Value>) -> Result<u64, String> {
1731 match v {
1732 Some(Value::Int(n)) if *n >= 0 => Ok(*n as u64),
1733 Some(other) => Err(format!("expected Db handle (Int), got {other:?}")),
1734 None => Err("missing Db argument".into()),
1735 }
1736}
1737
1738fn expect_str_list(v: Option<&Value>) -> Result<Vec<String>, String> {
1739 match v {
1740 Some(Value::List(items)) => items.iter().map(|x| match x {
1741 Value::Str(s) => Ok(s.clone()),
1742 other => Err(format!("expected List[Str] element, got {other:?}")),
1743 }).collect(),
1744 Some(other) => Err(format!("expected List[Str], got {other:?}")),
1745 None => Err("missing List[Str] argument".into()),
1746 }
1747}
1748
1749fn sql_run_query(
1755 conn: &rusqlite::Connection,
1756 stmt_str: &str,
1757 params: &[String],
1758) -> Value {
1759 let mut stmt = match conn.prepare(stmt_str) {
1760 Ok(s) => s,
1761 Err(e) => return err(Value::Str(format!("sql.query: {e}"))),
1762 };
1763 let column_count = stmt.column_count();
1764 let column_names: Vec<String> = (0..column_count)
1765 .map(|i| stmt.column_name(i).unwrap_or("").to_string())
1766 .collect();
1767 let bind: Vec<&dyn rusqlite::ToSql> = params.iter()
1768 .map(|s| s as &dyn rusqlite::ToSql)
1769 .collect();
1770 let mut rows = match stmt.query(rusqlite::params_from_iter(bind.iter())) {
1771 Ok(r) => r,
1772 Err(e) => return err(Value::Str(format!("sql.query: {e}"))),
1773 };
1774 let mut out: Vec<Value> = Vec::new();
1775 loop {
1776 let row = match rows.next() {
1777 Ok(Some(r)) => r,
1778 Ok(None) => break,
1779 Err(e) => return err(Value::Str(format!("sql.query: {e}"))),
1780 };
1781 let mut rec = indexmap::IndexMap::new();
1782 for (i, name) in column_names.iter().enumerate() {
1783 let cell = match row.get_ref(i) {
1784 Ok(c) => sql_value_ref_to_lex(c),
1785 Err(e) => return err(Value::Str(format!("sql.query: column {i}: {e}"))),
1786 };
1787 rec.insert(name.clone(), cell);
1788 }
1789 out.push(Value::Record(rec));
1790 }
1791 ok(Value::List(out))
1792}
1793
1794fn sql_value_ref_to_lex(v: rusqlite::types::ValueRef<'_>) -> Value {
1795 use rusqlite::types::ValueRef;
1796 match v {
1797 ValueRef::Null => Value::Unit,
1798 ValueRef::Integer(n) => Value::Int(n),
1799 ValueRef::Real(f) => Value::Float(f),
1800 ValueRef::Text(s) => Value::Str(String::from_utf8_lossy(s).into_owned()),
1801 ValueRef::Blob(b) => Value::Bytes(b.to_vec()),
1802 }
1803}
1804
1805#[derive(Clone, Copy, PartialEq, PartialOrd)]
1808enum LogLevel { Debug, Info, Warn, Error }
1809
1810#[derive(Clone, Copy, PartialEq)]
1811enum LogFormat { Text, Json }
1812
1813#[derive(Clone)]
1814enum LogSink {
1815 Stderr,
1816 File(std::sync::Arc<Mutex<std::fs::File>>),
1817}
1818
1819struct LogState {
1820 level: LogLevel,
1821 format: LogFormat,
1822 sink: LogSink,
1823}
1824
1825fn log_state() -> &'static Mutex<LogState> {
1826 static STATE: OnceLock<Mutex<LogState>> = OnceLock::new();
1827 STATE.get_or_init(|| Mutex::new(LogState {
1828 level: LogLevel::Info,
1829 format: LogFormat::Text,
1830 sink: LogSink::Stderr,
1831 }))
1832}
1833
1834fn parse_log_level(s: &str) -> Option<LogLevel> {
1835 match s {
1836 "debug" => Some(LogLevel::Debug),
1837 "info" => Some(LogLevel::Info),
1838 "warn" => Some(LogLevel::Warn),
1839 "error" => Some(LogLevel::Error),
1840 _ => None,
1841 }
1842}
1843
1844fn level_label(l: LogLevel) -> &'static str {
1845 match l {
1846 LogLevel::Debug => "debug",
1847 LogLevel::Info => "info",
1848 LogLevel::Warn => "warn",
1849 LogLevel::Error => "error",
1850 }
1851}
1852
1853fn emit_log(level: LogLevel, msg: &str) {
1854 let state = log_state().lock().unwrap();
1855 if level < state.level {
1856 return;
1857 }
1858 let ts = chrono::Utc::now().to_rfc3339();
1859 let line = match state.format {
1860 LogFormat::Text => format!("[{}] {}: {}\n", ts, level_label(level), msg),
1861 LogFormat::Json => {
1862 let escaped = msg
1866 .replace('\\', "\\\\")
1867 .replace('"', "\\\"")
1868 .replace('\n', "\\n")
1869 .replace('\r', "\\r");
1870 format!(
1871 "{{\"ts\":\"{ts}\",\"level\":\"{}\",\"msg\":\"{escaped}\"}}\n",
1872 level_label(level),
1873 )
1874 }
1875 };
1876 let sink = state.sink.clone();
1877 drop(state);
1878 match sink {
1879 LogSink::Stderr => {
1880 use std::io::Write;
1881 let _ = std::io::stderr().write_all(line.as_bytes());
1882 }
1883 LogSink::File(f) => {
1884 use std::io::Write;
1885 if let Ok(mut g) = f.lock() {
1886 let _ = g.write_all(line.as_bytes());
1887 }
1888 }
1889 }
1890}
1891
1892pub(crate) struct ProcessState {
1893 child: std::process::Child,
1894 stdout: Option<std::io::BufReader<std::process::ChildStdout>>,
1895 stderr: Option<std::io::BufReader<std::process::ChildStderr>>,
1896}
1897
1898fn process_registry() -> &'static Mutex<ProcessRegistry> {
1912 static REGISTRY: OnceLock<Mutex<ProcessRegistry>> = OnceLock::new();
1913 REGISTRY.get_or_init(|| Mutex::new(ProcessRegistry::with_capacity(MAX_PROCESS_HANDLES)))
1914}
1915
1916const MAX_PROCESS_HANDLES: usize = 256;
1917
1918type SharedProcessState = Arc<Mutex<ProcessState>>;
1919
1920pub(crate) struct ProcessRegistry {
1921 entries: indexmap::IndexMap<u64, SharedProcessState>,
1922 cap: usize,
1923}
1924
1925impl ProcessRegistry {
1926 pub(crate) fn with_capacity(cap: usize) -> Self {
1927 Self { entries: indexmap::IndexMap::new(), cap }
1928 }
1929
1930 pub(crate) fn insert(&mut self, handle: u64, state: ProcessState) {
1934 if self.entries.len() >= self.cap {
1935 self.entries.shift_remove_index(0);
1936 }
1937 self.entries.insert(handle, Arc::new(Mutex::new(state)));
1938 }
1939
1940 pub(crate) fn touch_get(&mut self, handle: u64) -> Option<SharedProcessState> {
1944 let idx = self.entries.get_index_of(&handle)?;
1945 self.entries.move_index(idx, self.entries.len() - 1);
1946 self.entries.get(&handle).cloned()
1947 }
1948
1949 pub(crate) fn remove(&mut self, handle: u64) {
1954 self.entries.shift_remove(&handle);
1955 }
1956
1957 #[cfg(test)]
1958 pub(crate) fn len(&self) -> usize { self.entries.len() }
1959}
1960
1961fn next_process_handle() -> u64 {
1962 static COUNTER: AtomicU64 = AtomicU64::new(1);
1963 COUNTER.fetch_add(1, Ordering::SeqCst)
1964}
1965
1966#[cfg(all(test, unix))]
1967mod process_registry_tests {
1968 use super::{ProcessRegistry, ProcessState};
1969
1970 fn fresh_state() -> ProcessState {
1974 let child = std::process::Command::new("true")
1975 .stdout(std::process::Stdio::null())
1976 .stderr(std::process::Stdio::null())
1977 .spawn()
1978 .expect("spawn `true`");
1979 ProcessState { child, stdout: None, stderr: None }
1980 }
1981
1982 #[test]
1983 fn insert_and_get_round_trip() {
1984 let mut r = ProcessRegistry::with_capacity(4);
1985 r.insert(1, fresh_state());
1986 assert!(r.touch_get(1).is_some());
1987 assert!(r.touch_get(2).is_none());
1988 }
1989
1990 #[test]
1991 fn touch_get_returns_distinct_arcs_for_distinct_handles() {
1992 let mut r = ProcessRegistry::with_capacity(4);
1993 r.insert(1, fresh_state());
1994 r.insert(2, fresh_state());
1995 let a = r.touch_get(1).unwrap();
1996 let b = r.touch_get(2).unwrap();
1997 assert!(!std::sync::Arc::ptr_eq(&a, &b));
1999 }
2000
2001 #[test]
2002 fn cap_evicts_lru_on_overflow() {
2003 let mut r = ProcessRegistry::with_capacity(2);
2004 r.insert(1, fresh_state());
2005 r.insert(2, fresh_state());
2006 let _ = r.touch_get(1);
2007 r.insert(3, fresh_state());
2008 assert!(r.touch_get(1).is_some(), "1 was MRU, should survive");
2009 assert!(r.touch_get(2).is_none(), "2 was LRU, should be evicted");
2010 assert!(r.touch_get(3).is_some(), "3 just inserted, should survive");
2011 assert_eq!(r.len(), 2);
2012 }
2013
2014 #[test]
2015 fn cap_with_no_touches_evicts_in_insertion_order() {
2016 let mut r = ProcessRegistry::with_capacity(2);
2017 r.insert(10, fresh_state());
2018 r.insert(20, fresh_state());
2019 r.insert(30, fresh_state());
2020 assert!(r.touch_get(10).is_none());
2021 assert!(r.touch_get(20).is_some());
2022 assert!(r.touch_get(30).is_some());
2023 }
2024
2025 #[test]
2026 fn remove_drops_entry() {
2027 let mut r = ProcessRegistry::with_capacity(4);
2028 r.insert(1, fresh_state());
2029 r.remove(1);
2030 assert!(r.touch_get(1).is_none());
2031 assert_eq!(r.len(), 0);
2032 }
2033
2034 #[test]
2035 fn many_inserts_stay_bounded_at_cap() {
2036 let cap = 8;
2037 let mut r = ProcessRegistry::with_capacity(cap);
2038 for i in 0..(cap as u64 * 3) {
2039 r.insert(i, fresh_state());
2040 assert!(r.len() <= cap);
2041 }
2042 assert_eq!(r.len(), cap);
2043 }
2044
2045 #[test]
2046 fn outstanding_arc_outlives_remove() {
2047 let mut r = ProcessRegistry::with_capacity(4);
2051 r.insert(1, fresh_state());
2052 let arc = r.touch_get(1).expect("entry exists");
2053 r.remove(1);
2054 assert!(r.touch_get(1).is_none());
2056 let _state = arc.lock().unwrap();
2057 }
2058}
2059
2060fn expect_process_handle(v: Option<&Value>) -> Result<u64, String> {
2061 match v {
2062 Some(Value::Int(n)) if *n >= 0 => Ok(*n as u64),
2063 Some(other) => Err(format!("expected ProcessHandle (Int), got {other:?}")),
2064 None => Err("missing ProcessHandle argument".into()),
2065 }
2066}
2067
2068fn kv_registry() -> &'static Mutex<KvRegistry> {
2080 static REGISTRY: OnceLock<Mutex<KvRegistry>> = OnceLock::new();
2081 REGISTRY.get_or_init(|| Mutex::new(KvRegistry::with_capacity(MAX_KV_HANDLES)))
2082}
2083
2084const MAX_KV_HANDLES: usize = 256;
2090
2091pub(crate) struct KvRegistry {
2096 entries: indexmap::IndexMap<u64, sled::Db>,
2097 cap: usize,
2098}
2099
2100impl KvRegistry {
2101 pub(crate) fn with_capacity(cap: usize) -> Self {
2102 Self { entries: indexmap::IndexMap::new(), cap }
2103 }
2104
2105 pub(crate) fn insert(&mut self, handle: u64, db: sled::Db) {
2108 if self.entries.len() >= self.cap {
2109 self.entries.shift_remove_index(0);
2110 }
2111 self.entries.insert(handle, db);
2112 }
2113
2114 pub(crate) fn touch_get(&mut self, handle: u64) -> Option<&sled::Db> {
2116 let idx = self.entries.get_index_of(&handle)?;
2117 self.entries.move_index(idx, self.entries.len() - 1);
2118 self.entries.get(&handle)
2119 }
2120
2121 pub(crate) fn remove(&mut self, handle: u64) {
2123 self.entries.shift_remove(&handle);
2124 }
2125
2126 #[cfg(test)]
2127 pub(crate) fn len(&self) -> usize { self.entries.len() }
2128}
2129
2130fn next_kv_handle() -> u64 {
2131 static COUNTER: AtomicU64 = AtomicU64::new(1);
2132 COUNTER.fetch_add(1, Ordering::SeqCst)
2133}
2134
2135fn sql_registry() -> &'static Mutex<SqlRegistry> {
2142 static REGISTRY: OnceLock<Mutex<SqlRegistry>> = OnceLock::new();
2143 REGISTRY.get_or_init(|| Mutex::new(SqlRegistry::with_capacity(MAX_SQL_HANDLES)))
2144}
2145
2146const MAX_SQL_HANDLES: usize = 256;
2147
2148type SharedConn = Arc<Mutex<rusqlite::Connection>>;
2149
2150pub(crate) struct SqlRegistry {
2151 entries: indexmap::IndexMap<u64, SharedConn>,
2152 cap: usize,
2153}
2154
2155impl SqlRegistry {
2156 pub(crate) fn with_capacity(cap: usize) -> Self {
2157 Self { entries: indexmap::IndexMap::new(), cap }
2158 }
2159
2160 pub(crate) fn insert(&mut self, handle: u64, conn: rusqlite::Connection) {
2161 if self.entries.len() >= self.cap {
2162 self.entries.shift_remove_index(0);
2163 }
2164 self.entries.insert(handle, Arc::new(Mutex::new(conn)));
2165 }
2166
2167 pub(crate) fn touch_get(&mut self, handle: u64) -> Option<SharedConn> {
2171 let idx = self.entries.get_index_of(&handle)?;
2172 self.entries.move_index(idx, self.entries.len() - 1);
2173 self.entries.get(&handle).cloned()
2174 }
2175
2176 pub(crate) fn remove(&mut self, handle: u64) {
2177 self.entries.shift_remove(&handle);
2178 }
2179
2180 #[cfg(test)]
2181 pub(crate) fn len(&self) -> usize { self.entries.len() }
2182}
2183
2184fn next_sql_handle() -> u64 {
2185 static COUNTER: AtomicU64 = AtomicU64::new(1);
2186 COUNTER.fetch_add(1, Ordering::SeqCst)
2187}
2188
2189#[cfg(test)]
2190mod sql_registry_tests {
2191 use super::SqlRegistry;
2192
2193 fn fresh() -> rusqlite::Connection {
2194 rusqlite::Connection::open_in_memory().expect("open in-memory sqlite")
2195 }
2196
2197 #[test]
2198 fn insert_and_get_round_trip() {
2199 let mut r = SqlRegistry::with_capacity(4);
2200 r.insert(1, fresh());
2201 assert!(r.touch_get(1).is_some());
2202 assert!(r.touch_get(2).is_none());
2203 }
2204
2205 #[test]
2206 fn cap_evicts_lru_on_overflow() {
2207 let mut r = SqlRegistry::with_capacity(2);
2208 r.insert(1, fresh());
2209 r.insert(2, fresh());
2210 let _ = r.touch_get(1);
2211 r.insert(3, fresh());
2212 assert!(r.touch_get(1).is_some(), "1 was MRU, should survive");
2213 assert!(r.touch_get(2).is_none(), "2 was LRU, should be evicted");
2214 assert!(r.touch_get(3).is_some(), "3 just inserted");
2215 assert_eq!(r.len(), 2);
2216 }
2217
2218 #[test]
2219 fn remove_drops_entry() {
2220 let mut r = SqlRegistry::with_capacity(4);
2221 r.insert(1, fresh());
2222 r.remove(1);
2223 assert!(r.touch_get(1).is_none());
2224 assert_eq!(r.len(), 0);
2225 }
2226
2227 #[test]
2228 fn many_inserts_stay_bounded_at_cap() {
2229 let cap = 8;
2230 let mut r = SqlRegistry::with_capacity(cap);
2231 for i in 0..(cap as u64 * 3) {
2232 r.insert(i, fresh());
2233 assert!(r.len() <= cap);
2234 }
2235 assert_eq!(r.len(), cap);
2236 }
2237}
2238
2239#[cfg(test)]
2240mod kv_registry_tests {
2241 use super::KvRegistry;
2242
2243 fn fresh_db(tag: &str) -> sled::Db {
2246 let dir = std::env::temp_dir().join(format!(
2247 "lex-kv-reg-{}-{}-{}",
2248 std::process::id(),
2249 tag,
2250 std::time::SystemTime::now()
2251 .duration_since(std::time::UNIX_EPOCH)
2252 .unwrap()
2253 .as_nanos()
2254 ));
2255 sled::open(&dir).expect("sled open")
2256 }
2257
2258 #[test]
2259 fn insert_and_get_round_trip() {
2260 let mut r = KvRegistry::with_capacity(4);
2261 r.insert(1, fresh_db("a"));
2262 assert!(r.touch_get(1).is_some());
2263 assert!(r.touch_get(2).is_none());
2264 }
2265
2266 #[test]
2267 fn cap_evicts_lru_on_overflow() {
2268 let mut r = KvRegistry::with_capacity(2);
2270 r.insert(1, fresh_db("c1"));
2271 r.insert(2, fresh_db("c2"));
2272 let _ = r.touch_get(1);
2273 r.insert(3, fresh_db("c3"));
2274 assert!(r.touch_get(1).is_some(), "1 was MRU, should survive");
2275 assert!(r.touch_get(2).is_none(), "2 was LRU, should be evicted");
2276 assert!(r.touch_get(3).is_some(), "3 just inserted, should survive");
2277 assert_eq!(r.len(), 2);
2278 }
2279
2280 #[test]
2281 fn cap_with_no_touches_evicts_in_insertion_order() {
2282 let mut r = KvRegistry::with_capacity(2);
2284 r.insert(10, fresh_db("f1"));
2285 r.insert(20, fresh_db("f2"));
2286 r.insert(30, fresh_db("f3"));
2287 assert!(r.touch_get(10).is_none());
2288 assert!(r.touch_get(20).is_some());
2289 assert!(r.touch_get(30).is_some());
2290 }
2291
2292 #[test]
2293 fn remove_drops_entry() {
2294 let mut r = KvRegistry::with_capacity(4);
2295 r.insert(1, fresh_db("r1"));
2296 r.remove(1);
2297 assert!(r.touch_get(1).is_none());
2298 assert_eq!(r.len(), 0);
2299 }
2300
2301 #[test]
2302 fn remove_unknown_handle_is_noop() {
2303 let mut r = KvRegistry::with_capacity(4);
2304 r.insert(1, fresh_db("u1"));
2305 r.remove(999);
2306 assert!(r.touch_get(1).is_some());
2307 }
2308
2309 #[test]
2310 fn many_inserts_stay_bounded_at_cap() {
2311 let cap = 8;
2314 let mut r = KvRegistry::with_capacity(cap);
2315 for i in 0..(cap as u64 * 3) {
2316 r.insert(i, fresh_db(&format!("b{i}")));
2317 assert!(r.len() <= cap);
2318 }
2319 assert_eq!(r.len(), cap);
2320 }
2321}