Skip to main content

lex_bytecode/
vm.rs

1//! M5: bytecode VM. Stack machine with effect dispatch through a host handler.
2
3use crate::op::*;
4use crate::program::*;
5use crate::value::{ActorCell, Value};
6use std::sync::{Arc, Mutex};
7use indexmap::IndexMap;
8use std::collections::VecDeque;
9
10#[derive(Debug, Clone, thiserror::Error)]
11pub enum VmError {
12    #[error("runtime panic: {0}")]
13    Panic(String),
14    #[error("type mismatch at runtime: {0}")]
15    TypeMismatch(String),
16    #[error("stack underflow")]
17    StackUnderflow,
18    #[error("unknown function: {0}")]
19    UnknownFunction(String),
20    #[error("effect handler error: {0}")]
21    Effect(String),
22    #[error("call stack overflow: recursion depth exceeded ({0})")]
23    CallStackOverflow(u32),
24    /// Refinement predicate failed at a call boundary (#209 slice 3).
25    /// Surfaced when a function declares `param :: Type{x | predicate}`,
26    /// the call-site arg couldn't be discharged statically (slice 2),
27    /// and the runtime evaluator finds the predicate is `false` for
28    /// the actual argument value. The `verdict` mirrors the shape of
29    /// `gate.verdict`-style records in `lex-trace`.
30    #[error("refinement violated: argument {param_index} of `{fn_name}` (binding `{binding}`): {reason}")]
31    RefinementFailed {
32        fn_name: String,
33        param_index: usize,
34        binding: String,
35        reason: String,
36    },
37}
38
39/// Maximum simultaneous call frames. Defends against unbounded
40/// recursion in agent-emitted code: a body that calls itself
41/// without a base case would otherwise blow the host's native
42/// stack and crash the process. Real Lex code rarely exceeds
43/// ~30 frames; 1024 is generous headroom while still well under
44/// the OS stack limit at any per-frame size we use.
45pub const MAX_CALL_DEPTH: u32 = 1024;
46
47/// Host-side effect dispatch. Implementors decide what `kind`/`op` mean
48/// and how arguments map to side effects.
49pub trait EffectHandler {
50    fn dispatch(&mut self, kind: &str, op: &str, args: Vec<Value>) -> Result<Value, String>;
51
52    /// Hook called by the VM at every function call so handlers can
53    /// enforce per-call budget consumption (#225). The argument is
54    /// the sum of `[budget(N)]` declared on the callee's signature;
55    /// the handler returns `Err` to refuse the call (the VM converts
56    /// to `VmError::Effect`). Default impl is a no-op so legacy
57    /// handlers and pure-only runs are unaffected.
58    fn note_call_budget(&mut self, _budget_cost: u64) -> Result<(), String> {
59        Ok(())
60    }
61
62    /// `list.par_map` worker-handler factory (#305 slice 2).
63    ///
64    /// Each parallel worker thread runs its own `Vm` and therefore
65    /// needs its own effect handler. The parent handler may opt in
66    /// to per-worker dispatch by returning `Some(handler)` here;
67    /// returning `None` (the default) keeps slice-1 behavior: the
68    /// worker runs `DenyAllEffects` and any effect call inside the
69    /// closure fails with `VmError::Effect`.
70    ///
71    /// The returned handler must be `Send` so the worker can take
72    /// ownership across a thread boundary. Shared state (budget
73    /// pool, chat registry, etc.) is wired up by the implementer.
74    /// Per-worker independence (MCP client cache, output sink)
75    /// is intentional — the alternative is mutex-serialization of
76    /// the whole effect dispatch, which would defeat the parallelism.
77    fn spawn_for_worker(&self) -> Option<Box<dyn EffectHandler + Send>> {
78        None
79    }
80}
81
82/// `Vm` exposes itself as a `ClosureCaller` so the parser interpreter
83/// can invoke user-supplied closures during a `parser.run` walk
84/// (#221). The Vm is reentrant for closure invocation: pushing a new
85/// frame onto an active call stack is supported, and the handler
86/// stays in place so any effects the closure body fires dispatch
87/// normally.
88impl<'a> crate::parser_runtime::ClosureCaller for Vm<'a> {
89    fn call_closure(&mut self, closure: Value, args: Vec<Value>) -> Result<Value, String> {
90        self.invoke_closure_value(closure, args)
91            .map_err(|e| format!("{e:?}"))
92    }
93}
94
95/// A handler that fails any effect call. Useful as a default for pure-only runs.
96pub struct DenyAllEffects;
97impl EffectHandler for DenyAllEffects {
98    fn dispatch(&mut self, kind: &str, op: &str, _args: Vec<Value>) -> Result<Value, String> {
99        Err(format!("effects not permitted (attempted {kind}.{op})"))
100    }
101}
102
103/// Trace receiver. Implementors record the call/effect tree and may
104/// substitute effect responses (for replay).
105pub trait Tracer {
106    fn enter_call(&mut self, node_id: &str, name: &str, args: &[Value]);
107    fn enter_effect(&mut self, node_id: &str, kind: &str, op: &str, args: &[Value]);
108    fn exit_ok(&mut self, value: &Value);
109    fn exit_err(&mut self, message: &str);
110    /// Tail-call optimization: pop the current frame's open call without
111    /// re-entering the parent (the new call takes its place).
112    fn exit_call_tail(&mut self);
113    /// During replay, return Some(v) to substitute an effect's output.
114    fn override_effect(&mut self, _node_id: &str) -> Option<Value> { None }
115}
116
117/// No-op tracer for normal execution.
118pub struct NullTracer;
119impl Tracer for NullTracer {
120    fn enter_call(&mut self, _: &str, _: &str, _: &[Value]) {}
121    fn enter_effect(&mut self, _: &str, _: &str, _: &str, _: &[Value]) {}
122    fn exit_ok(&mut self, _: &Value) {}
123    fn exit_err(&mut self, _: &str) {}
124    fn exit_call_tail(&mut self) {}
125}
126
127#[derive(Debug, Clone)]
128pub(crate) enum FrameKind {
129    /// Top-level entry frame; doesn't correspond to a Call opcode.
130    Entry,
131    /// Frame opened by Call/TailCall. The `String` is the originating
132    /// `NodeId`; useful for diagnostics even if currently unread.
133    Call(#[allow(dead_code)] String),
134}
135
136pub struct Vm<'a> {
137    program: &'a Program,
138    handler: Box<dyn EffectHandler + 'a>,
139    pub(crate) tracer: Box<dyn Tracer + 'a>,
140    /// Per-call frames. Each frame has its own locals array and pc.
141    frames: Vec<Frame>,
142    stack: Vec<Value>,
143    /// Soft cap to avoid runaway computations in tests.
144    pub step_limit: u64,
145    pub steps: u64,
146    /// Per-Vm memoization cache for pure functions (#229). Keyed by
147    /// `(fn_id, sha256(canonical_json(args))[..16])`. Effectful
148    /// functions never enter this map. The cache lives for the
149    /// lifetime of one `Vm::call` chain — calling `Vm::with_handler`
150    /// again starts a fresh cache.
151    pure_memo: std::collections::HashMap<(u32, [u8; 16]), Value>,
152    /// Diagnostic counters for `--trace` observability (#229).
153    pub pure_memo_hits: u64,
154    pub pure_memo_misses: u64,
155    /// Monomorphic inline caches for `Op::GetField` (#389 slice 2).
156    /// Key: `(fn_id as u64) << 32 | pc as u64` — one entry per
157    /// call site. Value: the IndexMap offset of the field at that
158    /// site. On a hit we verify the field name at the cached offset
159    /// in O(1) and skip the hash-table lookup entirely; on a miss we
160    /// fall back to `IndexMap::get_full` and populate the cache.
161    field_ics: std::collections::HashMap<u64, usize>,
162    /// Stack allocator for function locals (#389 slice 3).
163    ///
164    /// Every function frame claims `locals_count` contiguous slots from
165    /// this Vec on push and releases them on pop.  Because Lex uses
166    /// strictly LIFO frame semantics the most-recently-pushed frame's
167    /// slots always sit at the top of the Vec, so `truncate` is the
168    /// correct (and O(1)) release operation.
169    ///
170    /// The Vec is pre-allocated once at VM construction and then grows
171    /// only if the actual call depth × locals width exceeds the initial
172    /// capacity.  After a top-level `vm.call` returns the Vec is empty
173    /// again but its capacity is retained, so the next request incurs
174    /// zero allocations for locals up to the high-water mark.
175    locals_storage: Vec<Value>,
176}
177
178struct Frame {
179    fn_id: u32,
180    pc: usize,
181    /// Start index of this frame's locals in `Vm::locals_storage` (#389
182    /// slice 3). The frame owns `locals_storage[locals_start..locals_start
183    /// + locals_len]`; `Op::Return` truncates the Vec back to
184    /// `locals_start`, releasing the slots in O(1).
185    locals_start: usize,
186    locals_len: usize,
187    /// Stack base when this frame started (for cleanup on return).
188    stack_base: usize,
189    trace_kind: FrameKind,
190    /// Pure-fn memo key (#229). `Some(key)` if the call was eligible
191    /// for memoization and missed the cache; on Op::Return the key
192    /// is used to write the return value back into the cache.
193    /// `None` means "don't memoize" — either the function isn't pure,
194    /// the call wasn't through Op::Call, or memoization is disabled.
195    memo_key: Option<(u32, [u8; 16])>,
196}
197
198/// Sum of `[budget(N)]` declarations on a function's signature
199/// (#225). Used by Op::Call / Op::TailCall / Op::CallClosure to
200/// notify the EffectHandler of per-call budget cost so the handler
201/// can deduct from a shared pool and refuse calls that would
202/// exceed the policy ceiling. Negative `Int` args are ignored —
203/// the static check (`policy::check_program`) treats budgets as
204/// non-negative.
205fn call_budget_cost(f: &crate::program::Function) -> u64 {
206    let mut total: u64 = 0;
207    for e in &f.effects {
208        if e.kind == "budget" {
209            if let Some(crate::program::EffectArg::Int(n)) = &e.arg {
210                if *n >= 0 {
211                    total = total.saturating_add(*n as u64);
212                }
213            }
214        }
215    }
216    total
217}
218
219/// Hash the argument list for a pure-fn memoization lookup (#229).
220/// Routes through the canonical-JSON path so two semantically-equal
221/// argument lists produce the same hash regardless of how the
222/// containing `Value`s were assembled (Records use IndexMap so field
223/// order is already stable, but canon_json gives the same property
224/// for the inner serde_json shape).
225fn hash_call_args(args: &[Value]) -> [u8; 16] {
226    use sha2::{Digest, Sha256};
227    let json = serde_json::Value::Array(args.iter().map(Value::to_json).collect());
228    let canonical = lex_ast::canon_json::hash_canonical(&json);
229    let mut hasher = Sha256::new();
230    hasher.update(canonical);
231    let full = hasher.finalize();
232    let mut out = [0u8; 16];
233    out.copy_from_slice(&full[..16]);
234    out
235}
236
237/// Evaluate a refinement predicate at runtime against the actual
238/// argument value (#209 slice 3). Mirrors `lex_types::discharge`'s
239/// static evaluator but operates on `Value` directly.
240///
241/// Returns `Ok(true)` / `Ok(false)` for a clean boolean verdict, or
242/// `Err(reason)` if the predicate references something the runtime
243/// can't resolve (free variable beyond the binding, unsupported AST
244/// node). Callers map `Ok(false)` and `Err` to `VmError::RefinementFailed`.
245fn eval_refinement(
246    predicate: &lex_ast::CExpr,
247    binding: &str,
248    arg: &Value,
249) -> Result<bool, String> {
250    match eval_refinement_inner(predicate, binding, arg) {
251        Ok(Value::Bool(b)) => Ok(b),
252        Ok(other) => Err(format!("predicate didn't reduce to a Bool, got {other:?}")),
253        Err(e) => Err(e),
254    }
255}
256
257fn eval_refinement_inner(
258    e: &lex_ast::CExpr,
259    binding: &str,
260    arg: &Value,
261) -> Result<Value, String> {
262    use lex_ast::{CExpr, CLit};
263    match e {
264        CExpr::Literal { value } => Ok(match value {
265            CLit::Int { value } => Value::Int(*value),
266            CLit::Float { value } => Value::Float(value.parse().unwrap_or(0.0)),
267            CLit::Bool { value } => Value::Bool(*value),
268            CLit::Str { value } => Value::Str(value.as_str().into()),
269            CLit::Bytes { value } => Value::Str(value.as_str().into()), // hex; unusual in predicates
270            CLit::Unit => Value::Unit,
271        }),
272        CExpr::Var { name } if name == binding => Ok(arg.clone()),
273        CExpr::Var { name } => Err(format!(
274            "predicate references free var `{name}`; runtime check \
275             only resolves the binding (slice 4 will plumb call-site \
276             context)")),
277        CExpr::UnaryOp { op, expr } => {
278            let v = eval_refinement_inner(expr, binding, arg)?;
279            match (op.as_str(), v) {
280                ("not", Value::Bool(b)) => Ok(Value::Bool(!b)),
281                ("-", Value::Int(n)) => Ok(Value::Int(-n)),
282                ("-", Value::Float(n)) => Ok(Value::Float(-n)),
283                (o, v) => Err(format!("unsupported unary `{o}` on {v:?}")),
284            }
285        }
286        CExpr::BinOp { op, lhs, rhs } => {
287            // Short-circuit `and` / `or` for the same reasons as the
288            // static evaluator.
289            if op == "and" || op == "or" {
290                let l = eval_refinement_inner(lhs, binding, arg)?;
291                let lb = match l {
292                    Value::Bool(b) => b,
293                    other => return Err(format!("`{op}` on non-bool: {other:?}")),
294                };
295                if op == "and" && !lb { return Ok(Value::Bool(false)); }
296                if op == "or"  &&  lb { return Ok(Value::Bool(true));  }
297                let r = eval_refinement_inner(rhs, binding, arg)?;
298                return match r {
299                    Value::Bool(b) => Ok(Value::Bool(b)),
300                    other => Err(format!("`{op}` on non-bool: {other:?}")),
301                };
302            }
303            let l = eval_refinement_inner(lhs, binding, arg)?;
304            let r = eval_refinement_inner(rhs, binding, arg)?;
305            apply_refinement_binop(op, &l, &r)
306        }
307        // Other AST forms (Call, Let, Match, FieldAccess, Lambda,
308        // Block, Constructors, Records, Tuples, Lists, Return) need
309        // a more general evaluator that can call back into the VM.
310        // Out of scope for slice 3; a future slice may unify this
311        // with the spec-checker's gate evaluator.
312        other => Err(format!("unsupported predicate node: {other:?}")),
313    }
314}
315
316fn apply_refinement_binop(op: &str, l: &Value, r: &Value) -> Result<Value, String> {
317    use Value::*;
318    match (op, l, r) {
319        ("+", Int(a), Int(b)) => Ok(Int(a + b)),
320        ("-", Int(a), Int(b)) => Ok(Int(a - b)),
321        ("*", Int(a), Int(b)) => Ok(Int(a * b)),
322        ("/", Int(a), Int(b)) if *b != 0 => Ok(Int(a / b)),
323        ("%", Int(a), Int(b)) if *b != 0 => Ok(Int(a % b)),
324        ("+", Float(a), Float(b)) => Ok(Float(a + b)),
325        ("-", Float(a), Float(b)) => Ok(Float(a - b)),
326        ("*", Float(a), Float(b)) => Ok(Float(a * b)),
327        ("/", Float(a), Float(b)) => Ok(Float(a / b)),
328
329        ("==", a, b) => Ok(Bool(a == b)),
330        ("!=", a, b) => Ok(Bool(a != b)),
331
332        ("<",  Int(a), Int(b)) => Ok(Bool(a < b)),
333        ("<=", Int(a), Int(b)) => Ok(Bool(a <= b)),
334        (">",  Int(a), Int(b)) => Ok(Bool(a > b)),
335        (">=", Int(a), Int(b)) => Ok(Bool(a >= b)),
336
337        ("<",  Float(a), Float(b)) => Ok(Bool(a < b)),
338        ("<=", Float(a), Float(b)) => Ok(Bool(a <= b)),
339        (">",  Float(a), Float(b)) => Ok(Bool(a > b)),
340        (">=", Float(a), Float(b)) => Ok(Bool(a >= b)),
341
342        (op, a, b) => Err(format!(
343            "unsupported binop `{op}` on {a:?} and {b:?}")),
344    }
345}
346
347fn const_str(constants: &[Const], idx: u32) -> String {
348    match constants.get(idx as usize) {
349        Some(Const::NodeId(s)) | Some(Const::Str(s)) => s.clone(),
350        _ => String::new(),
351    }
352}
353
354/// Read `LEX_PAR_MAX_CONCURRENCY` (default = available CPU cores,
355/// fallback 4). Capped at 64 so a malformed env var can't spawn an
356/// unreasonable number of OS threads.
357/// Order-defining comparator for `list.sort_by` keys (#338).
358/// Same-typed Int / Float / Str pairs compare via their native
359/// `Ord` / `PartialOrd`. Mixed-type or other key shapes compare
360/// as Equal; combined with `Vec::sort_by`'s stability that
361/// preserves the original element order — best-effort fallback
362/// that never panics.
363fn compare_sort_keys(a: &Value, b: &Value) -> std::cmp::Ordering {
364    use std::cmp::Ordering;
365    match (a, b) {
366        (Value::Int(x), Value::Int(y)) => x.cmp(y),
367        (Value::Float(x), Value::Float(y)) => x.partial_cmp(y).unwrap_or(Ordering::Equal),
368        (Value::Str(x), Value::Str(y)) => x.cmp(y),
369        _ => Ordering::Equal,
370    }
371}
372
373fn par_max_concurrency() -> usize {
374    let from_env = std::env::var("LEX_PAR_MAX_CONCURRENCY")
375        .ok()
376        .and_then(|s| s.parse::<usize>().ok())
377        .filter(|n| *n > 0);
378    let default = std::thread::available_parallelism()
379        .map(|n| n.get())
380        .unwrap_or(4);
381    from_env.unwrap_or(default).min(64)
382}
383
384/// `list.par_map`'s runtime: spawn OS threads (capped by
385/// `LEX_PAR_MAX_CONCURRENCY`), apply `closure` to each item, return
386/// results in input order. Each worker runs a fresh `Vm` with
387/// [`DenyAllEffects`] for #305 slice 1 — effectful closures fail
388/// with `VmError::Effect`. Slice 2 will plumb a per-thread effect
389/// handler split.
390fn par_map_run<'a>(
391    program: &'a Program,
392    closure: Value,
393    items: Vec<Value>,
394    worker_handlers: Vec<Box<dyn EffectHandler + Send>>,
395) -> Result<Vec<Value>, VmError> {
396    if items.is_empty() {
397        return Ok(Vec::new());
398    }
399    let n_workers = worker_handlers.len().min(items.len()).max(1);
400    // Carve items into `n_workers` round-robin buckets so each
401    // worker processes (indices, items) pairs and we can reassemble
402    // in input order.
403    let mut buckets: Vec<Vec<(usize, Value)>> = (0..n_workers).map(|_| Vec::new()).collect();
404    for (i, v) in items.into_iter().enumerate() {
405        buckets[i % n_workers].push((i, v));
406    }
407    let n_total: usize = buckets.iter().map(|b| b.len()).sum();
408    let results: std::sync::Mutex<Vec<Option<Result<Value, String>>>> =
409        std::sync::Mutex::new((0..n_total).map(|_| None).collect());
410
411    // Pair each bucket with its pre-built handler so workers own
412    // their handler outright — no shared mutable state across
413    // worker threads.
414    let mut worker_handlers = worker_handlers;
415    worker_handlers.truncate(n_workers);
416    type Pair = (Vec<(usize, Value)>, Box<dyn EffectHandler + Send>);
417    let pairs: Vec<Pair> = buckets.into_iter().zip(worker_handlers).collect();
418
419    std::thread::scope(|s| {
420        let mut handles = Vec::with_capacity(pairs.len());
421        for (bucket, handler) in pairs {
422            let closure = closure.clone();
423            let results = &results;
424            handles.push(s.spawn(move || {
425                // `Box<dyn EffectHandler + Send>` has implicit
426                // `+ 'static`; that coerces to `+ 'a` because
427                // `'static` outlives any `'a`. The `Send` bound is
428                // auto-erased on the unsize coercion.
429                let handler_for_vm: Box<dyn EffectHandler + 'a> = handler;
430                let mut vm = Vm::with_handler(program, handler_for_vm);
431                for (idx, item) in bucket {
432                    let r = vm
433                        .invoke_closure_value(closure.clone(), vec![item])
434                        .map_err(|e| format!("{e:?}"));
435                    results.lock().unwrap()[idx] = Some(r);
436                }
437            }));
438        }
439        for h in handles {
440            h.join().map_err(|_| ()).ok();
441        }
442    });
443
444    let mut out = Vec::with_capacity(n_total);
445    let inner = results.into_inner().unwrap();
446    for r in inner {
447        match r {
448            Some(Ok(v)) => out.push(v),
449            Some(Err(e)) => return Err(VmError::Effect(format!("par_map worker: {e}"))),
450            None => return Err(VmError::Panic("par_map worker did not produce a result".into())),
451        }
452    }
453    Ok(out)
454}
455
456impl<'a> Vm<'a> {
457    pub fn new(program: &'a Program) -> Self {
458        Self::with_handler(program, Box::new(DenyAllEffects))
459    }
460
461    pub fn with_handler(program: &'a Program, handler: Box<dyn EffectHandler + 'a>) -> Self {
462        Self {
463            program,
464            handler,
465            tracer: Box::new(NullTracer),
466            // Pre-allocate enough capacity for a typical request so the first
467            // call incurs no reallocation (#389 slice 3).
468            frames: Vec::with_capacity(32),
469            stack: Vec::with_capacity(128),
470            step_limit: 10_000_000,
471            steps: 0,
472            pure_memo: std::collections::HashMap::new(),
473            pure_memo_hits: 0,
474            pure_memo_misses: 0,
475            field_ics: std::collections::HashMap::new(),
476            // 256 slots handles ~32 frames × 8 locals; grows on demand and
477            // retains capacity across consecutive vm.call() invocations.
478            locals_storage: Vec::with_capacity(256),
479        }
480    }
481
482    pub fn set_tracer(&mut self, tracer: Box<dyn Tracer + 'a>) {
483        self.tracer = tracer;
484    }
485
486    /// Cap the number of opcode dispatches before the VM aborts with
487    /// `step limit exceeded`. Useful as a runtime DoS guard against
488    /// untrusted code (e.g. the `agent-tool` sandbox, where an LLM
489    /// could emit `list.fold(list.range(0, 1_000_000_000), …)` to hang
490    /// the host). Default is 10_000_000.
491    pub fn set_step_limit(&mut self, limit: u64) {
492        self.step_limit = limit;
493    }
494
495    pub fn call(&mut self, name: &str, args: Vec<Value>) -> Result<Value, VmError> {
496        let fn_id = self.program.lookup(name).ok_or_else(|| VmError::Panic(format!("no function `{name}`")))?;
497        self.invoke(fn_id, args)
498    }
499
500    /// Vm-level handler for `parser.run` (#221). Routed here from
501    /// `Op::EffectCall` rather than through the `EffectHandler` so
502    /// the recursive parser interpreter has reentrant Vm access for
503    /// closure invocation. Returns the wrapped `Result[T, ParseErr]`
504    /// value the language sees.
505    fn run_parser_op(&mut self, args: Vec<Value>) -> Result<Value, String> {
506        let parser = args.first().cloned()
507            .ok_or_else(|| "parser.run: missing parser arg".to_string())?;
508        let input = match args.get(1) {
509            Some(Value::Str(s)) => s.clone(),
510            _ => return Err("parser.run: input must be Str".into()),
511        };
512        match crate::parser_runtime::run_parser(&parser, &input, 0, self) {
513            Ok((value, _pos)) => Ok(Value::Variant {
514                name: "Ok".into(),
515                args: vec![value],
516            }),
517            Err((pos, msg)) => {
518                let mut e = indexmap::IndexMap::new();
519                e.insert("pos".into(), Value::Int(pos as i64));
520                e.insert("message".into(), Value::Str(msg.into()));
521                Ok(Value::Variant {
522                    name: "Err".into(),
523                    args: vec![Value::Record(e)],
524                })
525            }
526        }
527    }
528
529    // ---- Variant helpers used by conc.* registry ops (#444) ----
530    // Local helpers (avoid pulling in serde / public API). Lex's
531    // `Result`/`Option` are stdlib unions; their runtime shape is a
532    // `Value::Variant { name, args }` with the constructor name as
533    // declared (`Ok`/`Err`/`Some`/`None`).
534
535    /// VM-level handler for `conc.*` effect ops (#381).
536    ///
537    /// * `conc.spawn(init, handler)` — creates an `Actor` wrapping the
538    ///   initial state and the handler closure. No background thread is
539    ///   started; the actor runs synchronously on the calling thread
540    ///   under a `Mutex` so concurrent callers serialise.
541    ///
542    /// * `conc.ask(actor, msg)` — locks the actor, calls
543    ///   `handler(state, msg)` on *this* VM (reentrant), expects a
544    ///   2-tuple `(new_state, reply)`, updates the actor's state, and
545    ///   returns `reply`.
546    ///
547    /// * `conc.tell(actor, msg)` — same as `ask` but discards the
548    ///   reply and returns `Unit`.
549    fn run_conc_op(&mut self, op: &str, args: Vec<Value>) -> Result<Value, String> {
550        match op {
551            "spawn" => {
552                let mut it = args.into_iter();
553                let init = it.next().unwrap_or(Value::Unit);
554                let handler = it.next().unwrap_or(Value::Unit);
555                if !matches!(handler, Value::Closure { .. }) {
556                    return Err(format!(
557                        "conc.spawn: handler must be a Closure, got {handler:?}"));
558                }
559                Ok(Value::Actor(Arc::new(Mutex::new(ActorCell {
560                    state: init,
561                    handler: crate::value::ActorHandler::Lex(handler),
562                }))))
563            }
564            "ask" | "tell" => {
565                let mut it = args.into_iter();
566                let actor_val = it.next().unwrap_or(Value::Unit);
567                let msg = it.next().unwrap_or(Value::Unit);
568                let cell = match actor_val {
569                    Value::Actor(ref arc) => Arc::clone(arc),
570                    other => return Err(format!(
571                        "conc.{op}: first arg must be an Actor, got {other:?}")),
572                };
573                // Lock the actor: guarantees at-most-one-concurrent message.
574                let mut guard = cell.lock().map_err(|e| format!("conc.{op}: actor mutex poisoned: {e}"))?;
575                let handler = guard.handler.clone();
576                let state = guard.state.clone();
577                match handler {
578                    crate::value::ActorHandler::Lex(closure_val) => {
579                        // Call handler(state, msg) on this VM — full effect access.
580                        let result = self.invoke_closure_value(closure_val, vec![state, msg])
581                            .map_err(|e| format!("conc.{op}: handler error: {e:?}"))?;
582                        // Expect (new_state, reply) tuple.
583                        match result {
584                            Value::Tuple(mut parts) if parts.len() == 2 => {
585                                let reply = parts.pop().unwrap();
586                                let new_state = parts.pop().unwrap();
587                                guard.state = new_state;
588                                drop(guard);
589                                if op == "ask" { Ok(reply) } else { Ok(Value::Unit) }
590                            }
591                            other => Err(format!(
592                                "conc.{op}: handler must return a 2-tuple (new_state, reply), got {other:?}")),
593                        }
594                    }
595                    crate::value::ActorHandler::Native(native) => {
596                        // Native bridge: fire-and-forget; `state` is unused
597                        // (the bridge's "state" is the external resource, e.g.
598                        // a WebSocket connection). The closure receives `msg`
599                        // directly. `ask` returns whatever the bridge produces;
600                        // `tell` discards it. State stays untouched.
601                        drop(guard);
602                        let result = (native.send)(msg)
603                            .map_err(|e| format!("conc.{op}: native handler error: {e}"))?;
604                        if op == "ask" { Ok(result) } else { Ok(Value::Unit) }
605                    }
606                }
607            }
608            "register" => {
609                // conc.register(actor, name) -> Result[Unit, ConcError]
610                // Returns Ok(Unit) on first register, Err(AlreadyRegistered(name))
611                // if the name is taken. v1 stores the actor opaquely —
612                // see crate::conc_registry for the type-tag note.
613                let mut it = args.into_iter();
614                let actor = it.next().unwrap_or(Value::Unit);
615                if !matches!(actor, Value::Actor(_)) {
616                    return Err(format!(
617                        "conc.register: first arg must be an Actor, got {actor:?}"));
618                }
619                let name = match it.next() {
620                    Some(Value::Str(s)) => s.to_string(),
621                    other => return Err(format!(
622                        "conc.register: name must be Str, got {other:?}")),
623                };
624                Ok(match crate::conc_registry::register(&name, actor) {
625                    Ok(()) => variant_ok(Value::Unit),
626                    Err(crate::conc_registry::RegError::AlreadyRegistered(n)) => {
627                        variant_err(variant("AlreadyRegistered", vec![Value::Str(n.into())]))
628                    }
629                    Err(crate::conc_registry::RegError::NotRegistered(_)) => {
630                        unreachable!("register cannot produce NotRegistered")
631                    }
632                })
633            }
634            "lookup" => {
635                // conc.lookup(name) -> Option[Actor[S, M]]
636                // Returns Some(actor) if registered, None otherwise. The
637                // [S, M] static parametrisation at the call site is not
638                // checked at runtime in v1 — caller's responsibility to
639                // match the registration site's type.
640                let mut it = args.into_iter();
641                let name = match it.next() {
642                    Some(Value::Str(s)) => s.to_string(),
643                    other => return Err(format!(
644                        "conc.lookup: name must be Str, got {other:?}")),
645                };
646                Ok(match crate::conc_registry::lookup(&name) {
647                    Some(actor) => variant("Some", vec![actor]),
648                    None => variant("None", vec![]),
649                })
650            }
651            "unregister" => {
652                // conc.unregister(name) -> Result[Unit, ConcError]
653                let mut it = args.into_iter();
654                let name = match it.next() {
655                    Some(Value::Str(s)) => s.to_string(),
656                    other => return Err(format!(
657                        "conc.unregister: name must be Str, got {other:?}")),
658                };
659                Ok(match crate::conc_registry::unregister(&name) {
660                    Ok(()) => variant_ok(Value::Unit),
661                    Err(crate::conc_registry::RegError::NotRegistered(n)) => {
662                        variant_err(variant("NotRegistered", vec![Value::Str(n.into())]))
663                    }
664                    Err(crate::conc_registry::RegError::AlreadyRegistered(_)) => {
665                        unreachable!("unregister cannot produce AlreadyRegistered")
666                    }
667                })
668            }
669            "registered" => {
670                // conc.registered() -> List[Str] — sorted snapshot.
671                let names = crate::conc_registry::registered();
672                Ok(Value::List(names.into_iter()
673                    .map(|n| Value::Str(n.into()))
674                    .collect()))
675            }
676            other => Err(format!("unknown conc.{other}")),
677        }
678    }
679
680    /// Invoke a `Value::Closure` by combining its captures with the
681    /// supplied call args and dispatching to the underlying function.
682    /// Used by the parser interpreter (#221) to call user-supplied
683    /// `f` arguments inside `parser.map` / `parser.and_then` nodes.
684    pub fn invoke_closure_value(
685        &mut self,
686        closure: Value,
687        args: Vec<Value>,
688    ) -> Result<Value, VmError> {
689        let (fn_id, captures) = match closure {
690            Value::Closure { fn_id, captures, .. } => (fn_id, captures),
691            other => return Err(VmError::TypeMismatch(
692                format!("invoke_closure_value: not a closure: {other:?}"))),
693        };
694        let mut combined = captures;
695        combined.extend(args);
696        self.invoke(fn_id, combined)
697    }
698
699    pub fn invoke(&mut self, fn_id: u32, args: Vec<Value>) -> Result<Value, VmError> {
700        let f = &self.program.functions[fn_id as usize];
701        if args.len() != f.arity as usize {
702            return Err(VmError::Panic(format!("arity mismatch calling {}", f.name)));
703        }
704        // Refinement runtime check at the public entry point too
705        // (#209 slice 3). `Op::Call` checks for in-program calls;
706        // this branch covers `vm.call("entry", ...)` from the host
707        // and the reentrant `invoke_closure_value` path. Same
708        // semantics, same error shape.
709        let f_name = f.name.clone();
710        // Iterate `f.refinements` by reference — the loop body
711        // only reads from `self.program` (via `r`) and from locals,
712        // so we don't need to clone the Vec to detach it from
713        // `&self`. Removing this clone saves an allocation per
714        // call on the hot path (#461).
715        for (i, refinement) in f.refinements.iter().enumerate() {
716            if let Some(r) = refinement {
717                let arg = args.get(i).cloned().unwrap_or(Value::Unit);
718                match eval_refinement(&r.predicate, &r.binding, &arg) {
719                    Ok(true) => {}
720                    Ok(false) => return Err(VmError::RefinementFailed {
721                        fn_name: f_name,
722                        param_index: i,
723                        binding: r.binding.clone(),
724                        reason: format!("predicate failed for {} = {arg:?}", r.binding),
725                    }),
726                    Err(reason) => return Err(VmError::RefinementFailed {
727                        fn_name: f_name,
728                        param_index: i,
729                        binding: r.binding.clone(),
730                        reason,
731                    }),
732                }
733            }
734        }
735        let f = &self.program.functions[fn_id as usize];
736        // Claim slots from the locals stack allocator (#389 slice 3).
737        let locals_start = self.locals_storage.len();
738        let locals_len = f.locals_count.max(f.arity) as usize;
739        self.locals_storage.resize(locals_start + locals_len, Value::Unit);
740        for (i, v) in args.into_iter().enumerate() {
741            self.locals_storage[locals_start + i] = v;
742        }
743        // Record the depth before pushing — this is what `run` will
744        // exit at, supporting reentrant invocation from inside the
745        // VM (e.g. the parser interpreter calling closures, #221).
746        let base_depth = self.frames.len();
747        self.push_frame(Frame {
748            fn_id, pc: 0, locals_start, locals_len,
749            stack_base: self.stack.len(),
750            trace_kind: FrameKind::Entry,
751            memo_key: None,
752        })?;
753        self.run_to(base_depth)
754    }
755
756    /// All call-frame pushes funnel through here so the depth
757    /// check can't be skipped by a missing branch. Returns
758    /// `CallStackOverflow` instead of letting recursion blow the
759    /// host's native stack.
760    fn push_frame(&mut self, frame: Frame) -> Result<(), VmError> {
761        if self.frames.len() as u32 >= MAX_CALL_DEPTH {
762            return Err(VmError::CallStackOverflow(MAX_CALL_DEPTH));
763        }
764        self.frames.push(frame);
765        Ok(())
766    }
767
768    /// Run until the frame stack drops to `base_depth`. Required for
769    /// reentrant invocation: a `Vm::invoke` call from inside an
770    /// already-running `run()` must return when *its* frame returns,
771    /// not when the entire frame stack empties (#221).
772    fn run_to(&mut self, base_depth: usize) -> Result<Value, VmError> {
773        loop {
774            if self.steps > self.step_limit {
775                let frame_idx = self.frames.len() - 1;
776                let fn_id = self.frames[frame_idx].fn_id;
777                let fn_name = &self.program.functions[fn_id as usize].name;
778                return Err(VmError::Panic(format!(
779                    "step limit exceeded in `{fn_name}` ({} > {})",
780                    self.steps, self.step_limit,
781                )));
782            }
783            self.steps += 1;
784            let frame_idx = self.frames.len() - 1;
785            let pc = self.frames[frame_idx].pc;
786            let fn_id = self.frames[frame_idx].fn_id;
787            let code = &self.program.functions[fn_id as usize].code;
788            if pc >= code.len() {
789                let fn_name = &self.program.functions[fn_id as usize].name;
790                return Err(VmError::Panic(format!("ran past end of code in `{fn_name}`")));
791            }
792            let op = code[pc];
793            self.frames[frame_idx].pc = pc + 1;
794
795            match op {
796                Op::PushConst(i) => {
797                    let c = &self.program.constants[i as usize];
798                    self.stack.push(const_to_value(c));
799                }
800                Op::Pop => { self.pop()?; }
801                Op::Dup => {
802                    let v = self.peek()?.clone();
803                    self.stack.push(v);
804                }
805                Op::LoadLocal(i) => {
806                    let base = self.frames[frame_idx].locals_start;
807                    let v = self.locals_storage[base + i as usize].clone();
808                    self.stack.push(v);
809                }
810                Op::StoreLocal(i) => {
811                    let v = self.pop()?;
812                    let base = self.frames[frame_idx].locals_start;
813                    self.locals_storage[base + i as usize] = v;
814                }
815                Op::MakeRecord { shape_idx, field_count } => {
816                    let shape = &self.program.record_shapes[shape_idx as usize];
817                    let n = field_count as usize;
818                    debug_assert_eq!(shape.len(), n,
819                        "MakeRecord field_count must match record_shapes[shape_idx].len()");
820                    let mut values: Vec<Value> = (0..n).map(|_| Value::Unit).collect();
821                    for i in (0..n).rev() {
822                        values[i] = self.pop()?;
823                    }
824                    let mut rec = IndexMap::new();
825                    for (i, val) in values.into_iter().enumerate() {
826                        let name = match &self.program.constants[shape[i] as usize] {
827                            Const::FieldName(s) => s.clone(),
828                            _ => return Err(VmError::TypeMismatch("expected FieldName const".into())),
829                        };
830                        rec.insert(name, val);
831                    }
832                    self.stack.push(Value::Record(rec));
833                }
834                Op::MakeTuple(n) => {
835                    let mut items: Vec<Value> = (0..n).map(|_| Value::Unit).collect();
836                    for i in (0..n as usize).rev() { items[i] = self.pop()?; }
837                    self.stack.push(Value::Tuple(items));
838                }
839                Op::MakeList(n) => {
840                    let mut items: Vec<Value> = (0..n).map(|_| Value::Unit).collect();
841                    for i in (0..n as usize).rev() { items[i] = self.pop()?; }
842                    self.stack.push(Value::List(items.into()));
843                }
844                Op::MakeVariant { name_idx, arity } => {
845                    let mut args: Vec<Value> = (0..arity).map(|_| Value::Unit).collect();
846                    for i in (0..arity as usize).rev() { args[i] = self.pop()?; }
847                    let name = match &self.program.constants[name_idx as usize] {
848                        Const::VariantName(s) => s.clone(),
849                        _ => return Err(VmError::TypeMismatch("expected VariantName const".into())),
850                    };
851                    self.stack.push(Value::Variant { name, args });
852                }
853                Op::GetField(i) => {
854                    let name_idx = i;
855                    let v = self.pop()?;
856                    match v {
857                        Value::Record(r) => {
858                            // Inline cache keyed by call site: (fn_id << 32 | pc).
859                            // On hit: verify field name at cached offset (O(1), no
860                            // string hash); on miss: walk by name + populate cache.
861                            let site = (fn_id as u64) << 32 | pc as u64;
862                            let value = 'ic: {
863                                if let Some(&off) = self.field_ics.get(&site) {
864                                    if let Some((k, val)) = r.get_index(off) {
865                                        if let Const::FieldName(s) =
866                                            &self.program.constants[name_idx as usize]
867                                        {
868                                            if s == k { break 'ic val.clone(); } // cache hit
869                                        }
870                                    }
871                                }
872                                // Cache miss: resolve by name, populate IC.
873                                let name = match &self.program.constants[name_idx as usize] {
874                                    Const::FieldName(s) => s.as_str(),
875                                    _ => return Err(VmError::TypeMismatch(
876                                        "expected FieldName const".into())),
877                                };
878                                let (off, _, val) = r.get_full(name)
879                                    .ok_or_else(|| VmError::TypeMismatch(
880                                        format!("missing field `{name}`")))?;
881                                self.field_ics.insert(site, off);
882                                val.clone()
883                            };
884                            self.stack.push(value);
885                        }
886                        other => return Err(VmError::TypeMismatch(
887                            format!("GetField on non-record: {other:?}"))),
888                    }
889                }
890                Op::GetElem(i) => {
891                    let v = self.pop()?;
892                    match v {
893                        Value::Tuple(items) => {
894                            let v = items.into_iter().nth(i as usize)
895                                .ok_or_else(|| VmError::TypeMismatch(format!("tuple index {i} out of range")))?;
896                            self.stack.push(v);
897                        }
898                        other => return Err(VmError::TypeMismatch(format!("GetElem on non-tuple: {other:?}"))),
899                    }
900                }
901                Op::TestVariant(i) => {
902                    let name = match &self.program.constants[i as usize] {
903                        Const::VariantName(s) => s.clone(),
904                        _ => return Err(VmError::TypeMismatch("expected VariantName const".into())),
905                    };
906                    let v = self.pop()?;
907                    match &v {
908                        Value::Variant { name: vname, .. } => {
909                            self.stack.push(Value::Bool(vname == &name));
910                        }
911                        // For tag-only enums of primitive type (e.g. ParseError = Empty | NotNumber)
912                        // the value is currently a Variant too, since constructors emit MakeVariant.
913                        other => return Err(VmError::TypeMismatch(format!("TestVariant on non-variant: {other:?}"))),
914                    }
915                }
916                Op::GetVariant(_i) => {
917                    let v = self.pop()?;
918                    match v {
919                        Value::Variant { args, .. } => {
920                            self.stack.push(Value::Tuple(args));
921                        }
922                        other => return Err(VmError::TypeMismatch(format!("GetVariant on non-variant: {other:?}"))),
923                    }
924                }
925                Op::GetVariantArg(i) => {
926                    let v = self.pop()?;
927                    match v {
928                        Value::Variant { mut args, .. } => {
929                            if (i as usize) >= args.len() {
930                                return Err(VmError::TypeMismatch("variant arg index oob".into()));
931                            }
932                            self.stack.push(args.swap_remove(i as usize));
933                        }
934                        other => return Err(VmError::TypeMismatch(format!("GetVariantArg on non-variant: {other:?}"))),
935                    }
936                }
937                Op::GetListLen => {
938                    let v = self.pop()?;
939                    match v {
940                        Value::List(items) => self.stack.push(Value::Int(items.len() as i64)),
941                        other => return Err(VmError::TypeMismatch(format!("GetListLen on non-list: {other:?}"))),
942                    }
943                }
944                Op::GetListElem(i) => {
945                    let v = self.pop()?;
946                    match v {
947                        Value::List(items) => {
948                            let v = items.into_iter().nth(i as usize)
949                                .ok_or_else(|| VmError::TypeMismatch("list index oob".into()))?;
950                            self.stack.push(v);
951                        }
952                        other => return Err(VmError::TypeMismatch(format!("GetListElem on non-list: {other:?}"))),
953                    }
954                }
955                Op::GetListElemDyn => {
956                    // Stack: [list, idx]
957                    let idx = match self.pop()? {
958                        Value::Int(n) => n as usize,
959                        other => return Err(VmError::TypeMismatch(format!("GetListElemDyn idx: {other:?}"))),
960                    };
961                    let v = self.pop()?;
962                    match v {
963                        Value::List(items) => {
964                            let v = items.into_iter().nth(idx)
965                                .ok_or_else(|| VmError::TypeMismatch("list index oob".into()))?;
966                            self.stack.push(v);
967                        }
968                        other => return Err(VmError::TypeMismatch(format!("GetListElemDyn on non-list: {other:?}"))),
969                    }
970                }
971                Op::ListAppend => {
972                    let value = self.pop()?;
973                    let list = self.pop()?;
974                    match list {
975                        Value::List(mut items) => {
976                            items.push_back(value);
977                            self.stack.push(Value::List(items));
978                        }
979                        other => return Err(VmError::TypeMismatch(format!("ListAppend on non-list: {other:?}"))),
980                    }
981                }
982                Op::Jump(off) => {
983                    let new_pc = (self.frames[frame_idx].pc as i32 + off) as usize;
984                    self.frames[frame_idx].pc = new_pc;
985                }
986                Op::JumpIf(off) => {
987                    let v = self.pop()?;
988                    if v.as_bool() {
989                        let new_pc = (self.frames[frame_idx].pc as i32 + off) as usize;
990                        self.frames[frame_idx].pc = new_pc;
991                    }
992                }
993                Op::JumpIfNot(off) => {
994                    let v = self.pop()?;
995                    if !v.as_bool() {
996                        let new_pc = (self.frames[frame_idx].pc as i32 + off) as usize;
997                        self.frames[frame_idx].pc = new_pc;
998                    }
999                }
1000                Op::MakeClosure { fn_id, capture_count } => {
1001                    let n = capture_count as usize;
1002                    let mut captures: Vec<Value> = (0..n).map(|_| Value::Unit).collect();
1003                    for i in (0..n).rev() { captures[i] = self.pop()?; }
1004                    // Look up the canonical body hash so the resulting
1005                    // `Value::Closure` carries it for equality (#222).
1006                    let body_hash = self.program.functions[fn_id as usize].body_hash;
1007                    self.stack.push(Value::Closure { fn_id, body_hash, captures });
1008                }
1009                Op::CallClosure { arity, node_id_idx } => {
1010                    let mut args: Vec<Value> = (0..arity).map(|_| Value::Unit).collect();
1011                    for i in (0..arity as usize).rev() { args[i] = self.pop()?; }
1012                    let closure = self.pop()?;
1013                    let (fn_id, captures) = match closure {
1014                        Value::Closure { fn_id, captures, .. } => (fn_id, captures),
1015                        other => return Err(VmError::TypeMismatch(format!("CallClosure on non-closure: {other:?}"))),
1016                    };
1017                    let node_id = const_str(&self.program.constants, node_id_idx);
1018                    let callee_name = self.program.functions[fn_id as usize].name.clone();
1019                    let budget_cost = call_budget_cost(&self.program.functions[fn_id as usize]);
1020                    if budget_cost > 0 {
1021                        self.handler.note_call_budget(budget_cost)
1022                            .map_err(VmError::Effect)?;
1023                    }
1024                    let mut combined = captures;
1025                    combined.extend(args);
1026                    self.tracer.enter_call(&node_id, &callee_name, &combined);
1027                    let f = &self.program.functions[fn_id as usize];
1028                    let locals_start = self.locals_storage.len();
1029                    let locals_len = f.locals_count.max(f.arity) as usize;
1030                    self.locals_storage.resize(locals_start + locals_len, Value::Unit);
1031                    for (i, v) in combined.into_iter().enumerate() {
1032                        self.locals_storage[locals_start + i] = v;
1033                    }
1034                    self.push_frame(Frame {
1035                        fn_id, pc: 0, locals_start, locals_len,
1036                        stack_base: self.stack.len(),
1037                        trace_kind: FrameKind::Call(node_id),
1038                        // Op::CallClosure intentionally doesn't memoize
1039                        // for v1 (#229) — closures over captures need a
1040                        // hashing strategy that includes the captures.
1041                        // Direct Op::Call is the v1 surface.
1042                        memo_key: None,
1043                    })?;
1044                }
1045                Op::SortByKey { node_id_idx: _ } => {
1046                    // #338: pop (xs, f). For each x in xs, invoke
1047                    // f(x) to derive a sortable key. Stable-sort the
1048                    // (key, value) pairs by key. Return the values
1049                    // in sorted order. Keys must be Int / Float /
1050                    // Str; mixed-type pairs and other types compare
1051                    // as equal (preserving original order — stable
1052                    // sort).
1053                    let f = self.pop()?;
1054                    let xs = self.pop()?;
1055                    let items = match xs {
1056                        Value::List(v) => v,
1057                        other => return Err(VmError::TypeMismatch(
1058                            format!("SortByKey requires a List, got: {other:?}"))),
1059                    };
1060                    if !matches!(f, Value::Closure { .. }) {
1061                        return Err(VmError::TypeMismatch(
1062                            format!("SortByKey requires a closure, got: {f:?}")));
1063                    }
1064                    let mut keyed: Vec<(Value, Value)> = Vec::with_capacity(items.len());
1065                    for item in items {
1066                        let key = self.invoke_closure_value(f.clone(), vec![item.clone()])?;
1067                        keyed.push((key, item));
1068                    }
1069                    keyed.sort_by(|(ka, _), (kb, _)| compare_sort_keys(ka, kb));
1070                    let sorted: VecDeque<Value> = keyed.into_iter().map(|(_, v)| v).collect();
1071                    self.stack.push(Value::List(sorted));
1072                }
1073                Op::ParallelMap { node_id_idx: _ } => {
1074                    // #305 slice 1: pop (xs, f) and apply f to each
1075                    // element across OS threads.
1076                    //
1077                    // #305 slice 2: each worker now asks the parent
1078                    // handler for a thread-safe per-worker handler via
1079                    // `EffectHandler::spawn_for_worker`. Handlers that
1080                    // opt in (e.g. `DefaultHandler`) yield a fresh
1081                    // instance sharing the budget pool; handlers that
1082                    // don't fall back to the slice-1 behavior of
1083                    // `DenyAllEffects` in the worker.
1084                    let f = self.pop()?;
1085                    let xs = self.pop()?;
1086                    let items = match xs {
1087                        Value::List(v) => v,
1088                        other => return Err(VmError::TypeMismatch(
1089                            format!("ParallelMap requires a List, got: {other:?}"))),
1090                    };
1091                    if !matches!(f, Value::Closure { .. }) {
1092                        return Err(VmError::TypeMismatch(
1093                            format!("ParallelMap requires a closure, got: {f:?}")));
1094                    }
1095                    // Pre-build one handler per worker on the main
1096                    // thread so the worker just owns its handler with
1097                    // no shared borrowing. The actual worker count is
1098                    // capped by `LEX_PAR_MAX_CONCURRENCY` (resolved
1099                    // inside par_map_run); cap ≤ items.len() so we
1100                    // never over-allocate handlers.
1101                    let n_workers = par_max_concurrency().max(1).min(items.len().max(1));
1102                    let mut worker_handlers: Vec<Box<dyn EffectHandler + Send>> =
1103                        Vec::with_capacity(n_workers);
1104                    for _ in 0..n_workers {
1105                        worker_handlers.push(
1106                            self.handler
1107                                .spawn_for_worker()
1108                                .unwrap_or_else(|| Box::new(DenyAllEffects)),
1109                        );
1110                    }
1111                    let results = par_map_run(self.program, f, items.into_iter().collect(), worker_handlers)?;
1112                    self.stack.push(Value::List(results.into()));
1113                }
1114                Op::Call { fn_id, arity, node_id_idx } => {
1115                    let mut args: Vec<Value> = (0..arity).map(|_| Value::Unit).collect();
1116                    for i in (0..arity as usize).rev() { args[i] = self.pop()?; }
1117                    let node_id = const_str(&self.program.constants, node_id_idx);
1118                    let callee_name = self.program.functions[fn_id as usize].name.clone();
1119                    let budget_cost = call_budget_cost(&self.program.functions[fn_id as usize]);
1120                    if budget_cost > 0 {
1121                        self.handler.note_call_budget(budget_cost)
1122                            .map_err(VmError::Effect)?;
1123                    }
1124                    // Refinement runtime check (#209 slice 3). Each
1125                    // param's `Option<Refinement>` is evaluated against
1126                    // the actual arg before the frame is pushed. The
1127                    // tracer sees the call enter; failure surfaces as
1128                    // `VmError::RefinementFailed` *before* the body
1129                    // starts, which means an erroring trace shows the
1130                    // call as enter+exit_err with the verdict reason
1131                    // (same shape as `gate.verdict`).
1132                    //
1133                    // Iterate by reference — the loop body reads only
1134                    // through `r` (borrowed from `self.program`) and
1135                    // locals; we don't mutate `self` in here, so the
1136                    // borrow is fine and we avoid one Vec allocation
1137                    // per call on the hot path (#461).
1138                    let refinements = &self.program.functions[fn_id as usize].refinements;
1139                    for (i, refinement) in refinements.iter().enumerate() {
1140                        if let Some(r) = refinement {
1141                            let arg = args.get(i).cloned().unwrap_or(Value::Unit);
1142                            match eval_refinement(&r.predicate, &r.binding, &arg) {
1143                                Ok(true) => { /* satisfied, continue */ }
1144                                Ok(false) => {
1145                                    return Err(VmError::RefinementFailed {
1146                                        fn_name: callee_name.clone(),
1147                                        param_index: i,
1148                                        binding: r.binding.clone(),
1149                                        reason: format!(
1150                                            "predicate failed for {} = {arg:?}",
1151                                            r.binding),
1152                                    });
1153                                }
1154                                Err(reason) => {
1155                                    return Err(VmError::RefinementFailed {
1156                                        fn_name: callee_name.clone(),
1157                                        param_index: i,
1158                                        binding: r.binding.clone(),
1159                                        reason,
1160                                    });
1161                                }
1162                            }
1163                        }
1164                    }
1165                    // Pure-fn memoization (#229): if the callee declares
1166                    // no effects, hash the args and consult the cache.
1167                    // On hit, push the cached value, emit synthetic
1168                    // enter+exit trace events (so the trace still shows
1169                    // the call), and skip the frame push entirely.
1170                    let f = &self.program.functions[fn_id as usize];
1171                    let memo_key: Option<(u32, [u8; 16])> = if f.effects.is_empty() {
1172                        Some((fn_id, hash_call_args(&args)))
1173                    } else {
1174                        None
1175                    };
1176                    if let Some(key) = memo_key {
1177                        if let Some(cached) = self.pure_memo.get(&key).cloned() {
1178                            self.pure_memo_hits += 1;
1179                            self.tracer.enter_call(&node_id, &callee_name, &args);
1180                            self.tracer.exit_ok(&cached);
1181                            self.stack.push(cached);
1182                            continue;
1183                        }
1184                        self.pure_memo_misses += 1;
1185                    }
1186                    self.tracer.enter_call(&node_id, &callee_name, &args);
1187                    let locals_start = self.locals_storage.len();
1188                    let locals_len = f.locals_count.max(f.arity) as usize;
1189                    self.locals_storage.resize(locals_start + locals_len, Value::Unit);
1190                    for (i, v) in args.into_iter().enumerate() {
1191                        self.locals_storage[locals_start + i] = v;
1192                    }
1193                    self.push_frame(Frame {
1194                        fn_id, pc: 0, locals_start, locals_len,
1195                        stack_base: self.stack.len(),
1196                        trace_kind: FrameKind::Call(node_id),
1197                        memo_key,
1198                    })?;
1199                }
1200                Op::TailCall { fn_id, arity, node_id_idx } => {
1201                    let mut args: Vec<Value> = (0..arity).map(|_| Value::Unit).collect();
1202                    for i in (0..arity as usize).rev() { args[i] = self.pop()?; }
1203                    let node_id = const_str(&self.program.constants, node_id_idx);
1204                    let callee_name = self.program.functions[fn_id as usize].name.clone();
1205                    let budget_cost = call_budget_cost(&self.program.functions[fn_id as usize]);
1206                    if budget_cost > 0 {
1207                        self.handler.note_call_budget(budget_cost)
1208                            .map_err(VmError::Effect)?;
1209                    }
1210                    // Refinement runtime check on tail calls too
1211                    // (#209 slice 3). Same shape as Op::Call —
1212                    // iterate by reference to avoid a per-call Vec
1213                    // allocation (#461).
1214                    let refinements = &self.program.functions[fn_id as usize].refinements;
1215                    for (i, refinement) in refinements.iter().enumerate() {
1216                        if let Some(r) = refinement {
1217                            let arg = args.get(i).cloned().unwrap_or(Value::Unit);
1218                            match eval_refinement(&r.predicate, &r.binding, &arg) {
1219                                Ok(true) => {}
1220                                Ok(false) => return Err(VmError::RefinementFailed {
1221                                    fn_name: callee_name.clone(),
1222                                    param_index: i,
1223                                    binding: r.binding.clone(),
1224                                    reason: format!(
1225                                        "predicate failed for {} = {arg:?}",
1226                                        r.binding),
1227                                }),
1228                                Err(reason) => return Err(VmError::RefinementFailed {
1229                                    fn_name: callee_name.clone(),
1230                                    param_index: i,
1231                                    binding: r.binding.clone(),
1232                                    reason,
1233                                }),
1234                            }
1235                        }
1236                    }
1237                    // A tail call closes the current call's trace frame and
1238                    // opens a new one in its place — preserves the caller's
1239                    // tree depth in the trace.
1240                    self.tracer.exit_call_tail();
1241                    self.tracer.enter_call(&node_id, &callee_name, &args);
1242                    let f = &self.program.functions[fn_id as usize];
1243                    // Reuse the current frame's locals_start position:
1244                    // truncate to release old locals then extend for the
1245                    // new function (#389 slice 3, same as Op::Return but
1246                    // without popping the frame).
1247                    let old_locals_start = self.frames.last().unwrap().locals_start;
1248                    self.locals_storage.truncate(old_locals_start);
1249                    let new_locals_len = f.locals_count.max(f.arity) as usize;
1250                    self.locals_storage.resize(old_locals_start + new_locals_len, Value::Unit);
1251                    for (i, v) in args.into_iter().enumerate() {
1252                        self.locals_storage[old_locals_start + i] = v;
1253                    }
1254                    let frame = self.frames.last_mut().unwrap();
1255                    frame.fn_id = fn_id;
1256                    frame.pc = 0;
1257                    frame.locals_len = new_locals_len;
1258                    frame.trace_kind = FrameKind::Call(node_id);
1259                }
1260                Op::EffectCall { kind_idx, op_idx, arity, node_id_idx } => {
1261                    let mut args: Vec<Value> = (0..arity).map(|_| Value::Unit).collect();
1262                    for i in (0..arity as usize).rev() { args[i] = self.pop()?; }
1263                    let kind = match &self.program.constants[kind_idx as usize] {
1264                        Const::Str(s) => s.clone(),
1265                        _ => return Err(VmError::TypeMismatch("expected Str const for effect kind".into())),
1266                    };
1267                    let op_name = match &self.program.constants[op_idx as usize] {
1268                        Const::Str(s) => s.clone(),
1269                        _ => return Err(VmError::TypeMismatch("expected Str const for effect op".into())),
1270                    };
1271                    let node_id = const_str(&self.program.constants, node_id_idx);
1272                    self.tracer.enter_effect(&node_id, &kind, &op_name, &args);
1273                    let result = match self.tracer.override_effect(&node_id) {
1274                        Some(v) => Ok(v),
1275                        // VM-level intercept for `parser.run` (#221).
1276                        // Routed inline rather than through the handler
1277                        // because the parser interpreter needs reentrant
1278                        // VM access to invoke `Value::Closure` values
1279                        // from `Map` / `AndThen` nodes.
1280                        None if (kind.as_str(), op_name.as_str()) == ("parser", "run")
1281                            => self.run_parser_op(args),
1282                        // VM-level intercept for `conc.*` (#381). The actor
1283                        // handler closure must run on the calling VM so it can
1284                        // dispatch arbitrary effects through the same handler
1285                        // chain (e.g. sql queries inside an actor).
1286                        None if kind.as_str() == "conc"
1287                            => self.run_conc_op(op_name.as_str(), args),
1288                        None => self.handler.dispatch(&kind, &op_name, args),
1289                    };
1290                    match result {
1291                        Ok(v) => {
1292                            self.tracer.exit_ok(&v);
1293                            self.stack.push(v);
1294                        }
1295                        Err(e) => {
1296                            self.tracer.exit_err(&e);
1297                            return Err(VmError::Effect(e));
1298                        }
1299                    }
1300                }
1301                Op::Return => {
1302                    let v = self.pop()?;
1303                    let frame = self.frames.pop().unwrap();
1304                    // Trim any extra stuff that the function pushed but didn't pop.
1305                    self.stack.truncate(frame.stack_base);
1306                    // Release this frame's locals back to the arena (#389 slice 3).
1307                    // LIFO frame ordering guarantees this frame's slots are at the top.
1308                    self.locals_storage.truncate(frame.locals_start);
1309                    if matches!(frame.trace_kind, FrameKind::Call(_)) {
1310                        self.tracer.exit_ok(&v);
1311                    }
1312                    // Pure-fn memoization (#229): if this frame was a
1313                    // memoizable call that missed the cache, write the
1314                    // computed return value back so the next call with
1315                    // the same args returns it without re-executing.
1316                    if let Some(key) = frame.memo_key {
1317                        self.pure_memo.insert(key, v.clone());
1318                    }
1319                    // Exit when we've returned past the depth this
1320                    // `run_to` was entered at — supports reentrancy
1321                    // (a nested `invoke` returns into its caller, not
1322                    // out of the outermost VM run, #221).
1323                    if self.frames.len() <= base_depth {
1324                        return Ok(v);
1325                    }
1326                    self.stack.push(v);
1327                }
1328                Op::Panic(i) => {
1329                    let msg = match &self.program.constants[i as usize] {
1330                        Const::Str(s) => s.clone(),
1331                        _ => "panic".into(),
1332                    };
1333                    return Err(VmError::Panic(msg));
1334                }
1335                // Arithmetic
1336                Op::IntAdd => self.bin_int(|a, b| Value::Int(a + b))?,
1337                Op::IntSub => self.bin_int(|a, b| Value::Int(a - b))?,
1338                Op::IntMul => self.bin_int(|a, b| Value::Int(a * b))?,
1339                Op::IntDiv => self.bin_int(|a, b| Value::Int(a / b))?,
1340                Op::IntMod => self.bin_int(|a, b| Value::Int(a % b))?,
1341                Op::IntNeg => {
1342                    let a = self.pop()?.as_int();
1343                    self.stack.push(Value::Int(-a));
1344                }
1345                Op::IntEq => self.bin_int(|a, b| Value::Bool(a == b))?,
1346                Op::IntLt => self.bin_int(|a, b| Value::Bool(a < b))?,
1347                Op::IntLe => self.bin_int(|a, b| Value::Bool(a <= b))?,
1348                Op::FloatAdd => self.bin_float(|a, b| Value::Float(a + b))?,
1349                Op::FloatSub => self.bin_float(|a, b| Value::Float(a - b))?,
1350                Op::FloatMul => self.bin_float(|a, b| Value::Float(a * b))?,
1351                Op::FloatDiv => self.bin_float(|a, b| Value::Float(a / b))?,
1352                Op::FloatNeg => {
1353                    let a = self.pop()?.as_float();
1354                    self.stack.push(Value::Float(-a));
1355                }
1356                Op::FloatEq => self.bin_float(|a, b| Value::Bool(a == b))?,
1357                Op::FloatLt => self.bin_float(|a, b| Value::Bool(a < b))?,
1358                Op::FloatLe => self.bin_float(|a, b| Value::Bool(a <= b))?,
1359                Op::NumAdd => {
1360                    // #308: `+` is overloaded — Str+Str concatenates,
1361                    // numerics add. Other arithmetic ops (-, *, /, %)
1362                    // still reject Str at the type-checker layer.
1363                    let b = self.pop()?;
1364                    let a = self.pop()?;
1365                    match (a, b) {
1366                        (Value::Int(x), Value::Int(y)) => self.stack.push(Value::Int(x + y)),
1367                        (Value::Float(x), Value::Float(y)) => self.stack.push(Value::Float(x + y)),
1368                        (Value::Str(x), Value::Str(y)) => {
1369                            // SmolStr is immutable; concatenate via a temporary String.
1370                            let mut s = String::with_capacity(x.len() + y.len());
1371                            s.push_str(&x);
1372                            s.push_str(&y);
1373                            self.stack.push(Value::Str(s.into()));
1374                        }
1375                        (a, b) => return Err(VmError::TypeMismatch(format!("Num op: {a:?} {b:?}"))),
1376                    }
1377                }
1378                Op::NumSub => self.bin_num(|a, b| Value::Int(a - b), |a, b| Value::Float(a - b))?,
1379                Op::NumMul => self.bin_num(|a, b| Value::Int(a * b), |a, b| Value::Float(a * b))?,
1380                Op::NumDiv => self.bin_num(|a, b| Value::Int(a / b), |a, b| Value::Float(a / b))?,
1381                Op::NumMod => self.bin_int(|a, b| Value::Int(a % b))?,
1382                Op::NumNeg => {
1383                    let v = self.pop()?;
1384                    match v {
1385                        Value::Int(n) => self.stack.push(Value::Int(-n)),
1386                        Value::Float(f) => self.stack.push(Value::Float(-f)),
1387                        other => return Err(VmError::TypeMismatch(format!("NumNeg on {other:?}"))),
1388                    }
1389                }
1390                Op::NumEq => self.bin_eq()?,
1391                Op::NumLt => self.bin_ord(|a, b| Value::Bool(a < b), |a, b| Value::Bool(a < b), |a, b| Value::Bool(a < b))?,
1392                Op::NumLe => self.bin_ord(|a, b| Value::Bool(a <= b), |a, b| Value::Bool(a <= b), |a, b| Value::Bool(a <= b))?,
1393                Op::BoolAnd => {
1394                    let b = self.pop()?.as_bool();
1395                    let a = self.pop()?.as_bool();
1396                    self.stack.push(Value::Bool(a && b));
1397                }
1398                Op::BoolOr => {
1399                    let b = self.pop()?.as_bool();
1400                    let a = self.pop()?.as_bool();
1401                    self.stack.push(Value::Bool(a || b));
1402                }
1403                Op::BoolNot => {
1404                    let a = self.pop()?.as_bool();
1405                    self.stack.push(Value::Bool(!a));
1406                }
1407                Op::StrConcat => {
1408                    let b = self.pop()?;
1409                    let a = self.pop()?;
1410                    let s = format!("{}{}", a.as_str(), b.as_str());
1411                    self.stack.push(Value::Str(s.into()));
1412                }
1413                Op::StrLen => {
1414                    let v = self.pop()?;
1415                    self.stack.push(Value::Int(v.as_str().len() as i64));
1416                }
1417                Op::StrEq => {
1418                    let b = self.pop()?;
1419                    let a = self.pop()?;
1420                    self.stack.push(Value::Bool(a.as_str() == b.as_str()));
1421                }
1422                Op::BytesLen => {
1423                    let v = self.pop()?;
1424                    match v {
1425                        Value::Bytes(b) => self.stack.push(Value::Int(b.len() as i64)),
1426                        other => return Err(VmError::TypeMismatch(format!("BytesLen on {other:?}"))),
1427                    }
1428                }
1429                Op::BytesEq => {
1430                    let b = self.pop()?;
1431                    let a = self.pop()?;
1432                    let eq = match (a, b) {
1433                        (Value::Bytes(x), Value::Bytes(y)) => x == y,
1434                        _ => return Err(VmError::TypeMismatch("BytesEq operands".into())),
1435                    };
1436                    self.stack.push(Value::Bool(eq));
1437                }
1438
1439                // Superinstructions (#461).
1440                Op::LoadLocalAddIntConst { local_idx, imm_const_idx } => {
1441                    let base = self.frames[frame_idx].locals_start;
1442                    let a = self.locals_storage[base + local_idx as usize].as_int();
1443                    let b = match &self.program.constants[imm_const_idx as usize] {
1444                        Const::Int(n) => *n,
1445                        c => return Err(VmError::TypeMismatch(
1446                            format!("LoadLocalAddIntConst expected Int const, got {c:?}"))),
1447                    };
1448                    self.stack.push(Value::Int(a + b));
1449                    // Override the default `pc + 1`: skip past the
1450                    // two inert primitive ops (the original
1451                    // PushConst + IntAdd) that the peephole pass
1452                    // left in place for body-hash stability.
1453                    self.frames[frame_idx].pc = pc + 3;
1454                }
1455            }
1456        }
1457    }
1458
1459    fn pop(&mut self) -> Result<Value, VmError> {
1460        self.stack.pop().ok_or(VmError::StackUnderflow)
1461    }
1462    fn peek(&self) -> Result<&Value, VmError> {
1463        self.stack.last().ok_or(VmError::StackUnderflow)
1464    }
1465
1466    fn bin_int(&mut self, f: impl Fn(i64, i64) -> Value) -> Result<(), VmError> {
1467        let b = self.pop()?.as_int();
1468        let a = self.pop()?.as_int();
1469        self.stack.push(f(a, b));
1470        Ok(())
1471    }
1472    fn bin_float(&mut self, f: impl Fn(f64, f64) -> Value) -> Result<(), VmError> {
1473        let b = self.pop()?.as_float();
1474        let a = self.pop()?.as_float();
1475        self.stack.push(f(a, b));
1476        Ok(())
1477    }
1478    fn bin_num(
1479        &mut self,
1480        i: impl Fn(i64, i64) -> Value,
1481        f: impl Fn(f64, f64) -> Value,
1482    ) -> Result<(), VmError> {
1483        let b = self.pop()?;
1484        let a = self.pop()?;
1485        match (a, b) {
1486            (Value::Int(x), Value::Int(y)) => { self.stack.push(i(x, y)); Ok(()) }
1487            (Value::Float(x), Value::Float(y)) => { self.stack.push(f(x, y)); Ok(()) }
1488            (a, b) => Err(VmError::TypeMismatch(format!("Num op: {a:?} {b:?}"))),
1489        }
1490    }
1491
1492    /// Like `bin_num` but also handles `Str` operands via lexicographic order.
1493    /// Used by `NumLt` / `NumLe` because the type checker admits `Str < Str`
1494    /// and `>` / `>=` compile as swap+NumLt / swap+NumLe (#332).
1495    fn bin_ord(
1496        &mut self,
1497        i: impl Fn(i64, i64) -> Value,
1498        f: impl Fn(f64, f64) -> Value,
1499        s: impl Fn(&str, &str) -> Value,
1500    ) -> Result<(), VmError> {
1501        let b = self.pop()?;
1502        let a = self.pop()?;
1503        match (a, b) {
1504            (Value::Int(x), Value::Int(y)) => { self.stack.push(i(x, y)); Ok(()) }
1505            (Value::Float(x), Value::Float(y)) => { self.stack.push(f(x, y)); Ok(()) }
1506            (Value::Str(x), Value::Str(y)) => { self.stack.push(s(&x, &y)); Ok(()) }
1507            (a, b) => Err(VmError::TypeMismatch(format!("Num op: {a:?} {b:?}"))),
1508        }
1509    }
1510    fn bin_eq(&mut self) -> Result<(), VmError> {
1511        let b = self.pop()?;
1512        let a = self.pop()?;
1513        self.stack.push(Value::Bool(a == b));
1514        Ok(())
1515    }
1516}
1517
1518/// Construct a `Value::Variant` with the given name and args.
1519/// Used by `conc.*` registry ops to return `Result`/`Option`/`ConcError`
1520/// values without hand-writing the struct literal at every site.
1521fn variant(name: &str, args: Vec<Value>) -> Value {
1522    Value::Variant { name: name.to_string(), args }
1523}
1524fn variant_ok(payload: Value) -> Value { variant("Ok", vec![payload]) }
1525fn variant_err(payload: Value) -> Value { variant("Err", vec![payload]) }
1526
1527fn const_to_value(c: &Const) -> Value {
1528    match c {
1529        Const::Int(n) => Value::Int(*n),
1530        Const::Float(f) => Value::Float(*f),
1531        Const::Bool(b) => Value::Bool(*b),
1532        Const::Str(s) => Value::Str(s.as_str().into()),
1533        Const::Bytes(b) => Value::Bytes(b.clone()),
1534        Const::Unit => Value::Unit,
1535        Const::FieldName(s) | Const::VariantName(s) | Const::NodeId(s) => Value::Str(s.as_str().into()),
1536    }
1537}