// lex_bytecode/vm.rs

1//! M5: bytecode VM. Stack machine with effect dispatch through a host handler.
2
3use crate::op::*;
4use crate::program::*;
5use crate::value::{ActorCell, Value};
6use std::sync::{Arc, Mutex, OnceLock};
7use indexmap::IndexMap;
8use smol_str::SmolStr;
9use std::collections::{HashMap, VecDeque};
10
11// ── IC polymorphism instrumentation (throwaway, env-gated) ─────────
12// Enable with LEX_IC_STATS=1. With LEX_IC_STATS_OUT=<path> writes a
13// TSV to <path>.<pid> on each Vm drop; otherwise dumps to stderr.
14
/// Hit counters collected by the env-gated inline-cache
/// instrumentation.
#[derive(Default)]
struct IcStats {
    /// Outer key is `(fn_id, site_idx)`; the inner map goes from
    /// `shape_id` to the number of hits recorded for that shape at
    /// that site.
    sites: HashMap<(u32, u32), HashMap<u32, u64>>,
}
19
20static IC_STATS: OnceLock<Mutex<IcStats>> = OnceLock::new();
21static IC_STATS_ENABLED: OnceLock<bool> = OnceLock::new();
22
23fn ic_stats_enabled() -> bool {
24    *IC_STATS_ENABLED.get_or_init(|| {
25        std::env::var("LEX_IC_STATS").map(|v| v == "1").unwrap_or(false)
26    })
27}
28
29fn record_ic_hit(fn_id: u32, site_idx: u32, shape_id: u32) {
30    let stats = IC_STATS.get_or_init(|| Mutex::new(IcStats::default()));
31    let mut s = stats.lock().unwrap();
32    *s.sites.entry((fn_id, site_idx)).or_default().entry(shape_id).or_insert(0) += 1;
33}
34
35pub fn dump_ic_stats() {
36    let Some(stats) = IC_STATS.get() else { return; };
37    let s = stats.lock().unwrap();
38    if s.sites.is_empty() { return; }
39    let mut out = String::from("fn_id\tsite_idx\tshape_id\thits\n");
40    let mut entries: Vec<_> = s.sites.iter().collect();
41    entries.sort_by_key(|((f, si), _)| (*f, *si));
42    for ((f, site), shapes) in entries {
43        let mut shape_entries: Vec<_> = shapes.iter().collect();
44        shape_entries.sort_by_key(|(sid, _)| **sid);
45        for (sid, hits) in shape_entries {
46            out.push_str(&format!("{f}\t{site}\t{sid}\t{hits}\n"));
47        }
48    }
49    match std::env::var("LEX_IC_STATS_OUT").ok() {
50        Some(path) => {
51            let pid = std::process::id();
52            let _ = std::fs::write(format!("{path}.{pid}"), out);
53        }
54        None => { eprint!("{out}"); }
55    }
56}
57
58#[derive(Debug, Clone, thiserror::Error)]
59pub enum VmError {
60    #[error("runtime panic: {0}")]
61    Panic(String),
62    #[error("type mismatch at runtime: {0}")]
63    TypeMismatch(String),
64    #[error("stack underflow")]
65    StackUnderflow,
66    #[error("unknown function: {0}")]
67    UnknownFunction(String),
68    #[error("effect handler error: {0}")]
69    Effect(String),
70    #[error("call stack overflow: recursion depth exceeded ({0})")]
71    CallStackOverflow(u32),
72    /// Refinement predicate failed at a call boundary (#209 slice 3).
73    /// Surfaced when a function declares `param :: Type{x | predicate}`,
74    /// the call-site arg couldn't be discharged statically (slice 2),
75    /// and the runtime evaluator finds the predicate is `false` for
76    /// the actual argument value. The `verdict` mirrors the shape of
77    /// `gate.verdict`-style records in `lex-trace`.
78    #[error("refinement violated: argument {param_index} of `{fn_name}` (binding `{binding}`): {reason}")]
79    RefinementFailed {
80        fn_name: String,
81        param_index: usize,
82        binding: String,
83        reason: String,
84    },
85    /// Integer division or modulo with a zero divisor (#696). Without
86    /// this guard the host `/`/`%` panics and takes the whole process
87    /// down — the crash report had a conformance harness compute a
88    /// rate over an empty set in teardown, far from any user-visible
89    /// division. Surfacing a catchable `VmError` instead keeps the
90    /// failure inside the language's error model. Float div/mod is
91    /// exempt: IEEE-754 yields inf/NaN rather than trapping.
92    #[error("integer {op} by zero")]
93    DivByZero {
94        /// `"division"` or `"modulo"` — names the offending operator.
95        op: &'static str,
96    },
97}
98
/// Upper bound on simultaneously live call frames.
///
/// Guards against unbounded recursion in agent-emitted code: a body
/// that calls itself with no base case would otherwise exhaust the
/// host's native stack and crash the process. Real Lex code rarely
/// goes beyond ~30 frames, so 1024 leaves generous headroom while
/// staying well under the OS stack limit at any per-frame size we use.
pub const MAX_CALL_DEPTH: u32 = 1024;
106
/// Stack-record budget granted to each frame (#464 step 2), measured
/// in `Value` slots taken from `Vm::stack_record_arena`.
///
/// Once a frame has used its budget, further `Op::AllocStackRecord`
/// requests fall back to the heap path. With the current
/// `size_of::<Value>() = 64B`, 64 slots come to ~4 KiB per frame,
/// matching the proposal in `docs/design/escape-analysis.md`. A
/// handler-shaped function (one outer record of ≤8 fields plus a
/// handful of small inner records) fits comfortably without growing.
pub const STACK_RECORD_BUDGET_SLOTS: u32 = 64;
116
/// Length of the adaptive-memoization warmup window (#229 adaptive).
///
/// A pure function gets this many cache-probing calls to produce a
/// hit. Reaching the window with zero hits disables memoization for
/// that function, so its calls stop hashing their arguments. A
/// function that really benefits — e.g. naive recursive `fib`, where
/// each call immediately reuses sub-results — collects hits long
/// before the window closes and stays enabled. 64 balances "give real
/// reuse a chance" against "don't pay the hash forever on always-miss
/// code".
const MEMO_WARMUP_CALLS: u32 = 64;
126
/// Adaptive-memoization bookkeeping kept per function (#229 adaptive).
///
/// `enabled` begins as `true`. When a function has made
/// `MEMO_WARMUP_CALLS` cache probes and `hits` is still 0, `enabled`
/// flips to `false` and that function's calls skip the args hash for
/// the remainder of the Vm's life.
#[derive(Clone, Copy)]
struct MemoFnState {
    /// Cache probes counted so far.
    calls: u32,
    /// Cache hits counted so far.
    hits: u32,
    /// Whether calls to this function still consult the memo cache.
    enabled: bool,
}

impl Default for MemoFnState {
    /// Fresh state: nothing counted yet, memoization switched on.
    fn default() -> Self {
        Self {
            enabled: true,
            calls: 0,
            hits: 0,
        }
    }
}
143
144/// Host-side effect dispatch. Implementors decide what `kind`/`op` mean
145/// and how arguments map to side effects.
146pub trait EffectHandler {
147    fn dispatch(&mut self, kind: &str, op: &str, args: Vec<Value>) -> Result<Value, String>;
148
149    /// Hook called by the VM at every function call so handlers can
150    /// enforce per-call budget consumption (#225). The argument is
151    /// the sum of `[budget(N)]` declared on the callee's signature;
152    /// the handler returns `Err` to refuse the call (the VM converts
153    /// to `VmError::Effect`). Default impl is a no-op so legacy
154    /// handlers and pure-only runs are unaffected.
155    fn note_call_budget(&mut self, _budget_cost: u64) -> Result<(), String> {
156        Ok(())
157    }
158
159    /// Enter a per-request allocation scope (#463 scaffolding).
160    /// Called by the runtime layer (e.g. `net.serve_fn`'s request
161    /// loop) immediately before invoking the user handler closure
162    /// for one request. Implementations push a fresh arena onto
163    /// their internal stack and return its identifier; the matching
164    /// `exit_request_scope` call drops it.
165    ///
166    /// Default impl is a no-op — handlers without arena support
167    /// return a sentinel scope id which they ignore on exit.
168    /// `DefaultHandler` in `lex-runtime` provides the real
169    /// implementation.
170    ///
171    /// Today the VM does NOT route any `Value` allocations through
172    /// the returned arena — see the scaffolding notes in
173    /// `crates/lex-runtime/src/arena.rs`. The hook exists so the
174    /// follow-on slice that adds Value-rep arena routing has a
175    /// stable trait surface to extend.
176    fn enter_request_scope(&mut self) -> u64 { 0 }
177
178    /// Exit a per-request allocation scope opened by
179    /// `enter_request_scope`. Implementations drop the arena
180    /// associated with `scope_id`. Calling exit with a scope_id
181    /// that wasn't returned by a prior enter is implementation-
182    /// defined behavior — DefaultHandler treats it as a no-op so
183    /// mismatched pairs don't panic.
184    fn exit_request_scope(&mut self, _scope_id: u64) {}
185
186    /// `list.par_map` worker-handler factory (#305 slice 2).
187    ///
188    /// Each parallel worker thread runs its own `Vm` and therefore
189    /// needs its own effect handler. The parent handler may opt in
190    /// to per-worker dispatch by returning `Some(handler)` here;
191    /// returning `None` (the default) keeps slice-1 behavior: the
192    /// worker runs `DenyAllEffects` and any effect call inside the
193    /// closure fails with `VmError::Effect`.
194    ///
195    /// The returned handler must be `Send` so the worker can take
196    /// ownership across a thread boundary. Shared state (budget
197    /// pool, chat registry, etc.) is wired up by the implementer.
198    /// Per-worker independence (MCP client cache, output sink)
199    /// is intentional — the alternative is mutex-serialization of
200    /// the whole effect dispatch, which would defeat the parallelism.
201    fn spawn_for_worker(&self) -> Option<Box<dyn EffectHandler + Send>> {
202        None
203    }
204}
205
206/// `Vm` exposes itself as a `ClosureCaller` so the parser interpreter
207/// can invoke user-supplied closures during a `parser.run` walk
208/// (#221). The Vm is reentrant for closure invocation: pushing a new
209/// frame onto an active call stack is supported, and the handler
210/// stays in place so any effects the closure body fires dispatch
211/// normally.
212impl<'a> crate::parser_runtime::ClosureCaller for Vm<'a> {
213    fn call_closure(&mut self, closure: Value, args: Vec<Value>) -> Result<Value, String> {
214        self.invoke_closure_value(closure, args)
215            .map_err(|e| format!("{e:?}"))
216    }
217}
218
219/// A handler that fails any effect call. Useful as a default for pure-only runs.
220pub struct DenyAllEffects;
221impl EffectHandler for DenyAllEffects {
222    fn dispatch(&mut self, kind: &str, op: &str, _args: Vec<Value>) -> Result<Value, String> {
223        Err(format!("effects not permitted (attempted {kind}.{op})"))
224    }
225}
226
227/// Trace receiver. Implementors record the call/effect tree and may
228/// substitute effect responses (for replay).
229pub trait Tracer {
230    fn enter_call(&mut self, node_id: &str, name: &str, args: &[Value]);
231    fn enter_effect(&mut self, node_id: &str, kind: &str, op: &str, args: &[Value]);
232    fn exit_ok(&mut self, value: &Value);
233    fn exit_err(&mut self, message: &str);
234    /// Tail-call optimization: pop the current frame's open call without
235    /// re-entering the parent (the new call takes its place).
236    fn exit_call_tail(&mut self);
237    /// During replay, return Some(v) to substitute an effect's output.
238    fn override_effect(&mut self, _node_id: &str) -> Option<Value> { None }
239}
240
241/// No-op tracer for normal execution.
242pub struct NullTracer;
243impl Tracer for NullTracer {
244    fn enter_call(&mut self, _: &str, _: &str, _: &[Value]) {}
245    fn enter_effect(&mut self, _: &str, _: &str, _: &str, _: &[Value]) {}
246    fn exit_ok(&mut self, _: &Value) {}
247    fn exit_err(&mut self, _: &str) {}
248    fn exit_call_tail(&mut self) {}
249}
250
/// How a frame came to exist, for tracing and diagnostics.
#[derive(Debug, Clone)]
pub(crate) enum FrameKind {
    /// The top-level entry frame; no `Call` opcode corresponds to it.
    Entry,
    /// A frame opened by Call/TailCall. The `String` holds the
    /// originating `NodeId` — handy for diagnostics even though it is
    /// currently unread.
    Call(#[allow(dead_code)] String),
}
259
260pub struct Vm<'a> {
261    program: &'a Program,
262    handler: Box<dyn EffectHandler + 'a>,
263    pub(crate) tracer: Box<dyn Tracer + 'a>,
264    /// Per-call frames. Each frame has its own locals array and pc.
265    frames: Vec<Frame>,
266    stack: Vec<Value>,
267    /// Soft cap to avoid runaway computations in tests.
268    pub step_limit: u64,
269    pub steps: u64,
270    /// Per-Vm memoization cache for pure functions (#229). Keyed by
271    /// `(fn_id, hash_call_args(args))` — a 128-bit structural digest
272    /// of the arguments (see `hash_call_args`). Effectful functions
273    /// never enter this map. The cache lives for the lifetime of one
274    /// `Vm::call` chain — calling `Vm::with_handler` again starts a
275    /// fresh cache.
276    pure_memo: std::collections::HashMap<(u32, [u8; 16]), Value>,
277    /// Diagnostic counters for `--trace` observability (#229).
278    pub pure_memo_hits: u64,
279    pub pure_memo_misses: u64,
280    /// Number of effect-free calls that skipped the cache entirely
281    /// because adaptive memoization disabled their function (#229
282    /// adaptive). Observability only.
283    pub pure_memo_skips: u64,
284    /// Adaptive-memoization state, one entry per function (indexed by
285    /// `fn_id`), parallel to `field_ics` (#229 adaptive). Memoization
286    /// only pays when a function is called repeatedly with equal args;
287    /// the unconditional `hash_call_args` on every effect-free call is
288    /// pure overhead otherwise (the `response_build` profile: 0 hits /
289    /// 3600 misses, ~12% of instructions). After a warmup window with
290    /// zero hits we stop hashing that function's calls — always safe,
291    /// since the callee is pure and recomputing yields the same value.
292    /// Sticky for the Vm's lifetime: a function that hasn't hit in
293    /// `MEMO_WARMUP_CALLS` calls won't amortize later.
294    memo_fn_state: Vec<MemoFnState>,
295    /// Monomorphic inline caches for `Op::GetField` (#462 slice 1 +
296    /// shape-keyed verification slice). Indexed by
297    /// `[fn_id as usize][site_idx as usize]` — one entry per
298    /// field-access site within each function. `site_idx` is assigned
299    /// at compile time by `FnCompiler::field_get_sites` so every emit
300    /// produces a stable identifier independent of pc. The cache
301    /// survives the planned dispatch rewrite (#461) and a future
302    /// JIT (#465).
303    ///
304    /// Slot shape: `(shape_id, offset)`. The pre-shape-keyed slice
305    /// stored only the offset and re-verified each hit by walking
306    /// `IndexMap::get_index(off)` and string-comparing the field name
307    /// against the requested `name_idx`. After this slice, hits
308    /// against compile-time records (real `shape_id`) verify with a
309    /// single `u32` compare and skip the string compare entirely —
310    /// per the #462 slice-2b measurement that observed 0% polymorphism
311    /// and 86% of hits going to records with a real shape_id.
312    ///
313    /// `NO_SHAPE_ID` records (JSON / SQL / HTTP-built — 14% of measured
314    /// hits, 100% of inbox/gateway traffic) fall through to the
315    /// pre-slice name-compare verification. Distinct dynamic shapes
316    /// both carry `NO_SHAPE_ID` and would otherwise alias on a
317    /// pure-shape-keyed IC; keeping the name compare on that path
318    /// preserves correctness without a separate cache for them.
319    ///
320    /// Outer Vec is pre-sized to `program.functions.len()`; each inner
321    /// Vec is empty until the first GetField in that function runs,
322    /// at which point we one-shot allocate it to the compiler-recorded
323    /// `field_ic_sites` size and never resize again. Lazy on the inner
324    /// side so VMs created for short-lived scripts don't eagerly
325    /// allocate IC slots for functions they never enter.
326    field_ics: Vec<Vec<Option<(u32, usize)>>>,
327    /// Stack allocator for function locals (#389 slice 3).
328    ///
329    /// Every function frame claims `locals_count` contiguous slots from
330    /// this Vec on push and releases them on pop.  Because Lex uses
331    /// strictly LIFO frame semantics the most-recently-pushed frame's
332    /// slots always sit at the top of the Vec, so `truncate` is the
333    /// correct (and O(1)) release operation.
334    ///
335    /// The Vec is pre-allocated once at VM construction and then grows
336    /// only if the actual call depth × locals width exceeds the initial
337    /// capacity.  After a top-level `vm.call` returns the Vec is empty
338    /// again but its capacity is retained, so the next request incurs
339    /// zero allocations for locals up to the high-water mark.
340    locals_storage: Vec<Value>,
341    /// Stack-record arena (#464 step 2). Each `Op::AllocStackRecord`
342    /// at a non-escaping site appends its `field_count` field values
343    /// here; the produced `Value::StackRecord` carries `slab_start =
344    /// arena.len() - field_count` so reads are an O(1) slab index.
345    /// On `Op::Return` the arena is truncated back to
346    /// `frame.stack_record_arena_start`, releasing every record the
347    /// frame allocated in O(1) — same lifetime story as
348    /// `locals_storage` for frame locals.
349    ///
350    /// LIFO frame discipline guarantees a frame's records always sit
351    /// at the top of the arena while the frame is live, so neither
352    /// inter-frame interleaving nor index churn can occur.
353    stack_record_arena: Vec<Value>,
354    /// Per-Vm counters for #464 acceptance measurement. Incremented
355    /// on every `Op::MakeRecord` / `Op::AllocStackRecord` dispatch.
356    /// The bench reads these to compute the stack-allocation rate
357    /// (≥ 60% of records on the stack is the acceptance bar). Cheap
358    /// in the hot path — two unconditional u64 increments per record.
359    pub stack_record_allocs: u64,
360    pub stack_record_heap_fallbacks: u64,
361    pub heap_record_allocs: u64,
362    /// Request-scoped arena slab (#463 slice 2a). Mirrors the shape of
363    /// `stack_record_arena` but lives across frames inside the
364    /// request scope opened by `EffectHandler::enter_request_scope`.
365    /// Each `Op::AllocArenaRecord` / `Op::AllocArenaTuple` appends its
366    /// field values here and pushes a handle (`Value::ArenaRecord` /
367    /// `Value::ArenaTuple`) whose `slab_start` indexes back in.
368    /// Truncated to the saved start on `exit_request_scope`, releasing
369    /// every value the scope built in O(1) — same lifetime story as
370    /// `stack_record_arena` truncating on `Op::Return`.
371    ///
372    /// Slabs nest LIFO: `arena_scope_starts` holds the
373    /// `arena_slab.len()` snapshot taken at each `enter_request_scope`,
374    /// and `exit_request_scope` truncates back to the matching entry.
375    /// An empty `arena_scope_starts` means **no active scope** — the
376    /// alloc ops fall back to their `MakeRecord` / `MakeTuple` heap
377    /// path, so the VM stays sound when arena-lowered bytecode runs in
378    /// a non-handler context.
379    arena_slab: Vec<Value>,
380    /// LIFO stack of `arena_slab.len()` snapshots, one per active
381    /// request scope. See `arena_slab`.
382    arena_scope_starts: Vec<u32>,
383    /// Counters for #463 slice-2b acceptance (will be the
384    /// arena-allocation-rate gate, paralleling the #464 stack-rate
385    /// counters above). Incremented in the op handlers; harmless in
386    /// slice 2a since codegen doesn't emit the ops yet.
387    pub arena_record_allocs: u64,
388    pub arena_record_heap_fallbacks: u64,
389    /// Optional JIT tier hook (#465 phase-1 integration). Consulted
390    /// by the `Op::Call` dispatch arm after refinements + memo. See
391    /// `crate::jit_hook` for the trait contract. `None` means
392    /// "interpreter-only" — that branch in the dispatch arm folds
393    /// to a single null-pointer check the optimizer can hoist.
394    jit_hook: Option<Box<dyn crate::jit_hook::JitHook + 'a>>,
395}
396
397struct Frame {
398    fn_id: u32,
399    pc: usize,
400    /// Start index of this frame's locals in `Vm::locals_storage` (#389
401    /// slice 3). The frame owns `locals_storage[locals_start..locals_start
402    /// + locals_len]`; `Op::Return` truncates the Vec back to
403    /// `locals_start`, releasing the slots in O(1).
404    locals_start: usize,
405    locals_len: usize,
406    /// Stack base when this frame started (for cleanup on return).
407    stack_base: usize,
408    trace_kind: FrameKind,
409    /// Pure-fn memo key (#229). `Some(key)` if the call was eligible
410    /// for memoization and missed the cache; on Op::Return the key
411    /// is used to write the return value back into the cache.
412    /// `None` means "don't memoize" — either the function isn't pure,
413    /// the call wasn't through Op::Call, or memoization is disabled.
414    memo_key: Option<(u32, [u8; 16])>,
415    /// #464 step 2: start index of this frame's records in
416    /// `Vm::stack_record_arena`. On `Op::Return`, the arena is
417    /// truncated back here. Identical lifetime discipline to
418    /// `locals_start`.
419    stack_record_arena_start: usize,
420    /// Remaining stack-record budget for this frame, in Value-slot
421    /// units (#464 step 2). Initial value: `STACK_RECORD_BUDGET_SLOTS`.
422    /// When an `Op::AllocStackRecord` would consume more slots than
423    /// remain, the VM falls back to the heap path silently (same
424    /// observable effect as `Op::MakeRecord`), so the budget never
425    /// surfaces as a user-visible error.
426    stack_record_budget_remaining: u32,
427}
428
429/// Sum of `[budget(N)]` declarations on a function's signature
430/// (#225). Used by Op::Call / Op::TailCall / Op::CallClosure to
431/// notify the EffectHandler of per-call budget cost so the handler
432/// can deduct from a shared pool and refuse calls that would
433/// exceed the policy ceiling. Negative `Int` args are ignored —
434/// the static check (`policy::check_program`) treats budgets as
435/// non-negative.
436fn call_budget_cost(f: &crate::program::Function) -> u64 {
437    let mut total: u64 = 0;
438    for e in &f.effects {
439        if e.kind == "budget" {
440            if let Some(crate::program::EffectArg::Int(n)) = &e.arg {
441                if *n >= 0 {
442                    total = total.saturating_add(*n as u64);
443                }
444            }
445        }
446    }
447    total
448}
449
450/// Hash the argument list for a pure-fn memoization lookup (#229).
451///
452/// The memo cache (`pure_memo`) is keyed on this 128-bit digest with
453/// no secondary equality check, so the contract is: argument lists
454/// that are equal under `Value`'s `PartialEq` must produce the same
455/// digest, and the 128-bit width keeps the false-collision rate
456/// (which would return a wrong cached result) negligible.
457///
458/// History (#461 follow-up): this used to build a `serde_json::Value`
459/// of every arg, canonicalize it, and SHA-256 the bytes. Profiling
460/// the `response_build` workload showed that path at 27.6% of all
461/// instructions — it dominated the VM, since every effect-free call
462/// pays it whether or not the cache ever hits. The cache is per-`Vm`
463/// and ephemeral, so a cryptographic, cross-process-stable key was
464/// never needed. We now walk the `Value` tree directly into two
465/// domain-separated `SipHash` passes (deterministic fixed-key
466/// `DefaultHasher`), concatenating the two 64-bit outputs into a
467/// 128-bit key. No JSON allocation, no crypto.
468///
469/// The walk mirrors `Value::PartialEq` so the equal-args-equal-key
470/// contract holds: `Record` is hashed order-independently over its
471/// fields (matching `IndexMap`'s order-insensitive equality),
472/// `Closure` on `(body_hash, captures)` not `fn_id` (#222), and
473/// `Actor`/`Ticker` on pointer identity (matching `Arc::ptr_eq`).
474fn hash_call_args(args: &[Value]) -> [u8; 16] {
475    use std::collections::hash_map::DefaultHasher;
476    use std::hash::Hasher;
477    let mut h0 = DefaultHasher::new();
478    let mut h1 = DefaultHasher::new();
479    // Domain separator: makes the two passes diverge so the
480    // concatenated halves span the full 128-bit space rather than
481    // duplicating one 64-bit value.
482    h1.write_u8(0x9e);
483    h0.write_usize(args.len());
484    h1.write_usize(args.len());
485    for a in args {
486        hash_value_into(a, &mut h0);
487        hash_value_into(a, &mut h1);
488    }
489    let lo = h0.finish();
490    let hi = h1.finish();
491    let mut out = [0u8; 16];
492    out[..8].copy_from_slice(&lo.to_le_bytes());
493    out[8..].copy_from_slice(&hi.to_le_bytes());
494    out
495}
496
497/// Structural hash of a `Value` into `h`, consistent with
498/// `Value::PartialEq`. The leading discriminant byte keeps distinct
499/// variants from colliding (e.g. `Int(0)` vs `Bool(false)`).
500fn hash_value_into<H: std::hash::Hasher>(v: &Value, h: &mut H) {
501    use std::collections::hash_map::DefaultHasher;
502    use std::hash::Hasher as _;
503    match v {
504        Value::Int(n) => { h.write_u8(0x01); h.write_i64(*n); }
505        // Bit pattern, not value: total and deterministic. NaN==NaN
506        // by bits (a memo hit there is harmless — the callee is pure
507        // and returns the same result for bit-identical args), and
508        // +0.0/-0.0 differ (a harmless extra miss).
509        Value::Float(f) => { h.write_u8(0x02); h.write_u64(f.to_bits()); }
510        Value::Bool(b) => { h.write_u8(0x03); h.write_u8(*b as u8); }
511        Value::Str(s) => {
512            h.write_u8(0x04);
513            h.write_usize(s.len());
514            h.write(s.as_bytes());
515        }
516        Value::Bytes(b) => {
517            h.write_u8(0x05);
518            h.write_usize(b.len());
519            h.write(b);
520        }
521        Value::Unit => { h.write_u8(0x06); }
522        Value::List(items) => {
523            h.write_u8(0x07);
524            h.write_usize(items.len());
525            for it in items { hash_value_into(it, h); }
526        }
527        Value::Tuple(items) => {
528            h.write_u8(0x08);
529            h.write_usize(items.len());
530            for it in items { hash_value_into(it, h); }
531        }
532        Value::Deque(items) => {
533            h.write_u8(0x09);
534            h.write_usize(items.len());
535            for it in items { hash_value_into(it, h); }
536        }
537        // `IndexMap` equality is order-insensitive, so the hash must
538        // be too: combine per-entry sub-hashes with wrapping add (a
539        // commutative mix) rather than feeding them in iteration
540        // order.
541        Value::Record { fields, .. } => {
542            h.write_u8(0x0a);
543            let mut combined: u64 = 0;
544            for (k, val) in fields.iter() {
545                let mut e = DefaultHasher::new();
546                e.write(k.as_bytes());
547                e.write_u8(0xff);
548                hash_value_into(val, &mut e);
549                combined = combined.wrapping_add(e.finish());
550            }
551            h.write_u64(combined);
552            h.write_usize(fields.len());
553        }
554        Value::Variant { name, args } => {
555            h.write_u8(0x0b);
556            h.write_usize(name.len());
557            h.write(name.as_bytes());
558            h.write_usize(args.len());
559            for a in args { hash_value_into(a, h); }
560        }
561        // Identity is `(body_hash, captures)`, not `fn_id` (#222).
562        Value::Closure { body_hash, captures, .. } => {
563            h.write_u8(0x0c);
564            h.write(body_hash);
565            h.write_usize(captures.len());
566            for c in captures { hash_value_into(c, h); }
567        }
568        Value::F64Array { rows, cols, data } => {
569            h.write_u8(0x0d);
570            h.write_u32(*rows);
571            h.write_u32(*cols);
572            for f in data { h.write_u64(f.to_bits()); }
573        }
574        // BTreeMap / BTreeSet iterate in sorted key order — already
575        // canonical, so direct feed is order-independent.
576        Value::Map(m) => {
577            h.write_u8(0x0e);
578            h.write_usize(m.len());
579            for (k, val) in m {
580                hash_mapkey_into(k, h);
581                hash_value_into(val, h);
582            }
583        }
584        Value::Set(s) => {
585            h.write_u8(0x0f);
586            h.write_usize(s.len());
587            for k in s { hash_mapkey_into(k, h); }
588        }
589        // Pointer identity, matching `Arc::ptr_eq` in PartialEq.
590        Value::Actor(a) => {
591            h.write_u8(0x10);
592            h.write_usize(Arc::as_ptr(a) as *const () as usize);
593        }
594        Value::Ticker(t) => {
595            h.write_u8(0x11);
596            h.write_usize(Arc::as_ptr(t) as *const () as usize);
597        }
598        // Coarse summary (schema + dimensions), matching the prior
599        // `to_json` encoding which deliberately omitted the cell data
600        // (tables can be GB-scale). Equal tables share schema + dims
601        // so equal-args-equal-key holds; this is no coarser than the
602        // pre-#461-followup behavior.
603        Value::ArrowTable(t) => {
604            h.write_u8(0x12);
605            h.write_i64(t.num_rows() as i64);
606            h.write_i64(t.num_columns() as i64);
607            for f in t.schema().fields() {
608                h.write(f.name().as_bytes());
609                h.write_u8(0xfe);
610            }
611        }
612        // #464: a StackRecord crossing into the memo path means an
613        // escape the analysis was supposed to reject. Mirror the
614        // PartialEq / to_json panic rather than mint a bogus key.
615        Value::StackRecord { .. } =>
616            panic!("BUG(#464): Value::StackRecord reached memo hashing — \
617                    escape analysis should have prevented escape to a call boundary"),
618        Value::StackTuple { .. } =>
619            panic!("BUG(#464): Value::StackTuple reached memo hashing — \
620                    escape analysis should have prevented escape to a call boundary"),
621        // #463 slice 2a: arena handles must never reach memo hashing.
622        // The memo cache outlives every request scope, so a hashed
623        // arena handle would dangle. Slice 1's arena-eligibility
624        // analysis must exclude pure-fn allocation sites (the memo
625        // path is reached only through pure-fn calls) — any reach
626        // here is a soundness bug.
627        Value::ArenaRecord { .. } =>
628            panic!("BUG(#463): Value::ArenaRecord reached memo hashing — \
629                    arena-eligibility analysis must exclude pure-fn allocation sites"),
630        Value::ArenaTuple { .. } =>
631            panic!("BUG(#463): Value::ArenaTuple reached memo hashing — \
632                    arena-eligibility analysis must exclude pure-fn allocation sites"),
633    }
634}
635
636/// Hash a `MapKey` into `h` with its own discriminant so a `Str`
637/// key and an `Int` key never collide.
638fn hash_mapkey_into<H: std::hash::Hasher>(k: &crate::value::MapKey, h: &mut H) {
639    use crate::value::MapKey;
640    match k {
641        MapKey::Str(s) => { h.write_u8(0x01); h.write_usize(s.len()); h.write(s.as_bytes()); }
642        MapKey::Int(n) => { h.write_u8(0x02); h.write_i64(*n); }
643    }
644}
645
646/// Evaluate a refinement predicate at runtime against the actual
647/// argument value (#209 slice 3). Mirrors `lex_types::discharge`'s
648/// static evaluator but operates on `Value` directly.
649///
650/// Returns `Ok(true)` / `Ok(false)` for a clean boolean verdict, or
651/// `Err(reason)` if the predicate references something the runtime
652/// can't resolve (free variable beyond the binding, unsupported AST
653/// node). Callers map `Ok(false)` and `Err` to `VmError::RefinementFailed`.
654fn eval_refinement(
655    predicate: &lex_ast::CExpr,
656    binding: &str,
657    arg: &Value,
658) -> Result<bool, String> {
659    match eval_refinement_inner(predicate, binding, arg) {
660        Ok(Value::Bool(b)) => Ok(b),
661        Ok(other) => Err(format!("predicate didn't reduce to a Bool, got {other:?}")),
662        Err(e) => Err(e),
663    }
664}
665
666fn eval_refinement_inner(
667    e: &lex_ast::CExpr,
668    binding: &str,
669    arg: &Value,
670) -> Result<Value, String> {
671    use lex_ast::{CExpr, CLit};
672    match e {
673        CExpr::Literal { value } => Ok(match value {
674            CLit::Int { value } => Value::Int(*value),
675            CLit::Float { value } => Value::Float(value.parse().unwrap_or(0.0)),
676            CLit::Bool { value } => Value::Bool(*value),
677            CLit::Str { value } => Value::Str(value.as_str().into()),
678            CLit::Bytes { value } => Value::Str(value.as_str().into()), // hex; unusual in predicates
679            CLit::Unit => Value::Unit,
680        }),
681        CExpr::Var { name } if name == binding => Ok(arg.clone()),
682        CExpr::Var { name } => Err(format!(
683            "predicate references free var `{name}`; runtime check \
684             only resolves the binding (slice 4 will plumb call-site \
685             context)")),
686        CExpr::UnaryOp { op, expr } => {
687            let v = eval_refinement_inner(expr, binding, arg)?;
688            match (op.as_str(), v) {
689                ("not", Value::Bool(b)) => Ok(Value::Bool(!b)),
690                ("-", Value::Int(n)) => Ok(Value::Int(-n)),
691                ("-", Value::Float(n)) => Ok(Value::Float(-n)),
692                (o, v) => Err(format!("unsupported unary `{o}` on {v:?}")),
693            }
694        }
695        CExpr::BinOp { op, lhs, rhs } => {
696            // Short-circuit `and` / `or` for the same reasons as the
697            // static evaluator.
698            if op == "and" || op == "or" {
699                let l = eval_refinement_inner(lhs, binding, arg)?;
700                let lb = match l {
701                    Value::Bool(b) => b,
702                    other => return Err(format!("`{op}` on non-bool: {other:?}")),
703                };
704                if op == "and" && !lb { return Ok(Value::Bool(false)); }
705                if op == "or"  &&  lb { return Ok(Value::Bool(true));  }
706                let r = eval_refinement_inner(rhs, binding, arg)?;
707                return match r {
708                    Value::Bool(b) => Ok(Value::Bool(b)),
709                    other => Err(format!("`{op}` on non-bool: {other:?}")),
710                };
711            }
712            let l = eval_refinement_inner(lhs, binding, arg)?;
713            let r = eval_refinement_inner(rhs, binding, arg)?;
714            apply_refinement_binop(op, &l, &r)
715        }
716        // Other AST forms (Call, Let, Match, FieldAccess, Lambda,
717        // Block, Constructors, Records, Tuples, Lists, Return) need
718        // a more general evaluator that can call back into the VM.
719        // Out of scope for slice 3; a future slice may unify this
720        // with the spec-checker's gate evaluator.
721        other => Err(format!("unsupported predicate node: {other:?}")),
722    }
723}
724
725fn apply_refinement_binop(op: &str, l: &Value, r: &Value) -> Result<Value, String> {
726    use Value::*;
727    match (op, l, r) {
728        ("+", Int(a), Int(b)) => Ok(Int(a + b)),
729        ("-", Int(a), Int(b)) => Ok(Int(a - b)),
730        ("*", Int(a), Int(b)) => Ok(Int(a * b)),
731        ("/", Int(a), Int(b)) if *b != 0 => Ok(Int(a / b)),
732        ("%", Int(a), Int(b)) if *b != 0 => Ok(Int(a % b)),
733        ("+", Float(a), Float(b)) => Ok(Float(a + b)),
734        ("-", Float(a), Float(b)) => Ok(Float(a - b)),
735        ("*", Float(a), Float(b)) => Ok(Float(a * b)),
736        ("/", Float(a), Float(b)) => Ok(Float(a / b)),
737
738        ("==", a, b) => Ok(Bool(a == b)),
739        ("!=", a, b) => Ok(Bool(a != b)),
740
741        ("<",  Int(a), Int(b)) => Ok(Bool(a < b)),
742        ("<=", Int(a), Int(b)) => Ok(Bool(a <= b)),
743        (">",  Int(a), Int(b)) => Ok(Bool(a > b)),
744        (">=", Int(a), Int(b)) => Ok(Bool(a >= b)),
745
746        ("<",  Float(a), Float(b)) => Ok(Bool(a < b)),
747        ("<=", Float(a), Float(b)) => Ok(Bool(a <= b)),
748        (">",  Float(a), Float(b)) => Ok(Bool(a > b)),
749        (">=", Float(a), Float(b)) => Ok(Bool(a >= b)),
750
751        (op, a, b) => Err(format!(
752            "unsupported binop `{op}` on {a:?} and {b:?}")),
753    }
754}
755
756fn const_str(constants: &[Const], idx: u32) -> String {
757    match constants.get(idx as usize) {
758        Some(Const::NodeId(s)) | Some(Const::Str(s)) => s.clone(),
759        _ => String::new(),
760    }
761}
762
763/// Read `LEX_PAR_MAX_CONCURRENCY` (default = available CPU cores,
764/// fallback 4). Capped at 64 so a malformed env var can't spawn an
765/// unreasonable number of OS threads.
766/// Order-defining comparator for `list.sort_by` keys (#338).
767/// Same-typed Int / Float / Str pairs compare via their native
768/// `Ord` / `PartialOrd`. Mixed-type or other key shapes compare
769/// as Equal; combined with `Vec::sort_by`'s stability that
770/// preserves the original element order — best-effort fallback
771/// that never panics.
772fn compare_sort_keys(a: &Value, b: &Value) -> std::cmp::Ordering {
773    use std::cmp::Ordering;
774    match (a, b) {
775        (Value::Int(x), Value::Int(y)) => x.cmp(y),
776        (Value::Float(x), Value::Float(y)) => x.partial_cmp(y).unwrap_or(Ordering::Equal),
777        (Value::Str(x), Value::Str(y)) => x.cmp(y),
778        _ => Ordering::Equal,
779    }
780}
781
782fn par_max_concurrency() -> usize {
783    let from_env = std::env::var("LEX_PAR_MAX_CONCURRENCY")
784        .ok()
785        .and_then(|s| s.parse::<usize>().ok())
786        .filter(|n| *n > 0);
787    let default = std::thread::available_parallelism()
788        .map(|n| n.get())
789        .unwrap_or(4);
790    from_env.unwrap_or(default).min(64)
791}
792
793/// `list.par_map`'s runtime: spawn OS threads (capped by
794/// `LEX_PAR_MAX_CONCURRENCY`), apply `closure` to each item, return
795/// results in input order. Each worker runs a fresh `Vm` with
796/// [`DenyAllEffects`] for #305 slice 1 — effectful closures fail
797/// with `VmError::Effect`. Slice 2 will plumb a per-thread effect
798/// handler split.
799fn par_map_run<'a>(
800    program: &'a Program,
801    closure: Value,
802    items: Vec<Value>,
803    worker_handlers: Vec<Box<dyn EffectHandler + Send>>,
804    // Step budget for each worker VM. Without this, workers fall back to the
805    // 10M default and ignore the caller's `--max-steps`, so a closure that does
806    // real work (e.g. parsing a large payload) spuriously trips the step limit
807    // inside a par_map even when the top-level run raised it. Inherit the
808    // parent's limit so `--max-steps` applies uniformly.
809    step_limit: u64,
810) -> Result<Vec<Value>, VmError> {
811    if items.is_empty() {
812        return Ok(Vec::new());
813    }
814    let n_workers = worker_handlers.len().min(items.len()).max(1);
815    // Carve items into `n_workers` round-robin buckets so each
816    // worker processes (indices, items) pairs and we can reassemble
817    // in input order.
818    let mut buckets: Vec<Vec<(usize, Value)>> = (0..n_workers).map(|_| Vec::new()).collect();
819    for (i, v) in items.into_iter().enumerate() {
820        buckets[i % n_workers].push((i, v));
821    }
822    let n_total: usize = buckets.iter().map(|b| b.len()).sum();
823    let results: std::sync::Mutex<Vec<Option<Result<Value, String>>>> =
824        std::sync::Mutex::new((0..n_total).map(|_| None).collect());
825
826    // Pair each bucket with its pre-built handler so workers own
827    // their handler outright — no shared mutable state across
828    // worker threads.
829    let mut worker_handlers = worker_handlers;
830    worker_handlers.truncate(n_workers);
831    type Pair = (Vec<(usize, Value)>, Box<dyn EffectHandler + Send>);
832    let pairs: Vec<Pair> = buckets.into_iter().zip(worker_handlers).collect();
833
834    std::thread::scope(|s| {
835        let mut handles = Vec::with_capacity(pairs.len());
836        for (bucket, handler) in pairs {
837            let closure = closure.clone();
838            let results = &results;
839            handles.push(s.spawn(move || {
840                // `Box<dyn EffectHandler + Send>` has implicit
841                // `+ 'static`; that coerces to `+ 'a` because
842                // `'static` outlives any `'a`. The `Send` bound is
843                // auto-erased on the unsize coercion.
844                let handler_for_vm: Box<dyn EffectHandler + 'a> = handler;
845                let mut vm = Vm::with_handler(program, handler_for_vm);
846                vm.set_step_limit(step_limit);
847                for (idx, item) in bucket {
848                    let r = vm
849                        .invoke_closure_value(closure.clone(), vec![item])
850                        .map_err(|e| format!("{e:?}"));
851                    results.lock().unwrap()[idx] = Some(r);
852                }
853            }));
854        }
855        for h in handles {
856            h.join().map_err(|_| ()).ok();
857        }
858    });
859
860    let mut out = Vec::with_capacity(n_total);
861    let inner = results.into_inner().unwrap();
862    for r in inner {
863        match r {
864            Some(Ok(v)) => out.push(v),
865            Some(Err(e)) => return Err(VmError::Effect(format!("par_map worker: {e}"))),
866            None => return Err(VmError::Panic("par_map worker did not produce a result".into())),
867        }
868    }
869    Ok(out)
870}
871
872impl<'a> Vm<'a> {
873    pub fn new(program: &'a Program) -> Self {
874        Self::with_handler(program, Box::new(DenyAllEffects))
875    }
876
877    pub fn with_handler(program: &'a Program, handler: Box<dyn EffectHandler + 'a>) -> Self {
878        Self {
879            program,
880            handler,
881            tracer: Box::new(NullTracer),
882            // Pre-allocate enough capacity for a typical request so the first
883            // call incurs no reallocation (#389 slice 3).
884            frames: Vec::with_capacity(32),
885            stack: Vec::with_capacity(128),
886            step_limit: 10_000_000,
887            steps: 0,
888            pure_memo: std::collections::HashMap::new(),
889            pure_memo_hits: 0,
890            pure_memo_misses: 0,
891            pure_memo_skips: 0,
892            memo_fn_state: vec![MemoFnState::default(); program.functions.len()],
893            field_ics: vec![Vec::new(); program.functions.len()],
894            // 256 slots handles ~32 frames × 8 locals; grows on demand and
895            // retains capacity across consecutive vm.call() invocations.
896            locals_storage: Vec::with_capacity(256),
897            // #464 step 2: zero capacity at construction — handlers that
898            // never AllocStackRecord (most code today, until the lowering
899            // pass kicks in) pay nothing. First allocation triggers Vec
900            // growth; capacity is retained across `vm.call` invocations.
901            stack_record_arena: Vec::new(),
902            stack_record_allocs: 0,
903            stack_record_heap_fallbacks: 0,
904            heap_record_allocs: 0,
905            // #463 slice 2a: empty until the first enter_request_scope.
906            // Programs that never enter a scope incur zero arena cost
907            // (the alloc ops, if reached, fall back to the heap path).
908            arena_slab: Vec::new(),
909            arena_scope_starts: Vec::new(),
910            arena_record_allocs: 0,
911            arena_record_heap_fallbacks: 0,
912            jit_hook: None,
913        }
914    }
915
916    pub fn set_tracer(&mut self, tracer: Box<dyn Tracer + 'a>) {
917        self.tracer = tracer;
918    }
919
920    /// Install (or replace) the JIT hook consulted by `Op::Call`'s
921    /// dispatch arm. With `None`, dispatch behaves exactly as before
922    /// — the hook check is a single null-option branch the optimizer
923    /// can hoist. See the [`crate::jit_hook`] module for the
924    /// contract callers must uphold.
925    pub fn set_jit_hook(&mut self, hook: Option<Box<dyn crate::jit_hook::JitHook + 'a>>) {
926        self.jit_hook = hook;
927    }
928
929    /// Cap the number of opcode dispatches before the VM aborts with
930    /// `step limit exceeded`. Useful as a runtime DoS guard against
931    /// untrusted code (e.g. the `agent-tool` sandbox, where an LLM
932    /// could emit `list.fold(list.range(0, 1_000_000_000), …)` to hang
933    /// the host). Default is 10_000_000.
934    pub fn set_step_limit(&mut self, limit: u64) {
935        self.step_limit = limit;
936    }
937
938    pub fn call(&mut self, name: &str, args: Vec<Value>) -> Result<Value, VmError> {
939        let fn_id = self.program.lookup(name).ok_or_else(|| VmError::Panic(format!("no function `{name}`")))?;
940        self.invoke(fn_id, args)
941    }
942
943    /// Vm-level handler for `parser.run` (#221). Routed here from
944    /// `Op::EffectCall` rather than through the `EffectHandler` so
945    /// the recursive parser interpreter has reentrant Vm access for
946    /// closure invocation. Returns the wrapped `Result[T, ParseErr]`
947    /// value the language sees.
948    fn run_parser_op(&mut self, args: Vec<Value>) -> Result<Value, String> {
949        let parser = args.first().cloned()
950            .ok_or_else(|| "parser.run: missing parser arg".to_string())?;
951        let input = match args.get(1) {
952            Some(Value::Str(s)) => s.clone(),
953            _ => return Err("parser.run: input must be Str".into()),
954        };
955        match crate::parser_runtime::run_parser(&parser, &input, 0, self) {
956            Ok((value, _pos)) => Ok(Value::Variant {
957                name: "Ok".into(),
958                args: vec![value],
959            }),
960            Err((pos, msg)) => {
961                let mut e: IndexMap<String, Value> = IndexMap::new();
962                e.insert("pos".into(), Value::Int(pos as i64));
963                e.insert("message".into(), Value::Str(msg.into()));
964                Ok(Value::Variant {
965                    name: "Err".into(),
966                    args: vec![Value::record_dynamic(e)],
967                })
968            }
969        }
970    }
971
972    // ---- Variant helpers used by conc.* registry ops (#444) ----
973    // Local helpers (avoid pulling in serde / public API). Lex's
974    // `Result`/`Option` are stdlib unions; their runtime shape is a
975    // `Value::Variant { name, args }` with the constructor name as
976    // declared (`Ok`/`Err`/`Some`/`None`).
977
978    /// VM-level handler for `conc.*` effect ops (#381).
979    ///
980    /// * `conc.spawn(init, handler)` — creates an `Actor` wrapping the
981    ///   initial state and the handler closure. No background thread is
982    ///   started; the actor runs synchronously on the calling thread
983    ///   under a `Mutex` so concurrent callers serialise.
984    ///
985    /// * `conc.ask(actor, msg)` — locks the actor, calls
986    ///   `handler(state, msg)` on *this* VM (reentrant), expects a
987    ///   2-tuple `(new_state, reply)`, updates the actor's state, and
988    ///   returns `reply`.
989    ///
990    /// * `conc.tell(actor, msg)` — same as `ask` but discards the
991    ///   reply and returns `Unit`.
992    fn run_conc_op(&mut self, op: &str, args: Vec<Value>) -> Result<Value, String> {
993        match op {
994            "spawn" => {
995                let mut it = args.into_iter();
996                let init = it.next().unwrap_or(Value::Unit);
997                let handler = it.next().unwrap_or(Value::Unit);
998                if !matches!(handler, Value::Closure { .. }) {
999                    return Err(format!(
1000                        "conc.spawn: handler must be a Closure, got {handler:?}"));
1001                }
1002                Ok(Value::Actor(Arc::new(Mutex::new(ActorCell {
1003                    state: init,
1004                    handler: crate::value::ActorHandler::Lex(handler),
1005                }))))
1006            }
1007            "ask" | "tell" => {
1008                let mut it = args.into_iter();
1009                let actor_val = it.next().unwrap_or(Value::Unit);
1010                let msg = it.next().unwrap_or(Value::Unit);
1011                let cell = match actor_val {
1012                    Value::Actor(ref arc) => Arc::clone(arc),
1013                    other => return Err(format!(
1014                        "conc.{op}: first arg must be an Actor, got {other:?}")),
1015                };
1016                // Lock the actor: guarantees at-most-one-concurrent message.
1017                let mut guard = cell.lock().map_err(|e| format!("conc.{op}: actor mutex poisoned: {e}"))?;
1018                let handler = guard.handler.clone();
1019                let state = guard.state.clone();
1020                match handler {
1021                    crate::value::ActorHandler::Lex(closure_val) => {
1022                        // Call handler(state, msg) on this VM — full effect access.
1023                        let result = self.invoke_closure_value(closure_val, vec![state, msg])
1024                            .map_err(|e| format!("conc.{op}: handler error: {e:?}"))?;
1025                        // Expect (new_state, reply) tuple.
1026                        match result {
1027                            Value::Tuple(mut parts) if parts.len() == 2 => {
1028                                let reply = parts.pop().unwrap();
1029                                let new_state = parts.pop().unwrap();
1030                                guard.state = new_state;
1031                                drop(guard);
1032                                if op == "ask" { Ok(reply) } else { Ok(Value::Unit) }
1033                            }
1034                            other => Err(format!(
1035                                "conc.{op}: handler must return a 2-tuple (new_state, reply), got {other:?}")),
1036                        }
1037                    }
1038                    crate::value::ActorHandler::Native(native) => {
1039                        // Native bridge: fire-and-forget; `state` is unused
1040                        // (the bridge's "state" is the external resource, e.g.
1041                        // a WebSocket connection). The closure receives `msg`
1042                        // directly. `ask` returns whatever the bridge produces;
1043                        // `tell` discards it. State stays untouched.
1044                        drop(guard);
1045                        let result = (native.send)(msg)
1046                            .map_err(|e| format!("conc.{op}: native handler error: {e}"))?;
1047                        if op == "ask" { Ok(result) } else { Ok(Value::Unit) }
1048                    }
1049                }
1050            }
1051            "register" => {
1052                // conc.register(actor, name) -> Result[Unit, ConcError]
1053                // Returns Ok(Unit) on first register, Err(AlreadyRegistered(name))
1054                // if the name is taken. v1 stores the actor opaquely —
1055                // see crate::conc_registry for the type-tag note.
1056                let mut it = args.into_iter();
1057                let actor = it.next().unwrap_or(Value::Unit);
1058                if !matches!(actor, Value::Actor(_)) {
1059                    return Err(format!(
1060                        "conc.register: first arg must be an Actor, got {actor:?}"));
1061                }
1062                let name = match it.next() {
1063                    Some(Value::Str(s)) => s.to_string(),
1064                    other => return Err(format!(
1065                        "conc.register: name must be Str, got {other:?}")),
1066                };
1067                Ok(match crate::conc_registry::register(&name, actor) {
1068                    Ok(()) => variant_ok(Value::Unit),
1069                    Err(crate::conc_registry::RegError::AlreadyRegistered(n)) => {
1070                        variant_err(variant("AlreadyRegistered", vec![Value::Str(n.into())]))
1071                    }
1072                    Err(crate::conc_registry::RegError::NotRegistered(_)) => {
1073                        unreachable!("register cannot produce NotRegistered")
1074                    }
1075                })
1076            }
1077            "lookup" => {
1078                // conc.lookup(name) -> Option[Actor[S, M]]
1079                // Returns Some(actor) if registered, None otherwise. The
1080                // [S, M] static parametrisation at the call site is not
1081                // checked at runtime in v1 — caller's responsibility to
1082                // match the registration site's type.
1083                let mut it = args.into_iter();
1084                let name = match it.next() {
1085                    Some(Value::Str(s)) => s.to_string(),
1086                    other => return Err(format!(
1087                        "conc.lookup: name must be Str, got {other:?}")),
1088                };
1089                Ok(match crate::conc_registry::lookup(&name) {
1090                    Some(actor) => variant("Some", vec![actor]),
1091                    None => variant("None", vec![]),
1092                })
1093            }
1094            "unregister" => {
1095                // conc.unregister(name) -> Result[Unit, ConcError]
1096                let mut it = args.into_iter();
1097                let name = match it.next() {
1098                    Some(Value::Str(s)) => s.to_string(),
1099                    other => return Err(format!(
1100                        "conc.unregister: name must be Str, got {other:?}")),
1101                };
1102                Ok(match crate::conc_registry::unregister(&name) {
1103                    Ok(()) => variant_ok(Value::Unit),
1104                    Err(crate::conc_registry::RegError::NotRegistered(n)) => {
1105                        variant_err(variant("NotRegistered", vec![Value::Str(n.into())]))
1106                    }
1107                    Err(crate::conc_registry::RegError::AlreadyRegistered(_)) => {
1108                        unreachable!("unregister cannot produce AlreadyRegistered")
1109                    }
1110                })
1111            }
1112            "registered" => {
1113                // conc.registered() -> List[Str] — sorted snapshot.
1114                let names = crate::conc_registry::registered();
1115                Ok(Value::List(names.into_iter()
1116                    .map(|n| Value::Str(n.into()))
1117                    .collect()))
1118            }
1119            other => Err(format!("unknown conc.{other}")),
1120        }
1121    }
1122
1123    /// Invoke a `Value::Closure` by combining its captures with the
1124    /// supplied call args and dispatching to the underlying function.
1125    /// Used by the parser interpreter (#221) to call user-supplied
1126    /// `f` arguments inside `parser.map` / `parser.and_then` nodes.
1127    pub fn invoke_closure_value(
1128        &mut self,
1129        closure: Value,
1130        args: Vec<Value>,
1131    ) -> Result<Value, VmError> {
1132        let (fn_id, captures) = match closure {
1133            Value::Closure { fn_id, captures, .. } => (fn_id, captures),
1134            other => return Err(VmError::TypeMismatch(
1135                format!("invoke_closure_value: not a closure: {other:?}"))),
1136        };
1137        let mut combined = captures;
1138        combined.extend(args);
1139        self.invoke(fn_id, combined)
1140    }
1141
1142    /// Invoke a 1-arg closure without allocating a separate args
1143    /// `Vec` (#464 call-overhead). The closure's own `captures` Vec
1144    /// is reused as the combined `captures ++ [arg]` argument buffer,
1145    /// so the per-element call in `ListMap`/`ListFilter`/`SortByKey`
1146    /// allocates at most once (the `push`) instead of twice (a fresh
1147    /// `vec![arg]` plus the `extend`). Semantically identical to
1148    /// `invoke_closure_value(closure, vec![arg])`.
1149    pub fn invoke_closure_1(&mut self, closure: Value, arg: Value) -> Result<Value, VmError> {
1150        let (fn_id, mut combined) = match closure {
1151            Value::Closure { fn_id, captures, .. } => (fn_id, captures),
1152            other => return Err(VmError::TypeMismatch(
1153                format!("invoke_closure_1: not a closure: {other:?}"))),
1154        };
1155        combined.push(arg);
1156        self.invoke(fn_id, combined)
1157    }
1158
1159    /// Invoke a 2-arg closure without a separate args `Vec` — the
1160    /// `ListFold` combiner path. See `invoke_closure_1`.
1161    pub fn invoke_closure_2(&mut self, closure: Value, a: Value, b: Value) -> Result<Value, VmError> {
1162        let (fn_id, mut combined) = match closure {
1163            Value::Closure { fn_id, captures, .. } => (fn_id, captures),
1164            other => return Err(VmError::TypeMismatch(
1165                format!("invoke_closure_2: not a closure: {other:?}"))),
1166        };
1167        combined.push(a);
1168        combined.push(b);
1169        self.invoke(fn_id, combined)
1170    }
1171
1172    /// Open a request-scoped arena via the underlying
1173    /// `EffectHandler::enter_request_scope` (#463 scaffolding).
1174    /// Runtime layers — `net.serve_fn`, `net.serve_ws`,
1175    /// `net.serve_quic` — call this immediately before invoking the
1176    /// user handler closure for a single request. Pair with
1177    /// `exit_request_scope` once the response has been built and
1178    /// any lazy iterators in it have been drained (#477).
1179    ///
1180    /// Returns the scope id the runtime should pass back to
1181    /// `exit_request_scope`. The handler's default impl returns 0
1182    /// and the matching `exit` is a no-op; `DefaultHandler`'s
1183    /// implementation actually allocates an arena.
1184    pub fn enter_request_scope(&mut self) -> u64 {
1185        // #463 slice 2a: snapshot the slab high-water mark so
1186        // `exit_request_scope` can truncate back to here, releasing
1187        // every arena-allocated value the scope built in O(1).
1188        self.arena_scope_starts.push(self.arena_slab.len() as u32);
1189        self.handler.enter_request_scope()
1190    }
1191
1192    /// True iff there is at least one active request scope — i.e. an
1193    /// `enter_request_scope` not yet matched by `exit_request_scope`.
1194    /// Runtime layers use this to skip `materialize_arena_handles` on
1195    /// paths where no scope was entered (e.g. tiny-http worker
1196    /// dispatch), keeping the no-arena path zero-cost. Slice 2b-i.
1197    pub fn arena_scope_active(&self) -> bool {
1198        !self.arena_scope_starts.is_empty()
1199    }
1200
1201    /// Close the request scope opened by `enter_request_scope`.
1202    /// Drops the associated arena.
1203    pub fn exit_request_scope(&mut self, scope_id: u64) {
1204        // #463 slice 2a: truncate the slab back to the matching
1205        // `enter` snapshot, then notify the handler. Out-of-order /
1206        // unpaired exits (e.g. a stray `exit` with no prior `enter`)
1207        // are tolerated as no-ops — the handler does the same, and a
1208        // stray exit shouldn't crash a live server.
1209        if let Some(start) = self.arena_scope_starts.pop() {
1210            self.arena_slab.truncate(start as usize);
1211        }
1212        self.handler.exit_request_scope(scope_id)
1213    }
1214
1215    /// Deep-walk `value` and resolve every `Value::ArenaRecord` /
1216    /// `Value::ArenaTuple` handle into its heap-owned equivalent
1217    /// (`Value::Record` / `Value::Tuple`), reading field contents
1218    /// out of `Vm::arena_slab` along the way. Primitives, closures,
1219    /// maps/sets, and the host-managed handles (`Actor` / `Ticker` /
1220    /// `ArrowTable`) are returned unchanged.
1221    ///
1222    /// **The boundary helper** flagged in
1223    /// `docs/design/arena-plumbing.md` § "Arena handles MUST be
1224    /// readable at serialization". Callers — the response
1225    /// serialization path in `lex-runtime`, the trace recorder when
1226    /// it records a Call/EffectCall arg, anywhere a value crosses
1227    /// out of the VM into host-managed storage — call this
1228    /// **while the producing scope is still active**, before
1229    /// `exit_request_scope`. After exit the slab is truncated, so a
1230    /// handle materialized after-the-fact would read garbage (or
1231    /// panic on the bounds check).
1232    ///
1233    /// `Value::StackRecord` / `Value::StackTuple` would similarly
1234    /// need slab resolution, but the #464 escape analysis prevents
1235    /// them from reaching boundary-crossing ops in the first place
1236    /// (they're frame-local by construction). Reaching here means a
1237    /// hand-built or analysis-buggy program; we panic with the same
1238    /// loud-not-silent contract the other inspection paths use.
1239    ///
1240    /// Idempotent on already-materialized values (no arena handles
1241    /// in the tree → only the recursive walk's clones, no slab
1242    /// lookups). Cost per call is one walk + clone of the tree —
1243    /// amortized over the per-node mallocs avoided during request
1244    /// handling, the net stays strongly positive.
1245    pub fn materialize_arena_handles(&self, value: Value) -> Value {
1246        use crate::value::Value as V;
1247        match value {
1248            // Primitives + opaque handles cross unchanged. Cheap
1249            // — clones are essentially free for the Copy-ish ones
1250            // and Arc-bumps for the handle types.
1251            V::Int(_) | V::Float(_) | V::Bool(_) | V::Str(_) | V::Bytes(_)
1252            | V::Unit | V::Closure { .. } | V::F64Array { .. }
1253            | V::Map(_) | V::Set(_) | V::Actor(_) | V::Ticker(_)
1254            | V::ArrowTable(_) => value,
1255
1256            // Containers: recurse on each element. Map/Set keys are
1257            // MapKey (Str | Int), never Value, so no handles can
1258            // hide there.
1259            V::List(items) => V::List(
1260                items.into_iter().map(|v| self.materialize_arena_handles(v)).collect()),
1261            V::Tuple(items) => V::Tuple(
1262                items.into_iter().map(|v| self.materialize_arena_handles(v)).collect()),
1263            V::Deque(items) => V::Deque(
1264                items.into_iter().map(|v| self.materialize_arena_handles(v)).collect()),
1265            V::Variant { name, args } => V::Variant {
1266                name,
1267                args: args.into_iter().map(|v| self.materialize_arena_handles(v)).collect(),
1268            },
1269            V::Record { shape_id, fields } => {
1270                let mut out: IndexMap<SmolStr, Value> = IndexMap::with_capacity(fields.len());
1271                for (k, v) in fields.into_iter() {
1272                    out.insert(k, self.materialize_arena_handles(v));
1273                }
1274                V::Record { shape_id, fields: Box::new(out) }
1275            }
1276
1277            // The actual resolution work — read the slab and build a
1278            // heap form. Field-name ordering for ArenaRecord matches
1279            // the shape's, same as `MakeRecord`'s IndexMap insertion
1280            // pattern; that's the contract that makes the polymorphic
1281            // GetField IC work, and we reuse it here.
1282            V::ArenaRecord { shape_id, slab_start, field_count } => {
1283                let start = slab_start as usize;
1284                let n = field_count as usize;
1285                debug_assert!(start + n <= self.arena_slab.len(),
1286                    "ArenaRecord handle out of bounds — likely materialized after exit_request_scope");
1287                let shape = &self.program.record_shapes[shape_id as usize];
1288                let mut fields: IndexMap<SmolStr, Value> = IndexMap::with_capacity(n);
1289                for (i, name_const_idx) in shape.iter().take(n).enumerate() {
1290                    let name: SmolStr = match &self.program.constants[*name_const_idx as usize] {
1291                        Const::FieldName(s) => s.as_str().into(),
1292                        _ => panic!("BUG(#463): ArenaRecord shape entry not a FieldName const"),
1293                    };
1294                    let v = self.materialize_arena_handles(self.arena_slab[start + i].clone());
1295                    fields.insert(name, v);
1296                }
1297                V::Record { shape_id, fields: Box::new(fields) }
1298            }
1299            V::ArenaTuple { slab_start, arity } => {
1300                let start = slab_start as usize;
1301                let n = arity as usize;
1302                debug_assert!(start + n <= self.arena_slab.len(),
1303                    "ArenaTuple handle out of bounds — likely materialized after exit_request_scope");
1304                let items: Vec<Value> = (0..n)
1305                    .map(|i| self.materialize_arena_handles(self.arena_slab[start + i].clone()))
1306                    .collect();
1307                V::Tuple(items)
1308            }
1309
1310            // #464 stack handles are frame-local; the analysis
1311            // prevents them from reaching any boundary the
1312            // materializer is called at. Reach = bug; panic loud.
1313            V::StackRecord { .. } =>
1314                panic!("BUG(#464/#463): Value::StackRecord reached materialize_arena_handles \
1315                        — escape analysis should keep stack handles inside their frame"),
1316            V::StackTuple { .. } =>
1317                panic!("BUG(#464/#463): Value::StackTuple reached materialize_arena_handles \
1318                        — escape analysis should keep stack handles inside their frame"),
1319        }
1320    }
1321
1322    /// Read a named field out of a record without materializing its
1323    /// parent. Works uniformly on `Value::Record` (heap) and
1324    /// `Value::ArenaRecord` (slab handle), so a runtime layer can
1325    /// consume the response record structurally — straight out of
1326    /// the arena slab — instead of paying for a tree-wide
1327    /// `materialize_arena_handles` walk just to read three top-level
1328    /// fields.
1329    ///
1330    /// Returns `None` if the value isn't a record or the field
1331    /// doesn't exist. The returned `Value` is a clone of the slot
1332    /// contents (records' field values can themselves be records,
1333    /// variants, etc.; cloning at the boundary is unavoidable
1334    /// without lifetime trickery on the public API).
1335    ///
1336    /// Performance: on the heap path it's a `IndexMap::get` + clone.
1337    /// On the arena path it's a linear walk of the shape's
1338    /// field-name vec (`field_count` long, typically ≤ 10) +
1339    /// an O(1) slab index + clone. The polymorphic-IC equivalent
1340    /// inside the VM is faster, but this API is for **host**
1341    /// consumers, not hot-loop dispatch.
1342    ///
1343    /// `Value::StackRecord` is deliberately not handled — those
1344    /// handles are frame-local by construction (#464 escape pass)
1345    /// and shouldn't reach host boundaries; reaching them here is
1346    /// a soundness bug surfaced as a panic, matching the existing
1347    /// inspection-path contract.
1348    pub fn get_record_field(&self, value: &Value, name: &str) -> Option<Value> {
1349        match value {
1350            Value::Record { fields, .. } => fields.get(name).cloned(),
1351            Value::ArenaRecord { shape_id, slab_start, field_count } => {
1352                let shape = self.program.record_shapes.get(*shape_id as usize)?;
1353                let n = (*field_count as usize).min(shape.len());
1354                for (i, &name_const_idx) in shape.iter().take(n).enumerate() {
1355                    if let Const::FieldName(s) = &self.program.constants[name_const_idx as usize] {
1356                        if s == name {
1357                            return Some(self.arena_slab[*slab_start as usize + i].clone());
1358                        }
1359                    }
1360                }
1361                None
1362            }
1363            Value::StackRecord { .. } =>
1364                panic!("BUG(#464): Value::StackRecord reached Vm::get_record_field \
1365                        — frame-local handles should never reach the host boundary"),
1366            _ => None,
1367        }
1368    }
1369
1370    /// Positional read out of a tuple without materializing its
1371    /// parent. Works uniformly on `Value::Tuple` and
1372    /// `Value::ArenaTuple`. See `get_record_field` for the lifetime
1373    /// rationale.
1374    pub fn get_tuple_elem(&self, value: &Value, idx: u16) -> Option<Value> {
1375        match value {
1376            Value::Tuple(items) => items.get(idx as usize).cloned(),
1377            Value::ArenaTuple { slab_start, arity } => {
1378                if idx >= *arity { return None; }
1379                Some(self.arena_slab[*slab_start as usize + idx as usize].clone())
1380            }
1381            Value::StackTuple { .. } =>
1382                panic!("BUG(#464): Value::StackTuple reached Vm::get_tuple_elem \
1383                        — frame-local handles should never reach the host boundary"),
1384            _ => None,
1385        }
1386    }
1387
1388    /// Arena-aware `to_json` — produces a `serde_json::Value` from
1389    /// a `Value` whose tree may contain `ArenaRecord` / `ArenaTuple`
1390    /// handles, reading them straight out of `Vm::arena_slab`
1391    /// instead of materializing into a heap `Value::Record` mirror
1392    /// first.
1393    ///
1394    /// Equivalent output to `value.to_json()` on a fully-materialized
1395    /// tree (idempotent in that sense). Use this when serializing a
1396    /// handler return value to JSON for the response — saves the
1397    /// per-node IndexMap allocations the materialize-then-to_json
1398    /// pattern pays.
1399    pub fn value_to_json(&self, value: &Value) -> serde_json::Value {
1400        use serde_json::Value as J;
1401        match value {
1402            // Primitives + opaque host handles: delegate to the
1403            // existing `Value::to_json` — its output is identical
1404            // and it handles the host-handle types we don't model
1405            // (Actor / Ticker / ArrowTable / F64Array / Map / Set /
1406            // Closure / Bytes encoding) in one place.
1407            Value::Int(_) | Value::Float(_) | Value::Bool(_) | Value::Str(_)
1408            | Value::Bytes(_) | Value::Unit | Value::Closure { .. }
1409            | Value::F64Array { .. } | Value::Map(_) | Value::Set(_)
1410            | Value::Actor(_) | Value::Ticker(_) | Value::ArrowTable(_)
1411                => value.to_json(),
1412
1413            Value::List(items) => J::Array(items.iter().map(|v| self.value_to_json(v)).collect()),
1414            Value::Tuple(items) => J::Array(items.iter().map(|v| self.value_to_json(v)).collect()),
1415            Value::Deque(items) => J::Array(items.iter().map(|v| self.value_to_json(v)).collect()),
1416            Value::Variant { name, args } => {
1417                let mut m = serde_json::Map::new();
1418                m.insert("$variant".into(), J::String(name.clone()));
1419                m.insert("args".into(),
1420                    J::Array(args.iter().map(|v| self.value_to_json(v)).collect()));
1421                J::Object(m)
1422            }
1423            Value::Record { fields, .. } => {
1424                let mut m = serde_json::Map::new();
1425                for (k, v) in fields.iter() {
1426                    m.insert(k.to_string(), self.value_to_json(v));
1427                }
1428                J::Object(m)
1429            }
1430
1431            // Slab-direct: read the cells in shape order, emit a
1432            // JSON object using the shape's field names. The cost
1433            // delta vs the `Value::to_json` materialize-then-walk
1434            // path is the saved `Box<IndexMap>` allocation +
1435            // insertion + drop.
1436            Value::ArenaRecord { shape_id, slab_start, field_count } => {
1437                let shape = match self.program.record_shapes.get(*shape_id as usize) {
1438                    Some(s) => s,
1439                    None => return J::Null,
1440                };
1441                let n = (*field_count as usize).min(shape.len());
1442                let mut m = serde_json::Map::with_capacity(n);
1443                for (i, &name_const_idx) in shape.iter().take(n).enumerate() {
1444                    let name = match &self.program.constants[name_const_idx as usize] {
1445                        Const::FieldName(s) => s.to_string(),
1446                        _ => continue,
1447                    };
1448                    let cell = &self.arena_slab[*slab_start as usize + i];
1449                    m.insert(name, self.value_to_json(cell));
1450                }
1451                J::Object(m)
1452            }
1453            Value::ArenaTuple { slab_start, arity } => {
1454                let start = *slab_start as usize;
1455                let n = *arity as usize;
1456                let items: Vec<serde_json::Value> = (0..n)
1457                    .map(|i| self.value_to_json(&self.arena_slab[start + i]))
1458                    .collect();
1459                J::Array(items)
1460            }
1461
1462            // Stack handles must not reach the host — same defensive
1463            // panic as the other inspection paths.
1464            Value::StackRecord { .. } =>
1465                panic!("BUG(#464): Value::StackRecord reached Vm::value_to_json \
1466                        — frame-local handles should never reach the host boundary"),
1467            Value::StackTuple { .. } =>
1468                panic!("BUG(#464): Value::StackTuple reached Vm::value_to_json \
1469                        — frame-local handles should never reach the host boundary"),
1470        }
1471    }
1472
1473    pub fn invoke(&mut self, fn_id: u32, args: Vec<Value>) -> Result<Value, VmError> {
1474        let f = &self.program.functions[fn_id as usize];
1475        if args.len() != f.arity as usize {
1476            return Err(VmError::Panic(format!("arity mismatch calling {}", f.name)));
1477        }
1478        // Refinement runtime check at the public entry point too
1479        // (#209 slice 3). `Op::Call` checks for in-program calls;
1480        // this branch covers `vm.call("entry", ...)` from the host
1481        // and the reentrant `invoke_closure_value` path. Same
1482        // semantics, same error shape.
1483        //
1484        // Iterate `f.refinements` by reference — the loop body
1485        // only reads from `self.program` (via `r`) and from locals,
1486        // so we don't need to clone the Vec to detach it from
1487        // `&self`. The function name is cloned **lazily**, only on
1488        // the failure path: functions with no refinements (the common
1489        // case) never enter the loop, so the per-call `f.name.clone()`
1490        // was pure waste on the hot path (#464 call-overhead).
1491        for (i, refinement) in f.refinements.iter().enumerate() {
1492            if let Some(r) = refinement {
1493                let arg = args.get(i).cloned().unwrap_or(Value::Unit);
1494                match eval_refinement(&r.predicate, &r.binding, &arg) {
1495                    Ok(true) => {}
1496                    Ok(false) => return Err(VmError::RefinementFailed {
1497                        fn_name: f.name.clone(),
1498                        param_index: i,
1499                        binding: r.binding.clone(),
1500                        reason: format!("predicate failed for {} = {arg:?}", r.binding),
1501                    }),
1502                    Err(reason) => return Err(VmError::RefinementFailed {
1503                        fn_name: f.name.clone(),
1504                        param_index: i,
1505                        binding: r.binding.clone(),
1506                        reason,
1507                    }),
1508                }
1509            }
1510        }
1511        // #465 JIT tier hook at the public entry — same contract as
1512        // the `Op::Call` dispatch arm. Pure-fn memo is not consulted
1513        // at this layer (memo is per-Op::Call); the hook fires
1514        // unconditionally for refinement-clean calls.
1515        if let Some(mut hook) = self.jit_hook.take() {
1516            let hook_result = hook.try_call(fn_id, &args);
1517            self.jit_hook = Some(hook);
1518            if let Some(result) = hook_result? {
1519                return Ok(result);
1520            }
1521        }
1522        let f = &self.program.functions[fn_id as usize];
1523        // Claim slots from the locals stack allocator (#389 slice 3).
1524        let locals_start = self.locals_storage.len();
1525        let locals_len = f.locals_count.max(f.arity) as usize;
1526        self.locals_storage.resize(locals_start + locals_len, Value::Unit);
1527        for (i, v) in args.into_iter().enumerate() {
1528            self.locals_storage[locals_start + i] = v;
1529        }
1530        // Record the depth before pushing — this is what `run` will
1531        // exit at, supporting reentrant invocation from inside the
1532        // VM (e.g. the parser interpreter calling closures, #221).
1533        let base_depth = self.frames.len();
1534        self.push_frame(Frame {
1535            fn_id, pc: 0, locals_start, locals_len,
1536            stack_base: self.stack.len(),
1537            trace_kind: FrameKind::Entry,
1538            memo_key: None,
1539            stack_record_arena_start: self.stack_record_arena.len(),
1540            stack_record_budget_remaining: STACK_RECORD_BUDGET_SLOTS,
1541        })?;
1542        self.run_to(base_depth)
1543    }
1544
1545    /// All call-frame pushes funnel through here so the depth
1546    /// check can't be skipped by a missing branch. Returns
1547    /// `CallStackOverflow` instead of letting recursion blow the
1548    /// host's native stack.
1549    fn push_frame(&mut self, frame: Frame) -> Result<(), VmError> {
1550        if self.frames.len() as u32 >= MAX_CALL_DEPTH {
1551            return Err(VmError::CallStackOverflow(MAX_CALL_DEPTH));
1552        }
1553        self.frames.push(frame);
1554        Ok(())
1555    }
1556
1557    /// Run until the frame stack drops to `base_depth`. Required for
1558    /// reentrant invocation: a `Vm::invoke` call from inside an
1559    /// already-running `run()` must return when *its* frame returns,
1560    /// not when the entire frame stack empties (#221).
1561    fn run_to(&mut self, base_depth: usize) -> Result<Value, VmError> {
1562        // #461 slice A: cache the executing function's code slice across
1563        // ops instead of re-deriving `program.functions[fn_id].code` on
1564        // every iteration. The program is borrowed (`&'a Program`) and is
1565        // never mutated during a run, so the slice reference is valid for
1566        // the whole run and — crucially — is independent of the `&mut self`
1567        // borrow the op handlers take: it points into the caller-owned
1568        // `Program`, not into `*self`. Re-resolve only when `fn_id`
1569        // changes, which is exactly the frame-transition set (Call /
1570        // CallClosure / TailCall / Return); recursion into the same
1571        // `fn_id` correctly keeps the cached slice. `frame_idx` / `fn_id`
1572        // stay recomputed per op (cheap field reads), so the op handlers
1573        // are untouched and their `fn_id` bindings shadow as before.
1574        let program: &'a Program = self.program;
1575        let mut code: &'a [Op] = &[];
1576        let mut code_fn_id: u32 = u32::MAX;
1577        loop {
1578            if self.steps > self.step_limit {
1579                let frame_idx = self.frames.len() - 1;
1580                let fn_id = self.frames[frame_idx].fn_id;
1581                let fn_name = &program.functions[fn_id as usize].name;
1582                return Err(VmError::Panic(format!(
1583                    "step limit exceeded in `{fn_name}` ({} > {})",
1584                    self.steps, self.step_limit,
1585                )));
1586            }
1587            self.steps += 1;
1588            let frame_idx = self.frames.len() - 1;
1589            let pc = self.frames[frame_idx].pc;
1590            let fn_id = self.frames[frame_idx].fn_id;
1591            if fn_id != code_fn_id {
1592                code = &program.functions[fn_id as usize].code;
1593                code_fn_id = fn_id;
1594            }
1595            // #461 slice B: the bytecode verifier (#366) proves pc stays
1596            // in bounds for every reachable op — every path through a
1597            // function ends in Return / Jump / TailCall, so execution
1598            // never falls off the end of `code`. The per-op
1599            // `pc >= code.len()` guard is therefore redundant for verified
1600            // programs; demote it to a debug-only assertion. The `code[pc]`
1601            // index below stays bounds-checked, so a malformed program in
1602            // a release build still panics (loudly, just without the
1603            // bespoke message) rather than reading out of bounds — no
1604            // `unsafe`, no UB, only the cold error-return path leaves the
1605            // hot loop.
1606            debug_assert!(
1607                pc < code.len(),
1608                "ran past end of code in `{}`",
1609                program.functions[fn_id as usize].name,
1610            );
1611            let op = code[pc];
1612            self.frames[frame_idx].pc = pc + 1;
1613
1614            match op {
1615                Op::PushConst(i) => {
1616                    let c = &self.program.constants[i as usize];
1617                    self.stack.push(const_to_value(c));
1618                }
1619                Op::Pop => { self.pop()?; }
1620                Op::Dup => {
1621                    let v = self.peek()?.clone();
1622                    self.stack.push(v);
1623                }
1624                Op::LoadLocal(i) => {
1625                    let base = self.frames[frame_idx].locals_start;
1626                    let v = self.locals_storage[base + i as usize].clone();
1627                    self.stack.push(v);
1628                }
1629                Op::StoreLocal(i) => {
1630                    let v = self.pop()?;
1631                    let base = self.frames[frame_idx].locals_start;
1632                    self.locals_storage[base + i as usize] = v;
1633                }
1634                Op::MakeRecord { shape_idx, field_count } => {
1635                    self.heap_record_allocs += 1;
1636                    let shape = &self.program.record_shapes[shape_idx as usize];
1637                    let n = field_count as usize;
1638                    debug_assert_eq!(shape.len(), n,
1639                        "MakeRecord field_count must match record_shapes[shape_idx].len()");
1640                    let mut values: Vec<Value> = (0..n).map(|_| Value::Unit).collect();
1641                    for i in (0..n).rev() {
1642                        values[i] = self.pop()?;
1643                    }
1644                    let mut rec: IndexMap<SmolStr, Value> = IndexMap::with_capacity(n);
1645                    for (i, val) in values.into_iter().enumerate() {
1646                        let name: SmolStr = match &self.program.constants[shape[i] as usize] {
1647                            Const::FieldName(s) => s.as_str().into(),
1648                            _ => return Err(VmError::TypeMismatch("expected FieldName const".into())),
1649                        };
1650                        rec.insert(name, val);
1651                    }
1652                    self.stack.push(Value::Record { shape_id: shape_idx, fields: Box::new(rec) });
1653                }
1654                Op::AllocStackRecord { shape_idx, field_count } => {
1655                    // #464 step 2. Same value-stack contract as
1656                    // MakeRecord (pop `field_count`, push 1), but the
1657                    // fields live in the VM's stack-record arena
1658                    // instead of a heap-allocated IndexMap.
1659                    //
1660                    // Budget check: if this frame's remaining
1661                    // allocation budget can't cover `field_count`
1662                    // slots, fall back to MakeRecord behavior. The
1663                    // observable result is identical (a record
1664                    // value) so the compiler doesn't need to know
1665                    // ahead of time whether the budget will hold.
1666                    let n = field_count as usize;
1667                    let frame = &mut self.frames[frame_idx];
1668                    if frame.stack_record_budget_remaining < field_count as u32 {
1669                        self.stack_record_heap_fallbacks += 1;
1670                        // Heap fallback path — exact copy of
1671                        // MakeRecord's body. Compiler emitted
1672                        // AllocStackRecord because escape analysis
1673                        // proved the record can stay frame-local;
1674                        // the budget exhaustion is a runtime cost
1675                        // ceiling, not a correctness issue.
1676                        let shape = &self.program.record_shapes[shape_idx as usize];
1677                        debug_assert_eq!(shape.len(), n,
1678                            "AllocStackRecord field_count must match record_shapes[shape_idx].len()");
1679                        let mut values: Vec<Value> = (0..n).map(|_| Value::Unit).collect();
1680                        for i in (0..n).rev() {
1681                            values[i] = self.pop()?;
1682                        }
1683                        let mut rec: IndexMap<SmolStr, Value> = IndexMap::with_capacity(n);
1684                        for (i, val) in values.into_iter().enumerate() {
1685                            let name: SmolStr = match &self.program.constants[shape[i] as usize] {
1686                                Const::FieldName(s) => s.as_str().into(),
1687                                _ => return Err(VmError::TypeMismatch("expected FieldName const".into())),
1688                            };
1689                            rec.insert(name, val);
1690                        }
1691                        self.stack.push(Value::Record { shape_id: shape_idx, fields: Box::new(rec) });
1692                    } else {
1693                        self.stack_record_allocs += 1;
1694                        // Stack path: append the popped field values
1695                        // to the arena in shape order (matches the
1696                        // IndexMap insertion order used by MakeRecord,
1697                        // so the polymorphic GetField IC sees the same
1698                        // offset for either variant).
1699                        frame.stack_record_budget_remaining -= field_count as u32;
1700                        let slab_start = self.stack_record_arena.len();
1701                        // Reserve all slots upfront so we can write in
1702                        // shape order while popping in reverse —
1703                        // matches MakeRecord's idiom.
1704                        self.stack_record_arena.resize(slab_start + n, Value::Unit);
1705                        for i in (0..n).rev() {
1706                            let v = self.pop()?;
1707                            self.stack_record_arena[slab_start + i] = v;
1708                        }
1709                        self.stack.push(Value::StackRecord {
1710                            shape_id: shape_idx,
1711                            slab_start: slab_start as u32,
1712                            field_count,
1713                        });
1714                    }
1715                }
1716                Op::AllocArenaRecord { shape_idx, field_count } => {
1717                    // #463 slice 2a. Same value-stack contract as
1718                    // MakeRecord, but field values land in the
1719                    // request-scoped `arena_slab` instead of a
1720                    // per-field heap IndexMap. Runtime fallback when
1721                    // no scope is active — the op silently degrades
1722                    // to the MakeRecord heap path so arena-lowered
1723                    // bytecode stays sound in non-handler contexts
1724                    // (REPL, tests, top-level scripts).
1725                    let n = field_count as usize;
1726                    if self.arena_scope_starts.is_empty() {
1727                        self.arena_record_heap_fallbacks += 1;
1728                        // Heap fallback path — exact copy of
1729                        // MakeRecord's body. Same compile-time
1730                        // contract (shape order, IndexMap insertion)
1731                        // so the resulting Value::Record is
1732                        // indistinguishable from a direct MakeRecord.
1733                        let shape = &self.program.record_shapes[shape_idx as usize];
1734                        debug_assert_eq!(shape.len(), n,
1735                            "AllocArenaRecord field_count must match record_shapes[shape_idx].len()");
1736                        let mut values: Vec<Value> = (0..n).map(|_| Value::Unit).collect();
1737                        for i in (0..n).rev() {
1738                            values[i] = self.pop()?;
1739                        }
1740                        let mut rec: IndexMap<SmolStr, Value> = IndexMap::with_capacity(n);
1741                        for (i, val) in values.into_iter().enumerate() {
1742                            let name: SmolStr = match &self.program.constants[shape[i] as usize] {
1743                                Const::FieldName(s) => s.as_str().into(),
1744                                _ => return Err(VmError::TypeMismatch("expected FieldName const".into())),
1745                            };
1746                            rec.insert(name, val);
1747                        }
1748                        self.stack.push(Value::Record { shape_id: shape_idx, fields: Box::new(rec) });
1749                    } else {
1750                        self.arena_record_allocs += 1;
1751                        // Arena path: append the popped field values
1752                        // to the slab in shape order (matches
1753                        // MakeRecord's IndexMap insertion order, so
1754                        // the polymorphic GetField IC sees the same
1755                        // offset across all three variants).
1756                        let slab_start = self.arena_slab.len();
1757                        self.arena_slab.resize(slab_start + n, Value::Unit);
1758                        for i in (0..n).rev() {
1759                            let v = self.pop()?;
1760                            self.arena_slab[slab_start + i] = v;
1761                        }
1762                        self.stack.push(Value::ArenaRecord {
1763                            shape_id: shape_idx,
1764                            slab_start: slab_start as u32,
1765                            field_count,
1766                        });
1767                    }
1768                }
1769                Op::MakeTuple(n) => {
1770                    let mut items: Vec<Value> = (0..n).map(|_| Value::Unit).collect();
1771                    for i in (0..n as usize).rev() { items[i] = self.pop()?; }
1772                    self.stack.push(Value::Tuple(items));
1773                }
1774                Op::AllocStackTuple { arity } => {
1775                    // #464 tuple codegen. Same value-stack contract as
1776                    // MakeTuple (pop `arity`, push 1), but the elements
1777                    // live in the shared stack-record arena instead of
1778                    // a heap Vec. Budget exhaustion falls back to the
1779                    // MakeTuple heap path — identical observable result.
1780                    let n = arity as usize;
1781                    let frame = &mut self.frames[frame_idx];
1782                    if frame.stack_record_budget_remaining < arity as u32 {
1783                        self.stack_record_heap_fallbacks += 1;
1784                        let mut items: Vec<Value> = (0..n).map(|_| Value::Unit).collect();
1785                        for i in (0..n).rev() { items[i] = self.pop()?; }
1786                        self.stack.push(Value::Tuple(items));
1787                    } else {
1788                        self.stack_record_allocs += 1;
1789                        frame.stack_record_budget_remaining -= arity as u32;
1790                        let slab_start = self.stack_record_arena.len();
1791                        self.stack_record_arena.resize(slab_start + n, Value::Unit);
1792                        for i in (0..n).rev() {
1793                            let v = self.pop()?;
1794                            self.stack_record_arena[slab_start + i] = v;
1795                        }
1796                        self.stack.push(Value::StackTuple {
1797                            slab_start: slab_start as u32,
1798                            arity,
1799                        });
1800                    }
1801                }
1802                Op::AllocArenaTuple { arity } => {
1803                    // #463 slice 2a. Tuple analogue of
1804                    // AllocArenaRecord: arena slab when a scope is
1805                    // active, MakeTuple heap fallback otherwise.
1806                    let n = arity as usize;
1807                    if self.arena_scope_starts.is_empty() {
1808                        self.arena_record_heap_fallbacks += 1;
1809                        let mut items: Vec<Value> = (0..n).map(|_| Value::Unit).collect();
1810                        for i in (0..n).rev() { items[i] = self.pop()?; }
1811                        self.stack.push(Value::Tuple(items));
1812                    } else {
1813                        self.arena_record_allocs += 1;
1814                        let slab_start = self.arena_slab.len();
1815                        self.arena_slab.resize(slab_start + n, Value::Unit);
1816                        for i in (0..n).rev() {
1817                            let v = self.pop()?;
1818                            self.arena_slab[slab_start + i] = v;
1819                        }
1820                        self.stack.push(Value::ArenaTuple {
1821                            slab_start: slab_start as u32,
1822                            arity,
1823                        });
1824                    }
1825                }
1826                Op::MakeList(n) => {
1827                    let mut items: Vec<Value> = (0..n).map(|_| Value::Unit).collect();
1828                    for i in (0..n as usize).rev() { items[i] = self.pop()?; }
1829                    self.stack.push(Value::List(items.into()));
1830                }
1831                Op::MakeVariant { name_idx, arity } => {
1832                    let mut args: Vec<Value> = (0..arity).map(|_| Value::Unit).collect();
1833                    for i in (0..arity as usize).rev() { args[i] = self.pop()?; }
1834                    let name = match &self.program.constants[name_idx as usize] {
1835                        Const::VariantName(s) => s.clone(),
1836                        _ => return Err(VmError::TypeMismatch("expected VariantName const".into())),
1837                    };
1838                    self.stack.push(Value::Variant { name, args });
1839                }
1840                Op::GetField { name_idx, site_idx } => {
1841                    let v = self.pop()?;
1842                    match v {
1843                        Value::Record { fields: r, shape_id } => {
1844                            if ic_stats_enabled() {
1845                                record_ic_hit(fn_id, site_idx, shape_id);
1846                            }
1847                            // Inline cache keyed by (fn_id, site_idx) with
1848                            // shape_id-keyed verification (#462). Slot stores
1849                            // (shape_id_at_install, offset). Hit verification:
1850                            // - real shape_id (!= NO_SHAPE_ID) matches: offset
1851                            //   is guaranteed valid (records with the same
1852                            //   shape_id share the same field-name ordering
1853                            //   from the compile-time `record_shapes` table).
1854                            //   One u32 compare; no string work.
1855                            // - NO_SHAPE_ID matches NO_SHAPE_ID: distinct
1856                            //   dynamic shapes both carry this sentinel and
1857                            //   would otherwise alias, so we fall back to
1858                            //   verifying via name compare against the field
1859                            //   at the cached offset — the pre-slice
1860                            //   correctness path.
1861                            // On any mismatch we walk by name and reinstall
1862                            // (shape_id, offset).
1863                            let fid = fn_id as usize;
1864                            let sid = site_idx as usize;
1865                            if self.field_ics[fid].is_empty() {
1866                                let n = self.program.functions[fid].field_ic_sites as usize;
1867                                self.field_ics[fid] = vec![None; n];
1868                            }
1869                            let cached = self.field_ics[fid][sid];
1870                            let value = 'ic: {
1871                                if let Some((cached_shape, off)) = cached {
1872                                    if cached_shape == shape_id {
1873                                        if shape_id != crate::value::NO_SHAPE_ID {
1874                                            // Real shape match: offset is sound.
1875                                            if let Some((_, val)) = r.get_index(off) {
1876                                                break 'ic val.clone();
1877                                            }
1878                                        } else if let Some((k, val)) = r.get_index(off) {
1879                                            // Dynamic shape: verify by name.
1880                                            if let Const::FieldName(s) =
1881                                                &self.program.constants[name_idx as usize]
1882                                            {
1883                                                if s == k { break 'ic val.clone(); }
1884                                            }
1885                                        }
1886                                    }
1887                                }
1888                                // Cache miss: resolve by name, install
1889                                // (shape_id, offset).
1890                                let name = match &self.program.constants[name_idx as usize] {
1891                                    Const::FieldName(s) => s.as_str(),
1892                                    _ => return Err(VmError::TypeMismatch(
1893                                        "expected FieldName const".into())),
1894                                };
1895                                let (off, _, val) = r.get_full(name)
1896                                    .ok_or_else(|| VmError::TypeMismatch(
1897                                        format!("missing field `{name}`")))?;
1898                                self.field_ics[fid][sid] = Some((shape_id, off));
1899                                val.clone()
1900                            };
1901                            self.stack.push(value);
1902                        }
1903                        Value::StackRecord { shape_id, slab_start, field_count } => {
1904                            // #464 step 2: dispatch over a stack-allocated
1905                            // record. The IC slot stored
1906                            // (shape_id, offset_in_shape) is interoperable
1907                            // with the heap path because MakeRecord builds
1908                            // the IndexMap in shape order — offset N means
1909                            // the same field in either representation. So
1910                            // we share `field_ics` with the heap path; no
1911                            // per-variant cache pollution.
1912                            if ic_stats_enabled() {
1913                                record_ic_hit(fn_id, site_idx, shape_id);
1914                            }
1915                            let fid = fn_id as usize;
1916                            let sid = site_idx as usize;
1917                            if self.field_ics[fid].is_empty() {
1918                                let n = self.program.functions[fid].field_ic_sites as usize;
1919                                self.field_ics[fid] = vec![None; n];
1920                            }
1921                            let cached = self.field_ics[fid][sid];
1922                            let value = 'ic: {
1923                                if let Some((cached_shape, off)) = cached {
1924                                    if cached_shape == shape_id && (off as u16) < field_count {
1925                                        // Shape-keyed verification is sound
1926                                        // here for the same reason as the
1927                                        // heap path — compile-time shape
1928                                        // IDs are issued by
1929                                        // `Program::record_shapes` and
1930                                        // their field order is fixed.
1931                                        // Stack records always carry a
1932                                        // compile-time shape_id (NO_SHAPE_ID
1933                                        // is impossible — AllocStackRecord
1934                                        // is only emitted at compile time
1935                                        // with a known shape_idx).
1936                                        let idx = slab_start as usize + off;
1937                                        break 'ic self.stack_record_arena[idx].clone();
1938                                    }
1939                                }
1940                                // Cache miss: walk the shape's field-name
1941                                // vec to find the slot for `name_idx`. The
1942                                // miss path is O(field_count) like the
1943                                // heap path, but the hot retrieval after
1944                                // install is one array index — cheaper
1945                                // than IndexMap::get_index.
1946                                let shape =
1947                                    &self.program.record_shapes[shape_id as usize];
1948                                let target_name = match &self.program.constants[name_idx as usize] {
1949                                    Const::FieldName(s) => s.as_str(),
1950                                    _ => return Err(VmError::TypeMismatch(
1951                                        "expected FieldName const".into())),
1952                                };
1953                                let mut found: Option<usize> = None;
1954                                for (i, fn_const_idx) in shape.iter().enumerate() {
1955                                    if let Const::FieldName(s) =
1956                                        &self.program.constants[*fn_const_idx as usize]
1957                                    {
1958                                        if s == target_name { found = Some(i); break; }
1959                                    }
1960                                }
1961                                let off = found.ok_or_else(|| VmError::TypeMismatch(
1962                                    format!("missing field `{target_name}` on stack record")))?;
1963                                self.field_ics[fid][sid] = Some((shape_id, off));
1964                                self.stack_record_arena[slab_start as usize + off].clone()
1965                            };
1966                            self.stack.push(value);
1967                        }
1968                        Value::ArenaRecord { shape_id, slab_start, field_count } => {
1969                            // #463 slice 2a: dispatch over an
1970                            // arena-allocated record. Identical IC
1971                            // story to `StackRecord` above — the slot
1972                            // stores `(shape_id, offset)` and offset
1973                            // semantics match `Value::Record`'s
1974                            // IndexMap insertion order, so the IC is
1975                            // three-way interoperable.
1976                            if ic_stats_enabled() {
1977                                record_ic_hit(fn_id, site_idx, shape_id);
1978                            }
1979                            let fid = fn_id as usize;
1980                            let sid = site_idx as usize;
1981                            if self.field_ics[fid].is_empty() {
1982                                let n = self.program.functions[fid].field_ic_sites as usize;
1983                                self.field_ics[fid] = vec![None; n];
1984                            }
1985                            let cached = self.field_ics[fid][sid];
1986                            let value = 'ic: {
1987                                if let Some((cached_shape, off)) = cached {
1988                                    if cached_shape == shape_id && (off as u16) < field_count {
1989                                        let idx = slab_start as usize + off;
1990                                        break 'ic self.arena_slab[idx].clone();
1991                                    }
1992                                }
1993                                let shape =
1994                                    &self.program.record_shapes[shape_id as usize];
1995                                let target_name = match &self.program.constants[name_idx as usize] {
1996                                    Const::FieldName(s) => s.as_str(),
1997                                    _ => return Err(VmError::TypeMismatch(
1998                                        "expected FieldName const".into())),
1999                                };
2000                                let mut found: Option<usize> = None;
2001                                for (i, fn_const_idx) in shape.iter().enumerate() {
2002                                    if let Const::FieldName(s) =
2003                                        &self.program.constants[*fn_const_idx as usize]
2004                                    {
2005                                        if s == target_name { found = Some(i); break; }
2006                                    }
2007                                }
2008                                let off = found.ok_or_else(|| VmError::TypeMismatch(
2009                                    format!("missing field `{target_name}` on arena record")))?;
2010                                self.field_ics[fid][sid] = Some((shape_id, off));
2011                                self.arena_slab[slab_start as usize + off].clone()
2012                            };
2013                            self.stack.push(value);
2014                        }
2015                        other => return Err(VmError::TypeMismatch(
2016                            format!("GetField on non-record: {other:?}"))),
2017                    }
2018                }
2019                Op::GetElem(i) => {
2020                    let v = self.pop()?;
2021                    match v {
2022                        Value::Tuple(items) => {
2023                            let v = items.into_iter().nth(i as usize)
2024                                .ok_or_else(|| VmError::TypeMismatch(format!("tuple index {i} out of range")))?;
2025                            self.stack.push(v);
2026                        }
2027                        // #464 tuple codegen: positional read out of a
2028                        // frame-local tuple. The arena slot is an O(1)
2029                        // index, mirroring the heap path's nth().
2030                        Value::StackTuple { slab_start, arity } => {
2031                            if i >= arity {
2032                                return Err(VmError::TypeMismatch(
2033                                    format!("tuple index {i} out of range")));
2034                            }
2035                            let v = self.stack_record_arena[slab_start as usize + i as usize].clone();
2036                            self.stack.push(v);
2037                        }
2038                        // #463 slice 2a: positional read out of an
2039                        // arena tuple — same O(1) index pattern as
2040                        // StackTuple but through `arena_slab`.
2041                        Value::ArenaTuple { slab_start, arity } => {
2042                            if i >= arity {
2043                                return Err(VmError::TypeMismatch(
2044                                    format!("tuple index {i} out of range")));
2045                            }
2046                            let v = self.arena_slab[slab_start as usize + i as usize].clone();
2047                            self.stack.push(v);
2048                        }
2049                        other => return Err(VmError::TypeMismatch(format!("GetElem on non-tuple: {other:?}"))),
2050                    }
2051                }
2052                Op::TestVariant(i) => {
2053                    let name = match &self.program.constants[i as usize] {
2054                        Const::VariantName(s) => s.clone(),
2055                        _ => return Err(VmError::TypeMismatch("expected VariantName const".into())),
2056                    };
2057                    let v = self.pop()?;
2058                    match &v {
2059                        Value::Variant { name: vname, .. } => {
2060                            self.stack.push(Value::Bool(vname == &name));
2061                        }
2062                        // For tag-only enums of primitive type (e.g. ParseError = Empty | NotNumber)
2063                        // the value is currently a Variant too, since constructors emit MakeVariant.
2064                        other => return Err(VmError::TypeMismatch(format!("TestVariant on non-variant: {other:?}"))),
2065                    }
2066                }
2067                Op::GetVariant(_i) => {
2068                    let v = self.pop()?;
2069                    match v {
2070                        Value::Variant { args, .. } => {
2071                            self.stack.push(Value::Tuple(args));
2072                        }
2073                        other => return Err(VmError::TypeMismatch(format!("GetVariant on non-variant: {other:?}"))),
2074                    }
2075                }
2076                Op::GetVariantArg(i) => {
2077                    let v = self.pop()?;
2078                    match v {
2079                        Value::Variant { mut args, .. } => {
2080                            if (i as usize) >= args.len() {
2081                                return Err(VmError::TypeMismatch("variant arg index oob".into()));
2082                            }
2083                            self.stack.push(args.swap_remove(i as usize));
2084                        }
2085                        other => return Err(VmError::TypeMismatch(format!("GetVariantArg on non-variant: {other:?}"))),
2086                    }
2087                }
2088                Op::GetListLen => {
2089                    let v = self.pop()?;
2090                    match v {
2091                        Value::List(items) => self.stack.push(Value::Int(items.len() as i64)),
2092                        other => return Err(VmError::TypeMismatch(format!("GetListLen on non-list: {other:?}"))),
2093                    }
2094                }
2095                Op::GetListElem(i) => {
2096                    let v = self.pop()?;
2097                    match v {
2098                        Value::List(items) => {
2099                            let v = items.into_iter().nth(i as usize)
2100                                .ok_or_else(|| VmError::TypeMismatch("list index oob".into()))?;
2101                            self.stack.push(v);
2102                        }
2103                        other => return Err(VmError::TypeMismatch(format!("GetListElem on non-list: {other:?}"))),
2104                    }
2105                }
2106                Op::GetListElemDyn => {
2107                    // Stack: [list, idx]
2108                    let idx = match self.pop()? {
2109                        Value::Int(n) => n as usize,
2110                        other => return Err(VmError::TypeMismatch(format!("GetListElemDyn idx: {other:?}"))),
2111                    };
2112                    let v = self.pop()?;
2113                    match v {
2114                        Value::List(items) => {
2115                            let v = items.into_iter().nth(idx)
2116                                .ok_or_else(|| VmError::TypeMismatch("list index oob".into()))?;
2117                            self.stack.push(v);
2118                        }
2119                        other => return Err(VmError::TypeMismatch(format!("GetListElemDyn on non-list: {other:?}"))),
2120                    }
2121                }
2122                Op::ListAppend => {
2123                    let value = self.pop()?;
2124                    let list = self.pop()?;
2125                    match list {
2126                        Value::List(mut items) => {
2127                            items.push_back(value);
2128                            self.stack.push(Value::List(items));
2129                        }
2130                        other => return Err(VmError::TypeMismatch(format!("ListAppend on non-list: {other:?}"))),
2131                    }
2132                }
2133                Op::Jump(off) => {
2134                    let new_pc = (self.frames[frame_idx].pc as i32 + off) as usize;
2135                    self.frames[frame_idx].pc = new_pc;
2136                }
2137                Op::JumpIf(off) => {
2138                    let v = self.pop()?;
2139                    if v.as_bool() {
2140                        let new_pc = (self.frames[frame_idx].pc as i32 + off) as usize;
2141                        self.frames[frame_idx].pc = new_pc;
2142                    }
2143                }
2144                Op::JumpIfNot(off) => {
2145                    let v = self.pop()?;
2146                    if !v.as_bool() {
2147                        let new_pc = (self.frames[frame_idx].pc as i32 + off) as usize;
2148                        self.frames[frame_idx].pc = new_pc;
2149                    }
2150                }
2151                Op::MakeClosure { fn_id, capture_count } => {
2152                    let n = capture_count as usize;
2153                    let mut captures: Vec<Value> = (0..n).map(|_| Value::Unit).collect();
2154                    for i in (0..n).rev() { captures[i] = self.pop()?; }
2155                    // Look up the canonical body hash so the resulting
2156                    // `Value::Closure` carries it for equality (#222).
2157                    let body_hash = self.program.functions[fn_id as usize].body_hash;
2158                    self.stack.push(Value::Closure { fn_id, body_hash, captures });
2159                }
2160                Op::CallClosure { arity, node_id_idx } => {
2161                    let arity = arity as usize;
2162                    // Args sit on the value stack at [args_base..]; the
2163                    // closure sits just below them at args_base - 1. Take
2164                    // the closure out (leaving a Unit placeholder), then
2165                    // write its captures and pop the args directly into
2166                    // the callee's locals — no per-call args Vec and no
2167                    // `captures.extend(args)` realloc (#464). The combined
2168                    // [captures, args] view the tracer wants is exactly
2169                    // the contiguous locals slice we just filled.
2170                    let args_base = self.stack.len() - arity;
2171                    let closure = std::mem::replace(&mut self.stack[args_base - 1], Value::Unit);
2172                    let (fn_id, captures) = match closure {
2173                        Value::Closure { fn_id, captures, .. } => (fn_id, captures),
2174                        other => return Err(VmError::TypeMismatch(format!("CallClosure on non-closure: {other:?}"))),
2175                    };
2176                    let fid = fn_id as usize;
2177                    let node_id = const_str(&self.program.constants, node_id_idx);
2178                    let budget_cost = call_budget_cost(&self.program.functions[fid]);
2179                    if budget_cost > 0 {
2180                        self.handler.note_call_budget(budget_cost)
2181                            .map_err(VmError::Effect)?;
2182                    }
2183                    let cap_n = captures.len();
2184                    let locals_start = self.locals_storage.len();
2185                    let locals_len = self.program.functions[fid].locals_count
2186                        .max(self.program.functions[fid].arity) as usize;
2187                    self.locals_storage.resize(locals_start + locals_len, Value::Unit);
2188                    for (i, v) in captures.into_iter().enumerate() {
2189                        self.locals_storage[locals_start + i] = v;
2190                    }
2191                    // Move the args off the value stack into the locals
2192                    // following the captures (popping leaves the args off
2193                    // the stack; the closure's Unit placeholder is then
2194                    // the top, so truncate it away).
2195                    for i in (0..arity).rev() {
2196                        self.locals_storage[locals_start + cap_n + i] = self.pop()?;
2197                    }
2198                    self.stack.truncate(args_base - 1);
2199                    self.tracer.enter_call(&node_id, &self.program.functions[fid].name, &self.locals_storage[locals_start..locals_start + cap_n + arity]);
2200                    self.push_frame(Frame {
2201                        fn_id, pc: 0, locals_start, locals_len,
2202                        stack_base: self.stack.len(),
2203                        trace_kind: FrameKind::Call(node_id),
2204                        // Op::CallClosure intentionally doesn't memoize
2205                        // for v1 (#229) — closures over captures need a
2206                        // hashing strategy that includes the captures.
2207                        // Direct Op::Call is the v1 surface.
2208                        memo_key: None,
2209                        stack_record_arena_start: self.stack_record_arena.len(),
2210                        stack_record_budget_remaining: STACK_RECORD_BUDGET_SLOTS,
2211                    })?;
2212                }
2213                Op::SortByKey { node_id_idx: _ } => {
2214                    // #338: pop (xs, f). For each x in xs, invoke
2215                    // f(x) to derive a sortable key. Stable-sort the
2216                    // (key, value) pairs by key. Return the values
2217                    // in sorted order. Keys must be Int / Float /
2218                    // Str; mixed-type pairs and other types compare
2219                    // as equal (preserving original order — stable
2220                    // sort).
2221                    let f = self.pop()?;
2222                    let xs = self.pop()?;
2223                    let items = match xs {
2224                        Value::List(v) => v,
2225                        other => return Err(VmError::TypeMismatch(
2226                            format!("SortByKey requires a List, got: {other:?}"))),
2227                    };
2228                    if !matches!(f, Value::Closure { .. }) {
2229                        return Err(VmError::TypeMismatch(
2230                            format!("SortByKey requires a closure, got: {f:?}")));
2231                    }
2232                    let mut keyed: Vec<(Value, Value)> = Vec::with_capacity(items.len());
2233                    for item in items {
2234                        let key = self.invoke_closure_1(f.clone(), item.clone())?;
2235                        keyed.push((key, item));
2236                    }
2237                    keyed.sort_by(|(ka, _), (kb, _)| compare_sort_keys(ka, kb));
2238                    let sorted: VecDeque<Value> = keyed.into_iter().map(|(_, v)| v).collect();
2239                    self.stack.push(Value::List(sorted));
2240                }
2241                Op::ParallelMap { node_id_idx: _ } => {
2242                    // #305 slice 1: pop (xs, f) and apply f to each
2243                    // element across OS threads.
2244                    //
2245                    // #305 slice 2: each worker now asks the parent
2246                    // handler for a thread-safe per-worker handler via
2247                    // `EffectHandler::spawn_for_worker`. Handlers that
2248                    // opt in (e.g. `DefaultHandler`) yield a fresh
2249                    // instance sharing the budget pool; handlers that
2250                    // don't fall back to the slice-1 behavior of
2251                    // `DenyAllEffects` in the worker.
2252                    let f = self.pop()?;
2253                    let xs = self.pop()?;
2254                    let items = match xs {
2255                        Value::List(v) => v,
2256                        other => return Err(VmError::TypeMismatch(
2257                            format!("ParallelMap requires a List, got: {other:?}"))),
2258                    };
2259                    if !matches!(f, Value::Closure { .. }) {
2260                        return Err(VmError::TypeMismatch(
2261                            format!("ParallelMap requires a closure, got: {f:?}")));
2262                    }
2263                    // Pre-build one handler per worker on the main
2264                    // thread so the worker just owns its handler with
2265                    // no shared borrowing. The actual worker count is
2266                    // capped by `LEX_PAR_MAX_CONCURRENCY` (resolved
2267                    // inside par_map_run); cap ≤ items.len() so we
2268                    // never over-allocate handlers.
2269                    let n_workers = par_max_concurrency().max(1).min(items.len().max(1));
2270                    let mut worker_handlers: Vec<Box<dyn EffectHandler + Send>> =
2271                        Vec::with_capacity(n_workers);
2272                    for _ in 0..n_workers {
2273                        worker_handlers.push(
2274                            self.handler
2275                                .spawn_for_worker()
2276                                .unwrap_or_else(|| Box::new(DenyAllEffects)),
2277                        );
2278                    }
2279                    let results = par_map_run(self.program, f, items.into_iter().collect(), worker_handlers, self.step_limit)?;
2280                    self.stack.push(Value::List(results.into()));
2281                }
2282                Op::ListMap { node_id_idx: _ } => {
2283                    // #464: native map. Owns `xs` (no per-iteration
2284                    // clone of the input or accumulator that the old
2285                    // inlined `LoadLocal`-based loop incurred) and
2286                    // builds the output with one pre-sized allocation.
2287                    let f = self.pop()?;
2288                    let xs = self.pop()?;
2289                    let items = match xs {
2290                        Value::List(v) => v,
2291                        other => return Err(VmError::TypeMismatch(
2292                            format!("ListMap requires a List, got: {other:?}"))),
2293                    };
2294                    if !matches!(f, Value::Closure { .. }) {
2295                        return Err(VmError::TypeMismatch(
2296                            format!("ListMap requires a closure, got: {f:?}")));
2297                    }
2298                    let mut out: VecDeque<Value> = VecDeque::with_capacity(items.len());
2299                    for item in items {
2300                        out.push_back(self.invoke_closure_1(f.clone(), item)?);
2301                    }
2302                    self.stack.push(Value::List(out));
2303                }
2304                Op::ListFilter { node_id_idx: _ } => {
2305                    // #464: native filter. Pred is applied to a clone
2306                    // of each element; the original element is kept on
2307                    // a true result.
2308                    let f = self.pop()?;
2309                    let xs = self.pop()?;
2310                    let items = match xs {
2311                        Value::List(v) => v,
2312                        other => return Err(VmError::TypeMismatch(
2313                            format!("ListFilter requires a List, got: {other:?}"))),
2314                    };
2315                    if !matches!(f, Value::Closure { .. }) {
2316                        return Err(VmError::TypeMismatch(
2317                            format!("ListFilter requires a closure, got: {f:?}")));
2318                    }
2319                    let mut out: VecDeque<Value> = VecDeque::new();
2320                    for item in items {
2321                        let keep = self.invoke_closure_1(f.clone(), item.clone())?;
2322                        if keep.as_bool() {
2323                            out.push_back(item);
2324                        }
2325                    }
2326                    self.stack.push(Value::List(out));
2327                }
2328                Op::ListFold { node_id_idx: _ } => {
2329                    // #464: native left-fold. `acc` is threaded by
2330                    // value; each element is moved into the combiner.
2331                    let f = self.pop()?;
2332                    let init = self.pop()?;
2333                    let xs = self.pop()?;
2334                    let items = match xs {
2335                        Value::List(v) => v,
2336                        other => return Err(VmError::TypeMismatch(
2337                            format!("ListFold requires a List, got: {other:?}"))),
2338                    };
2339                    if !matches!(f, Value::Closure { .. }) {
2340                        return Err(VmError::TypeMismatch(
2341                            format!("ListFold requires a closure, got: {f:?}")));
2342                    }
2343                    let mut acc = init;
2344                    for item in items {
2345                        acc = self.invoke_closure_2(f.clone(), acc, item)?;
2346                    }
2347                    self.stack.push(acc);
2348                }
2349                Op::Call { fn_id, arity, node_id_idx } => {
2350                    let arity = arity as usize;
2351                    let fid = fn_id as usize;
2352                    // Args sit on the value stack at [args_base..]. We
2353                    // read them in place for the refinement / memo /
2354                    // trace checks and only move them into the locals
2355                    // slot-allocator at the very end — avoiding a
2356                    // per-call args Vec (#464 call-overhead). The stack
2357                    // naturally holds the args until consumed, so the
2358                    // only early-exit cleanup is truncating them off on
2359                    // a memo hit; a refinement error aborts the VM.
2360                    let args_base = self.stack.len() - arity;
2361                    let node_id = const_str(&self.program.constants, node_id_idx);
2362                    let budget_cost = call_budget_cost(&self.program.functions[fid]);
2363                    if budget_cost > 0 {
2364                        self.handler.note_call_budget(budget_cost)
2365                            .map_err(VmError::Effect)?;
2366                    }
2367                    // Refinement runtime check (#209 slice 3). Each
2368                    // param's `Option<Refinement>` is evaluated against
2369                    // the actual arg before the frame is pushed. The
2370                    // tracer sees the call enter; failure surfaces as
2371                    // `VmError::RefinementFailed` *before* the body
2372                    // starts, which means an erroring trace shows the
2373                    // call as enter+exit_err with the verdict reason
2374                    // (same shape as `gate.verdict`).
2375                    //
2376                    // Iterate by reference — the loop body reads only
2377                    // through `r` (borrowed from `self.program`) and the
2378                    // arg slots on the stack; we don't mutate `self`, so
2379                    // the borrows are disjoint.
2380                    let refinements = &self.program.functions[fid].refinements;
2381                    for (i, refinement) in refinements.iter().enumerate() {
2382                        if let Some(r) = refinement {
2383                            let arg = self.stack[args_base + i].clone();
2384                            match eval_refinement(&r.predicate, &r.binding, &arg) {
2385                                Ok(true) => { /* satisfied, continue */ }
2386                                Ok(false) => {
2387                                    return Err(VmError::RefinementFailed {
2388                                        fn_name: self.program.functions[fid].name.clone(),
2389                                        param_index: i,
2390                                        binding: r.binding.clone(),
2391                                        reason: format!(
2392                                            "predicate failed for {} = {arg:?}",
2393                                            r.binding),
2394                                    });
2395                                }
2396                                Err(reason) => {
2397                                    return Err(VmError::RefinementFailed {
2398                                        fn_name: self.program.functions[fid].name.clone(),
2399                                        param_index: i,
2400                                        binding: r.binding.clone(),
2401                                        reason,
2402                                    });
2403                                }
2404                            }
2405                        }
2406                    }
2407                    // Pure-fn memoization (#229): if the callee declares
2408                    // no effects, hash the args and consult the cache.
2409                    // On hit, push the cached value, emit synthetic
2410                    // enter+exit trace events (so the trace still shows
2411                    // the call), and skip the frame push entirely.
2412                    //
2413                    // Adaptive gate (#229 adaptive): only hash if this
2414                    // function still has memoization enabled. A pure
2415                    // function whose args never repeat pays the hash for
2416                    // nothing; after a warmup window with zero hits we
2417                    // disable it and its calls take the plain path below.
2418                    let memo_key: Option<(u32, [u8; 16])> =
2419                        if self.program.functions[fid].effects.is_empty()
2420                            && self.memo_fn_state[fid].enabled
2421                            // #621: skip memo if any arg contains a request-scoped
2422                            // arena handle. The memo cache outlives the request arena,
2423                            // so hashing such a handle would dangle.
2424                            && !self.stack[args_base..].iter().any(|v| v.contains_arena_record())
2425                        {
2426                            Some((fn_id, hash_call_args(&self.stack[args_base..])))
2427                        } else {
2428                            if self.program.functions[fid].effects.is_empty() {
2429                                self.pure_memo_skips += 1;
2430                            }
2431                            None
2432                        };
2433                    if let Some(key) = memo_key {
2434                        self.memo_fn_state[fid].calls += 1;
2435                        if let Some(cached) = self.pure_memo.get(&key).cloned() {
2436                            self.memo_fn_state[fid].hits += 1;
2437                            self.pure_memo_hits += 1;
2438                            self.tracer.enter_call(&node_id, &self.program.functions[fid].name, &self.stack[args_base..]);
2439                            self.tracer.exit_ok(&cached);
2440                            self.stack.truncate(args_base);
2441                            self.stack.push(cached);
2442                            continue;
2443                        }
2444                        self.pure_memo_misses += 1;
2445                        // Disable on a cold function: warmup elapsed with
2446                        // no hit. Always safe — the callee is pure, so the
2447                        // plain path recomputes the identical result.
2448                        let st = &mut self.memo_fn_state[fid];
2449                        if st.calls >= MEMO_WARMUP_CALLS && st.hits == 0 {
2450                            st.enabled = false;
2451                        }
2452                    }
2453                    // #465 JIT tier hook. Consulted after refinements +
2454                    // memo. The hook contract (see `crate::jit_hook`)
2455                    // requires the dispatcher to emit the synthetic
2456                    // tracer events itself — we do that on hit, then
2457                    // truncate the args off the stack and push the
2458                    // result, mirroring the memo-hit path above.
2459                    //
2460                    // Take/restore around the call so the hook can
2461                    // borrow `&self.stack` for its args slice while
2462                    // we hold `&mut hook`. Cheaper than cloning the
2463                    // args; the take/put is two pointer writes.
2464                    if let Some(mut hook) = self.jit_hook.take() {
2465                        let hook_result = hook.try_call(fn_id, &self.stack[args_base..]);
2466                        self.jit_hook = Some(hook);
2467                        match hook_result? {
2468                            Some(result) => {
2469                                self.tracer.enter_call(&node_id, &self.program.functions[fid].name, &self.stack[args_base..]);
2470                                self.tracer.exit_ok(&result);
2471                                // Memoize the result if memo is enabled
2472                                // for this fn — same semantics as a
2473                                // regular call's Return path.
2474                                if let Some(key) = memo_key {
2475                                    self.pure_memo.insert(key, result.clone());
2476                                }
2477                                self.stack.truncate(args_base);
2478                                self.stack.push(result);
2479                                continue;
2480                            }
2481                            None => { /* hook declined; fall through */ }
2482                        }
2483                    }
2484                    self.tracer.enter_call(&node_id, &self.program.functions[fid].name, &self.stack[args_base..]);
2485                    let locals_len = self.program.functions[fid].locals_count
2486                        .max(self.program.functions[fid].arity) as usize;
2487                    let locals_start = self.locals_storage.len();
2488                    self.locals_storage.resize(locals_start + locals_len, Value::Unit);
2489                    // Move the args off the stack into the callee's
2490                    // locals (popping leaves the stack at `args_base`).
2491                    for i in (0..arity).rev() {
2492                        self.locals_storage[locals_start + i] = self.pop()?;
2493                    }
2494                    self.push_frame(Frame {
2495                        fn_id, pc: 0, locals_start, locals_len,
2496                        stack_base: self.stack.len(),
2497                        trace_kind: FrameKind::Call(node_id),
2498                        memo_key,
2499                        stack_record_arena_start: self.stack_record_arena.len(),
2500                        stack_record_budget_remaining: STACK_RECORD_BUDGET_SLOTS,
2501                    })?;
2502                }
2503                Op::TailCall { fn_id, arity, node_id_idx } => {
2504                    let arity = arity as usize;
2505                    let fid = fn_id as usize;
2506                    // Args sit on the value stack at [args_base..]. Read
2507                    // them in place for the refinement / trace checks and
2508                    // move them into the reused frame's locals at the end
2509                    // — no per-call args Vec (#464). Tail calls have no
2510                    // memoization, so the consumers are refinement, trace,
2511                    // then the locals move. The args live on `self.stack`
2512                    // while locals live on `self.locals_storage`, so the
2513                    // `truncate(old_locals_start)` below (which releases
2514                    // the *old* frame's locals) doesn't touch them.
2515                    let args_base = self.stack.len() - arity;
2516                    let node_id = const_str(&self.program.constants, node_id_idx);
2517                    let budget_cost = call_budget_cost(&self.program.functions[fid]);
2518                    if budget_cost > 0 {
2519                        self.handler.note_call_budget(budget_cost)
2520                            .map_err(VmError::Effect)?;
2521                    }
2522                    // Refinement runtime check on tail calls too
2523                    // (#209 slice 3). Same shape as Op::Call.
2524                    let refinements = &self.program.functions[fid].refinements;
2525                    for (i, refinement) in refinements.iter().enumerate() {
2526                        if let Some(r) = refinement {
2527                            let arg = self.stack[args_base + i].clone();
2528                            match eval_refinement(&r.predicate, &r.binding, &arg) {
2529                                Ok(true) => {}
2530                                Ok(false) => return Err(VmError::RefinementFailed {
2531                                    fn_name: self.program.functions[fid].name.clone(),
2532                                    param_index: i,
2533                                    binding: r.binding.clone(),
2534                                    reason: format!(
2535                                        "predicate failed for {} = {arg:?}",
2536                                        r.binding),
2537                                }),
2538                                Err(reason) => return Err(VmError::RefinementFailed {
2539                                    fn_name: self.program.functions[fid].name.clone(),
2540                                    param_index: i,
2541                                    binding: r.binding.clone(),
2542                                    reason,
2543                                }),
2544                            }
2545                        }
2546                    }
2547                    // #465 JIT tier hook for tail calls. A tail-called
2548                    // function's result IS the current frame's result,
2549                    // so on a hook hit we collapse the current frame:
2550                    // truncate state back to the frame's entry, emit
2551                    // the synthetic enter+exit_ok trace events that a
2552                    // normal tail-into-return would have produced, then
2553                    // bubble the result up the same way Op::Return
2554                    // does.
2555                    if let Some(mut hook) = self.jit_hook.take() {
2556                        let hook_result = hook.try_call(fn_id, &self.stack[args_base..]);
2557                        self.jit_hook = Some(hook);
2558                        if let Some(result) = hook_result? {
2559                            self.tracer.exit_call_tail();
2560                            self.tracer.enter_call(&node_id, &self.program.functions[fid].name, &self.stack[args_base..]);
2561                            self.tracer.exit_ok(&result);
2562                            let frame = self.frames.pop().unwrap();
2563                            self.stack.truncate(frame.stack_base);
2564                            self.locals_storage.truncate(frame.locals_start);
2565                            self.stack_record_arena.truncate(frame.stack_record_arena_start);
2566                            // Tail calls don't carry a memo_key (the
2567                            // existing arm doesn't memoize them), so
2568                            // skip the memo store the Return path does.
2569                            if self.frames.len() <= base_depth {
2570                                return Ok(result);
2571                            }
2572                            self.stack.push(result);
2573                            continue;
2574                        }
2575                    }
2576                    // A tail call closes the current call's trace frame and
2577                    // opens a new one in its place — preserves the caller's
2578                    // tree depth in the trace.
2579                    self.tracer.exit_call_tail();
2580                    self.tracer.enter_call(&node_id, &self.program.functions[fid].name, &self.stack[args_base..]);
2581                    // Reuse the current frame's locals_start position:
2582                    // truncate to release old locals then extend for the
2583                    // new function (#389 slice 3, same as Op::Return but
2584                    // without popping the frame).
2585                    let old_locals_start = self.frames.last().unwrap().locals_start;
2586                    self.locals_storage.truncate(old_locals_start);
2587                    let new_locals_len = self.program.functions[fid].locals_count
2588                        .max(self.program.functions[fid].arity) as usize;
2589                    self.locals_storage.resize(old_locals_start + new_locals_len, Value::Unit);
2590                    // Move the args off the value stack into the callee's
2591                    // locals (popping leaves the stack at `args_base`).
2592                    for i in (0..arity).rev() {
2593                        self.locals_storage[old_locals_start + i] = self.pop()?;
2594                    }
2595                    // #464 step 2: a tail-called function gets a fresh
2596                    // stack-record arena view. Release any records the
2597                    // pre-tail-call code allocated (they can't be live
2598                    // — the args have already been popped off the
2599                    // value stack) and refill the budget for the
2600                    // callee.
2601                    let arena_start = self.frames.last().unwrap().stack_record_arena_start;
2602                    self.stack_record_arena.truncate(arena_start);
2603                    let frame = self.frames.last_mut().unwrap();
2604                    frame.fn_id = fn_id;
2605                    frame.pc = 0;
2606                    frame.locals_len = new_locals_len;
2607                    frame.trace_kind = FrameKind::Call(node_id);
2608                    frame.stack_record_budget_remaining = STACK_RECORD_BUDGET_SLOTS;
2609                }
2610                Op::EffectCall { kind_idx, op_idx, arity, node_id_idx } => {
2611                    let mut args: Vec<Value> = (0..arity).map(|_| Value::Unit).collect();
2612                    for i in (0..arity as usize).rev() { args[i] = self.pop()?; }
2613                    let kind = match &self.program.constants[kind_idx as usize] {
2614                        Const::Str(s) => s.clone(),
2615                        _ => return Err(VmError::TypeMismatch("expected Str const for effect kind".into())),
2616                    };
2617                    let op_name = match &self.program.constants[op_idx as usize] {
2618                        Const::Str(s) => s.clone(),
2619                        _ => return Err(VmError::TypeMismatch("expected Str const for effect op".into())),
2620                    };
2621                    let node_id = const_str(&self.program.constants, node_id_idx);
2622                    self.tracer.enter_effect(&node_id, &kind, &op_name, &args);
2623                    let result = match self.tracer.override_effect(&node_id) {
2624                        Some(v) => Ok(v),
2625                        // VM-level intercept for `parser.run` (#221).
2626                        // Routed inline rather than through the handler
2627                        // because the parser interpreter needs reentrant
2628                        // VM access to invoke `Value::Closure` values
2629                        // from `Map` / `AndThen` nodes.
2630                        None if (kind.as_str(), op_name.as_str()) == ("parser", "run")
2631                            => self.run_parser_op(args),
2632                        // VM-level intercept for `conc.*` (#381). The actor
2633                        // handler closure must run on the calling VM so it can
2634                        // dispatch arbitrary effects through the same handler
2635                        // chain (e.g. sql queries inside an actor).
2636                        None if kind.as_str() == "conc"
2637                            => self.run_conc_op(op_name.as_str(), args),
2638                        None => self.handler.dispatch(&kind, &op_name, args),
2639                    };
2640                    match result {
2641                        Ok(v) => {
2642                            self.tracer.exit_ok(&v);
2643                            self.stack.push(v);
2644                        }
2645                        Err(e) => {
2646                            self.tracer.exit_err(&e);
2647                            return Err(VmError::Effect(e));
2648                        }
2649                    }
2650                }
2651                Op::Return => {
2652                    let v = self.pop()?;
2653                    let frame = self.frames.pop().unwrap();
2654                    // Trim any extra stuff that the function pushed but didn't pop.
2655                    self.stack.truncate(frame.stack_base);
2656                    // Release this frame's locals back to the arena (#389 slice 3).
2657                    // LIFO frame ordering guarantees this frame's slots are at the top.
2658                    self.locals_storage.truncate(frame.locals_start);
2659                    // #464 step 2: release this frame's stack-record
2660                    // slab. LIFO frame discipline guarantees its
2661                    // records sit at the top of the arena. The
2662                    // returned value `v` is escape-proven not to be
2663                    // one of them — the compiler only emits
2664                    // AllocStackRecord at sites that don't reach
2665                    // `Return`.
2666                    self.stack_record_arena.truncate(frame.stack_record_arena_start);
2667                    if matches!(frame.trace_kind, FrameKind::Call(_)) {
2668                        self.tracer.exit_ok(&v);
2669                    }
2670                    // Pure-fn memoization (#229): if this frame was a
2671                    // memoizable call that missed the cache, write the
2672                    // computed return value back so the next call with
2673                    // the same args returns it without re-executing.
2674                    if let Some(key) = frame.memo_key {
2675                        self.pure_memo.insert(key, v.clone());
2676                    }
2677                    // Exit when we've returned past the depth this
2678                    // `run_to` was entered at — supports reentrancy
2679                    // (a nested `invoke` returns into its caller, not
2680                    // out of the outermost VM run, #221).
2681                    if self.frames.len() <= base_depth {
2682                        return Ok(v);
2683                    }
2684                    self.stack.push(v);
2685                }
2686                Op::Panic(i) => {
2687                    let msg = match &self.program.constants[i as usize] {
2688                        Const::Str(s) => s.clone(),
2689                        _ => "panic".into(),
2690                    };
2691                    return Err(VmError::Panic(msg));
2692                }
2693                // Arithmetic
2694                Op::IntAdd => self.bin_int(|a, b| Value::Int(a + b))?,
2695                Op::IntSub => self.bin_int(|a, b| Value::Int(a - b))?,
2696                Op::IntMul => self.bin_int(|a, b| Value::Int(a * b))?,
2697                Op::IntDiv => self.bin_int_divmod(false)?,
2698                Op::IntMod => self.bin_int_divmod(true)?,
2699                Op::IntNeg => {
2700                    let a = self.pop()?.as_int();
2701                    self.stack.push(Value::Int(-a));
2702                }
2703                Op::IntEq => self.bin_int(|a, b| Value::Bool(a == b))?,
2704                Op::IntLt => self.bin_int(|a, b| Value::Bool(a < b))?,
2705                Op::IntLe => self.bin_int(|a, b| Value::Bool(a <= b))?,
2706                Op::FloatAdd => self.bin_float(|a, b| Value::Float(a + b))?,
2707                Op::FloatSub => self.bin_float(|a, b| Value::Float(a - b))?,
2708                Op::FloatMul => self.bin_float(|a, b| Value::Float(a * b))?,
2709                Op::FloatDiv => self.bin_float(|a, b| Value::Float(a / b))?,
2710                Op::FloatNeg => {
2711                    let a = self.pop()?.as_float();
2712                    self.stack.push(Value::Float(-a));
2713                }
2714                Op::FloatEq => self.bin_float(|a, b| Value::Bool(a == b))?,
2715                Op::FloatLt => self.bin_float(|a, b| Value::Bool(a < b))?,
2716                Op::FloatLe => self.bin_float(|a, b| Value::Bool(a <= b))?,
2717                Op::NumAdd => {
2718                    // #308: `+` is overloaded — Str+Str concatenates,
2719                    // numerics add. Other arithmetic ops (-, *, /, %)
2720                    // still reject Str at the type-checker layer.
2721                    let b = self.pop()?;
2722                    let a = self.pop()?;
2723                    match (a, b) {
2724                        (Value::Int(x), Value::Int(y)) => self.stack.push(Value::Int(x + y)),
2725                        (Value::Float(x), Value::Float(y)) => self.stack.push(Value::Float(x + y)),
2726                        (Value::Str(x), Value::Str(y)) => {
2727                            // SmolStr is immutable; concatenate via a temporary String.
2728                            let mut s = String::with_capacity(x.len() + y.len());
2729                            s.push_str(&x);
2730                            s.push_str(&y);
2731                            self.stack.push(Value::Str(s.into()));
2732                        }
2733                        (a, b) => return Err(VmError::TypeMismatch(format!("Num op: {a:?} {b:?}"))),
2734                    }
2735                }
2736                Op::NumSub => self.bin_num(|a, b| Value::Int(a - b), |a, b| Value::Float(a - b))?,
2737                Op::NumMul => self.bin_num(|a, b| Value::Int(a * b), |a, b| Value::Float(a * b))?,
2738                Op::NumDiv => self.num_divmod(false)?,
2739                Op::NumMod => self.num_divmod(true)?,
2740                Op::NumNeg => {
2741                    let v = self.pop()?;
2742                    match v {
2743                        Value::Int(n) => self.stack.push(Value::Int(-n)),
2744                        Value::Float(f) => self.stack.push(Value::Float(-f)),
2745                        other => return Err(VmError::TypeMismatch(format!("NumNeg on {other:?}"))),
2746                    }
2747                }
2748                Op::NumEq => self.bin_eq()?,
2749                Op::NumLt => self.bin_ord(|a, b| Value::Bool(a < b), |a, b| Value::Bool(a < b), |a, b| Value::Bool(a < b))?,
2750                Op::NumLe => self.bin_ord(|a, b| Value::Bool(a <= b), |a, b| Value::Bool(a <= b), |a, b| Value::Bool(a <= b))?,
2751                Op::BoolAnd => {
2752                    let b = self.pop()?.as_bool();
2753                    let a = self.pop()?.as_bool();
2754                    self.stack.push(Value::Bool(a && b));
2755                }
2756                Op::BoolOr => {
2757                    let b = self.pop()?.as_bool();
2758                    let a = self.pop()?.as_bool();
2759                    self.stack.push(Value::Bool(a || b));
2760                }
2761                Op::BoolNot => {
2762                    let a = self.pop()?.as_bool();
2763                    self.stack.push(Value::Bool(!a));
2764                }
2765                Op::StrConcat => {
2766                    let b = self.pop()?;
2767                    let a = self.pop()?;
2768                    let s = format!("{}{}", a.as_str(), b.as_str());
2769                    self.stack.push(Value::Str(s.into()));
2770                }
2771                Op::StrLen => {
2772                    let v = self.pop()?;
2773                    self.stack.push(Value::Int(v.as_str().len() as i64));
2774                }
2775                Op::StrEq => {
2776                    let b = self.pop()?;
2777                    let a = self.pop()?;
2778                    self.stack.push(Value::Bool(a.as_str() == b.as_str()));
2779                }
2780                Op::BytesLen => {
2781                    let v = self.pop()?;
2782                    match v {
2783                        Value::Bytes(b) => self.stack.push(Value::Int(b.len() as i64)),
2784                        other => return Err(VmError::TypeMismatch(format!("BytesLen on {other:?}"))),
2785                    }
2786                }
2787                Op::BytesEq => {
2788                    let b = self.pop()?;
2789                    let a = self.pop()?;
2790                    let eq = match (a, b) {
2791                        (Value::Bytes(x), Value::Bytes(y)) => x == y,
2792                        _ => return Err(VmError::TypeMismatch("BytesEq operands".into())),
2793                    };
2794                    self.stack.push(Value::Bool(eq));
2795                }
2796
2797                // Superinstructions (#461).
2798                Op::LoadLocalAddIntConst { local_idx, imm_const_idx } => {
2799                    let base = self.frames[frame_idx].locals_start;
2800                    let a = self.locals_storage[base + local_idx as usize].as_int();
2801                    let b = match &self.program.constants[imm_const_idx as usize] {
2802                        Const::Int(n) => *n,
2803                        c => return Err(VmError::TypeMismatch(
2804                            format!("LoadLocalAddIntConst expected Int const, got {c:?}"))),
2805                    };
2806                    self.stack.push(Value::Int(a + b));
2807                    // Override the default `pc + 1`: skip past the
2808                    // two inert primitive ops (the original
2809                    // PushConst + IntAdd) that the peephole pass
2810                    // left in place for body-hash stability.
2811                    self.frames[frame_idx].pc = pc + 3;
2812                }
2813                Op::LoadLocalAddLocal { lhs_idx, rhs_idx } => {
2814                    let base = self.frames[frame_idx].locals_start;
2815                    let a = self.locals_storage[base + lhs_idx as usize].as_int();
2816                    let b = self.locals_storage[base + rhs_idx as usize].as_int();
2817                    self.stack.push(Value::Int(a + b));
2818                    // Override the default `pc + 1`: skip past the
2819                    // two inert primitive ops (the original
2820                    // LoadLocal(rhs_idx) + IntAdd) that the peephole
2821                    // pass left in place for body-hash stability.
2822                    self.frames[frame_idx].pc = pc + 3;
2823                }
2824                Op::LoadLocalSubLocal { lhs_idx, rhs_idx } => {
2825                    let base = self.frames[frame_idx].locals_start;
2826                    let a = self.locals_storage[base + lhs_idx as usize].as_int();
2827                    let b = self.locals_storage[base + rhs_idx as usize].as_int();
2828                    self.stack.push(Value::Int(a - b));
2829                    self.frames[frame_idx].pc = pc + 3;
2830                }
2831                Op::LoadLocalMulLocal { lhs_idx, rhs_idx } => {
2832                    let base = self.frames[frame_idx].locals_start;
2833                    let a = self.locals_storage[base + lhs_idx as usize].as_int();
2834                    let b = self.locals_storage[base + rhs_idx as usize].as_int();
2835                    self.stack.push(Value::Int(a * b));
2836                    self.frames[frame_idx].pc = pc + 3;
2837                }
2838                Op::LoadLocalGetField { local_idx, name_idx, site_idx } => {
2839                    // #461 slice 9: fused `LoadLocal + GetField`. Reads
2840                    // the field directly out of the local record by
2841                    // reference and pushes it, advancing pc by 2 (one
2842                    // tombstone — the original GetField). Avoids the
2843                    // unfused pair's whole-record clone onto the value
2844                    // stack: the dominant heap-record churn on the
2845                    // `response_build` profile (`r.total` field reads).
2846                    let base = self.frames[frame_idx].locals_start;
2847                    let v = self.read_local_record_field(
2848                        base, local_idx, fn_id, name_idx, site_idx, "LoadLocalGetField")?;
2849                    self.stack.push(v);
2850                    self.frames[frame_idx].pc = pc + 2;
2851                }
2852                Op::LoadLocalGetFieldAdd { local_idx, name_idx, site_idx } => {
2853                    // #461 slice 7: fused `LoadLocal + GetField + IntAdd`.
2854                    // Pop the prior stack top (the accumulator), read the
2855                    // field by reference (shared IC via
2856                    // `read_local_record_field`), push the sum, advance
2857                    // pc by 3 (skip the GetField and IntAdd tombstones).
2858                    let acc = self.pop()?.as_int();
2859                    let base = self.frames[frame_idx].locals_start;
2860                    let b = self.read_local_record_field(
2861                        base, local_idx, fn_id, name_idx, site_idx, "LoadLocalGetFieldAdd")?.as_int();
2862                    self.stack.push(Value::Int(acc + b));
2863                    self.frames[frame_idx].pc = pc + 3;
2864                }
2865                Op::LoadLocalGetFieldSub { local_idx, name_idx, site_idx } => {
2866                    // #461 slice 8: `LoadLocal + GetField + IntSub`. The
2867                    // `acc - r.field` idiom. IntSub computes
2868                    // deeper-minus-top; the field was on top in the
2869                    // unfused form, so the result is `acc - field`.
2870                    let acc = self.pop()?.as_int();
2871                    let base = self.frames[frame_idx].locals_start;
2872                    let b = self.read_local_record_field(
2873                        base, local_idx, fn_id, name_idx, site_idx, "LoadLocalGetFieldSub")?.as_int();
2874                    self.stack.push(Value::Int(acc - b));
2875                    self.frames[frame_idx].pc = pc + 3;
2876                }
2877                Op::LoadLocalGetFieldMul { local_idx, name_idx, site_idx } => {
2878                    // #461 slice 8: `LoadLocal + GetField + IntMul`. The
2879                    // `acc * r.field` idiom (mul is commutative, so
2880                    // operand order doesn't matter).
2881                    let acc = self.pop()?.as_int();
2882                    let base = self.frames[frame_idx].locals_start;
2883                    let b = self.read_local_record_field(
2884                        base, local_idx, fn_id, name_idx, site_idx, "LoadLocalGetFieldMul")?.as_int();
2885                    self.stack.push(Value::Int(acc * b));
2886                    self.frames[frame_idx].pc = pc + 3;
2887                }
2888                Op::LoadLocalEqIntConstJumpIfNot { local_idx, imm_const_idx, jump_offset } => {
2889                    // First jump-aware fusion (#461 slice 5). The
2890                    // JumpIfNot's offset is relative to its own
2891                    // pc + 1 = (pc + 3) + 1 = pc + 4, so the branch
2892                    // target is `pc + 4 + jump_offset`. Fall-through
2893                    // (equal → JumpIfNot doesn't jump) is `pc + 4`
2894                    // (skip past the 3 tombstones — PushConst +
2895                    // IntEq + JumpIfNot).
2896                    let base = self.frames[frame_idx].locals_start;
2897                    let a = self.locals_storage[base + local_idx as usize].as_int();
2898                    let b = match &self.program.constants[imm_const_idx as usize] {
2899                        Const::Int(n) => *n,
2900                        _ => return Err(VmError::TypeMismatch(
2901                            "LoadLocalEqIntConstJumpIfNot expects Const::Int".into())),
2902                    };
2903                    let next_pc = if a == b {
2904                        pc + 4
2905                    } else {
2906                        ((pc as i32 + 4) + jump_offset) as usize
2907                    };
2908                    self.frames[frame_idx].pc = next_pc;
2909                }
2910                Op::LoadLocalStoreEqIntConstJumpIfNot { src, dst, imm_const_idx, jump_offset } => {
2911                    // Slice 6: absorbs LoadLocal + StoreLocal + slice-5 op.
2912                    // 6-slot window total (this op + 5 tombstones); fall-
2913                    // through is `pc + 6`, branch target is `pc + 6 +
2914                    // jump_offset` (the original JumpIfNot was at slot
2915                    // pc+5, with offset relative to its own pc+1 = pc+6).
2916                    let base = self.frames[frame_idx].locals_start;
2917                    let a = self.locals_storage[base + src as usize].as_int();
2918                    // Mirror the original `StoreLocal(dst)` — later
2919                    // arm tests in the same `match` expect to find
2920                    // the scrutinee at `locals[dst]`.
2921                    self.locals_storage[base + dst as usize] = Value::Int(a);
2922                    let b = match &self.program.constants[imm_const_idx as usize] {
2923                        Const::Int(n) => *n,
2924                        _ => return Err(VmError::TypeMismatch(
2925                            "LoadLocalStoreEqIntConstJumpIfNot expects Const::Int".into())),
2926                    };
2927                    let next_pc = if a == b {
2928                        pc + 6
2929                    } else {
2930                        ((pc as i32 + 6) + jump_offset) as usize
2931                    };
2932                    self.frames[frame_idx].pc = next_pc;
2933                }
2934                Op::LoadLocalAddIntConstStoreLocal { src, imm_const_idx, dest } => {
2935                    let base = self.frames[frame_idx].locals_start;
2936                    let a = self.locals_storage[base + src as usize].as_int();
2937                    let b = match &self.program.constants[imm_const_idx as usize] {
2938                        Const::Int(n) => *n,
2939                        c => return Err(VmError::TypeMismatch(
2940                            format!("LoadLocalAddIntConstStoreLocal expected Int const, got {c:?}"))),
2941                    };
2942                    self.locals_storage[base + dest as usize] = Value::Int(a + b);
2943                    // Skip past the 3 inert primitive ops we
2944                    // absorbed (original PushConst + IntAdd +
2945                    // StoreLocal).
2946                    self.frames[frame_idx].pc = pc + 4;
2947                }
2948            }
2949        }
2950    }
2951
2952    fn pop(&mut self) -> Result<Value, VmError> {
2953        self.stack.pop().ok_or(VmError::StackUnderflow)
2954    }
2955    fn peek(&self) -> Result<&Value, VmError> {
2956        self.stack.last().ok_or(VmError::StackUnderflow)
2957    }
2958
2959    /// IC-cached field read of `locals[local_idx]`, shared by the
2960    /// field-read fusions: slice 9's `LoadLocalGetField` and slice
2961    /// 7/8's `LoadLocalGetField{Add,Sub,Mul}`. Uses the same
2962    /// `(fn_id, site_idx)` inline-cache slot as the unfused
2963    /// `Op::GetField`, so the paths stay cache-consistent.
2964    /// `op_name` only appears in the non-record error message.
2965    ///
2966    /// Reads the record **by reference** and clones out only the
2967    /// selected field — it does *not* clone the whole record. The
2968    /// unfused `[LoadLocal, GetField]` pair clones the entire record
2969    /// (`Box<IndexMap>` for a heap record) onto the value stack just
2970    /// to read one field and drop the rest; on the `response_build`
2971    /// profile that whole-record clone+drop of the returned `Response`
2972    /// dominated the malloc traffic. Borrowing in place removes it.
2973    ///
2974    /// Borrow discipline: the inline-cache slot can't be written while
2975    /// the record (a borrow of `self.locals_storage`) is live, so the
2976    /// match yields `(value, install)` and the `field_ics` write
2977    /// happens after the borrow ends.
2978    ///
2979    /// `#[inline(always)]`: hot dispatch path, called from four tight
2980    /// `run_to` arms; leaving it out-of-line showed up as a standalone
2981    /// call frame on the profile.
2982    #[inline(always)]
2983    fn read_local_record_field(
2984        &mut self,
2985        base: usize,
2986        local_idx: u16,
2987        fn_id: u32,
2988        name_idx: u32,
2989        site_idx: u32,
2990        op_name: &str,
2991    ) -> Result<Value, VmError> {
2992        let fid = fn_id as usize;
2993        let sid = site_idx as usize;
2994        if self.field_ics[fid].is_empty() {
2995            let n = self.program.functions[fid].field_ic_sites as usize;
2996            self.field_ics[fid] = vec![None; n];
2997        }
2998        let cached = self.field_ics[fid][sid];
2999        let li = base + local_idx as usize;
3000
3001        let (value, install): (Value, Option<(u32, usize)>) =
3002            match &self.locals_storage[li] {
3003                Value::Record { fields: r, shape_id } => {
3004                    let shape_id = *shape_id;
3005                    if ic_stats_enabled() {
3006                        record_ic_hit(fn_id, site_idx, shape_id);
3007                    }
3008                    let hit = if let Some((cached_shape, off)) = cached {
3009                        if cached_shape == shape_id {
3010                            if shape_id != crate::value::NO_SHAPE_ID {
3011                                r.get_index(off).map(|(_, val)| val.clone())
3012                            } else if let Some((k, val)) = r.get_index(off) {
3013                                match &self.program.constants[name_idx as usize] {
3014                                    Const::FieldName(s) if s == k => Some(val.clone()),
3015                                    _ => None,
3016                                }
3017                            } else { None }
3018                        } else { None }
3019                    } else { None };
3020                    match hit {
3021                        Some(v) => (v, None),
3022                        None => {
3023                            let name = match &self.program.constants[name_idx as usize] {
3024                                Const::FieldName(s) => s.as_str(),
3025                                _ => return Err(VmError::TypeMismatch(
3026                                    "expected FieldName const".into())),
3027                            };
3028                            let (off, _, val) = r.get_full(name)
3029                                .ok_or_else(|| VmError::TypeMismatch(
3030                                    format!("missing field `{name}`")))?;
3031                            (val.clone(), Some((shape_id, off)))
3032                        }
3033                    }
3034                }
3035                &Value::StackRecord { shape_id, slab_start, field_count } => {
3036                    if ic_stats_enabled() {
3037                        record_ic_hit(fn_id, site_idx, shape_id);
3038                    }
3039                    if let Some((cached_shape, off)) = cached {
3040                        if cached_shape == shape_id && (off as u16) < field_count {
3041                            let idx = slab_start as usize + off;
3042                            (self.stack_record_arena[idx].clone(), None)
3043                        } else {
3044                            let off = self.resolve_stack_field(shape_id, name_idx)?;
3045                            (self.stack_record_arena[slab_start as usize + off].clone(),
3046                             Some((shape_id, off)))
3047                        }
3048                    } else {
3049                        let off = self.resolve_stack_field(shape_id, name_idx)?;
3050                        (self.stack_record_arena[slab_start as usize + off].clone(),
3051                         Some((shape_id, off)))
3052                    }
3053                }
3054                // #463 slice 2a: superinstruction read out of an
3055                // arena-allocated record held in a local. Same shape
3056                // resolution as the stack-record arm (records share
3057                // the same `record_shapes` table regardless of
3058                // allocation site); only the slab indexed differs.
3059                &Value::ArenaRecord { shape_id, slab_start, field_count } => {
3060                    if ic_stats_enabled() {
3061                        record_ic_hit(fn_id, site_idx, shape_id);
3062                    }
3063                    if let Some((cached_shape, off)) = cached {
3064                        if cached_shape == shape_id && (off as u16) < field_count {
3065                            let idx = slab_start as usize + off;
3066                            (self.arena_slab[idx].clone(), None)
3067                        } else {
3068                            let off = self.resolve_stack_field(shape_id, name_idx)?;
3069                            (self.arena_slab[slab_start as usize + off].clone(),
3070                             Some((shape_id, off)))
3071                        }
3072                    } else {
3073                        let off = self.resolve_stack_field(shape_id, name_idx)?;
3074                        (self.arena_slab[slab_start as usize + off].clone(),
3075                         Some((shape_id, off)))
3076                    }
3077                }
3078                other => return Err(VmError::TypeMismatch(
3079                    format!("{op_name} on non-record: {other:?}"))),
3080            };
3081        if let Some(entry) = install {
3082            self.field_ics[fid][sid] = Some(entry);
3083        }
3084        Ok(value)
3085    }
3086
3087    /// Resolve a field offset within a stack-record shape by name
3088    /// (the slow path when the inline cache misses). Factored out so
3089    /// `read_local_record_field` doesn't hold the `locals_storage`
3090    /// borrow across the `record_shapes` / `constants` walk.
3091    #[inline]
3092    fn resolve_stack_field(&self, shape_id: u32, name_idx: u32) -> Result<usize, VmError> {
3093        let shape = &self.program.record_shapes[shape_id as usize];
3094        let target_name = match &self.program.constants[name_idx as usize] {
3095            Const::FieldName(s) => s.as_str(),
3096            _ => return Err(VmError::TypeMismatch("expected FieldName const".into())),
3097        };
3098        for (i, fn_const_idx) in shape.iter().enumerate() {
3099            if let Const::FieldName(s) = &self.program.constants[*fn_const_idx as usize] {
3100                if s == target_name { return Ok(i); }
3101            }
3102        }
3103        Err(VmError::TypeMismatch(
3104            format!("missing field `{target_name}` on stack record")))
3105    }
3106
3107    fn bin_int(&mut self, f: impl Fn(i64, i64) -> Value) -> Result<(), VmError> {
3108        let b = self.pop()?.as_int();
3109        let a = self.pop()?.as_int();
3110        self.stack.push(f(a, b));
3111        Ok(())
3112    }
3113    /// Guarded integer `/` (`is_mod == false`) or `%` (`is_mod == true`)
3114    /// for `Op::IntDiv` / `Op::IntMod` (#696). A zero divisor raises
3115    /// `VmError::DivByZero` instead of panicking the host. `wrapping_*`
3116    /// also tames the only other panicking input, `i64::MIN / -1` (and
3117    /// `i64::MIN % -1`), whose true result overflows `i64`: division
3118    /// wraps to `i64::MIN`, modulo to `0`.
3119    fn bin_int_divmod(&mut self, is_mod: bool) -> Result<(), VmError> {
3120        let b = self.pop()?.as_int();
3121        let a = self.pop()?.as_int();
3122        if b == 0 {
3123            return Err(VmError::DivByZero { op: if is_mod { "modulo" } else { "division" } });
3124        }
3125        let v = if is_mod { a.wrapping_rem(b) } else { a.wrapping_div(b) };
3126        self.stack.push(Value::Int(v));
3127        Ok(())
3128    }
3129    /// Guarded `/` / `%` for the overloaded `Op::NumDiv` / `Op::NumMod`,
3130    /// which accept either both-`Int` or both-`Float` operands (#696).
3131    /// Integers route through the same zero/overflow guards as
3132    /// `bin_int_divmod`; floats keep IEEE-754 semantics (inf/NaN, no
3133    /// trap). Mirrors the type checker, which only admits these two
3134    /// operand shapes for `%` (int) and `/` (int or float).
3135    fn num_divmod(&mut self, is_mod: bool) -> Result<(), VmError> {
3136        let b = self.pop()?;
3137        let a = self.pop()?;
3138        match (a, b) {
3139            (Value::Int(x), Value::Int(y)) => {
3140                if y == 0 {
3141                    return Err(VmError::DivByZero { op: if is_mod { "modulo" } else { "division" } });
3142                }
3143                let v = if is_mod { x.wrapping_rem(y) } else { x.wrapping_div(y) };
3144                self.stack.push(Value::Int(v));
3145                Ok(())
3146            }
3147            (Value::Float(x), Value::Float(y)) => {
3148                self.stack.push(Value::Float(if is_mod { x % y } else { x / y }));
3149                Ok(())
3150            }
3151            (a, b) => Err(VmError::TypeMismatch(format!("Num op: {a:?} {b:?}"))),
3152        }
3153    }
3154    fn bin_float(&mut self, f: impl Fn(f64, f64) -> Value) -> Result<(), VmError> {
3155        let b = self.pop()?.as_float();
3156        let a = self.pop()?.as_float();
3157        self.stack.push(f(a, b));
3158        Ok(())
3159    }
3160    fn bin_num(
3161        &mut self,
3162        i: impl Fn(i64, i64) -> Value,
3163        f: impl Fn(f64, f64) -> Value,
3164    ) -> Result<(), VmError> {
3165        let b = self.pop()?;
3166        let a = self.pop()?;
3167        match (a, b) {
3168            (Value::Int(x), Value::Int(y)) => { self.stack.push(i(x, y)); Ok(()) }
3169            (Value::Float(x), Value::Float(y)) => { self.stack.push(f(x, y)); Ok(()) }
3170            (a, b) => Err(VmError::TypeMismatch(format!("Num op: {a:?} {b:?}"))),
3171        }
3172    }
3173
3174    /// Like `bin_num` but also handles `Str` operands via lexicographic order.
3175    /// Used by `NumLt` / `NumLe` because the type checker admits `Str < Str`
3176    /// and `>` / `>=` compile as swap+NumLt / swap+NumLe (#332).
3177    fn bin_ord(
3178        &mut self,
3179        i: impl Fn(i64, i64) -> Value,
3180        f: impl Fn(f64, f64) -> Value,
3181        s: impl Fn(&str, &str) -> Value,
3182    ) -> Result<(), VmError> {
3183        let b = self.pop()?;
3184        let a = self.pop()?;
3185        match (a, b) {
3186            (Value::Int(x), Value::Int(y)) => { self.stack.push(i(x, y)); Ok(()) }
3187            (Value::Float(x), Value::Float(y)) => { self.stack.push(f(x, y)); Ok(()) }
3188            (Value::Str(x), Value::Str(y)) => { self.stack.push(s(&x, &y)); Ok(()) }
3189            (a, b) => Err(VmError::TypeMismatch(format!("Num op: {a:?} {b:?}"))),
3190        }
3191    }
3192    fn bin_eq(&mut self) -> Result<(), VmError> {
3193        let b = self.pop()?;
3194        let a = self.pop()?;
3195        self.stack.push(Value::Bool(a == b));
3196        Ok(())
3197    }
3198}
3199
3200impl Drop for Vm<'_> {
3201    fn drop(&mut self) {
3202        if ic_stats_enabled() {
3203            dump_ic_stats();
3204        }
3205    }
3206}
3207
3208/// Construct a `Value::Variant` with the given name and args.
3209/// Used by `conc.*` registry ops to return `Result`/`Option`/`ConcError`
3210/// values without hand-writing the struct literal at every site.
3211fn variant(name: &str, args: Vec<Value>) -> Value {
3212    Value::Variant { name: name.to_string(), args }
3213}
3214fn variant_ok(payload: Value) -> Value { variant("Ok", vec![payload]) }
3215fn variant_err(payload: Value) -> Value { variant("Err", vec![payload]) }
3216
3217fn const_to_value(c: &Const) -> Value {
3218    match c {
3219        Const::Int(n) => Value::Int(*n),
3220        Const::Float(f) => Value::Float(*f),
3221        Const::Bool(b) => Value::Bool(*b),
3222        Const::Str(s) => Value::Str(s.as_str().into()),
3223        Const::Bytes(b) => Value::Bytes(b.clone()),
3224        Const::Unit => Value::Unit,
3225        Const::FieldName(s) | Const::VariantName(s) | Const::NodeId(s) => Value::Str(s.as_str().into()),
3226    }
3227}
3228
#[cfg(test)]
mod memo_hash_tests {
    //! #461 follow-up: invariants for the structural memo-key hash
    //! that replaced the SHA-256-over-canonical-JSON path.
    //!
    //! The memo cache keys on this digest with no equality fallback,
    //! so the property that matters is: arguments that are equal
    //! under `PartialEq` must produce equal keys. On top of that the
    //! hash needs enough discrimination that distinct arguments do
    //! not collide in practice.
    use super::*;
    use indexmap::IndexMap;

    /// Builds an unshaped (`NO_SHAPE_ID`) heap record from
    /// `(field name, value)` pairs, inserted in the given order.
    fn rec(pairs: &[(&str, Value)]) -> Value {
        let fields: IndexMap<SmolStr, Value> = pairs
            .iter()
            .map(|(name, value)| (SmolStr::from(*name), value.clone()))
            .collect();
        Value::Record { shape_id: crate::value::NO_SHAPE_ID, fields: Box::new(fields) }
    }

    #[test]
    fn identical_args_hash_equal() {
        let make_args = || vec![Value::Int(7), Value::Str("hi".into())];
        let first = make_args();
        let second = make_args();
        assert_eq!(hash_call_args(&first), hash_call_args(&second));
    }

    #[test]
    fn distinct_scalars_differ() {
        let unequal_pairs = [
            (Value::Int(7), Value::Int(8)),
            (Value::Int(0), Value::Bool(false)),
            (Value::Int(0), Value::Unit),
            (Value::Bool(true), Value::Bool(false)),
        ];
        for (lhs, rhs) in &unequal_pairs {
            assert_ne!(
                hash_call_args(std::slice::from_ref(lhs)),
                hash_call_args(std::slice::from_ref(rhs)),
            );
        }
    }

    #[test]
    fn arity_is_part_of_the_key() {
        let two_args = [Value::Int(1), Value::Int(2)];
        let one_arg = [Value::Int(1)];
        assert_ne!(hash_call_args(&two_args), hash_call_args(&one_arg));

        // Two separate arguments must not collide with a single
        // Tuple argument holding the same elements.
        let tupled = [Value::Tuple(vec![Value::Int(1), Value::Int(2)])];
        assert_ne!(hash_call_args(&two_args), hash_call_args(&tupled));
    }

    #[test]
    fn record_hash_is_field_order_independent() {
        // IndexMap equality ignores insertion order, so the key has
        // to as well — otherwise equal records would miss the cache.
        let forward = rec(&[("a", Value::Int(1)), ("b", Value::Int(2))]);
        let reversed = rec(&[("b", Value::Int(2)), ("a", Value::Int(1))]);
        assert_eq!(forward, reversed, "precondition: records compare equal");
        assert_eq!(
            hash_call_args(std::slice::from_ref(&forward)),
            hash_call_args(std::slice::from_ref(&reversed)),
        );
    }

    #[test]
    fn record_distinguishes_values_and_keys() {
        let base = rec(&[("a", Value::Int(1)), ("b", Value::Int(2))]);
        let diff_val = rec(&[("a", Value::Int(1)), ("b", Value::Int(3))]);
        let diff_key = rec(&[("a", Value::Int(1)), ("c", Value::Int(2))]);
        let base_key = hash_call_args(std::slice::from_ref(&base));
        assert_ne!(base_key, hash_call_args(std::slice::from_ref(&diff_val)));
        assert_ne!(base_key, hash_call_args(std::slice::from_ref(&diff_key)));
    }

    #[test]
    fn shape_id_does_not_affect_record_key() {
        // PartialEq ignores shape_id, so the key must ignore it too.
        let mut fields: IndexMap<SmolStr, Value> = IndexMap::new();
        fields.insert("a".into(), Value::Int(1));
        let unshaped = Value::Record {
            shape_id: crate::value::NO_SHAPE_ID,
            fields: Box::new(fields.clone()),
        };
        let shaped = Value::Record { shape_id: 3, fields: Box::new(fields) };
        assert_eq!(unshaped, shaped);
        assert_eq!(
            hash_call_args(std::slice::from_ref(&unshaped)),
            hash_call_args(std::slice::from_ref(&shaped)),
        );
    }

    #[test]
    fn variant_name_and_args_matter() {
        let some_one = Value::Variant { name: "Some".into(), args: vec![Value::Int(1)] };
        let some_one_again = Value::Variant { name: "Some".into(), args: vec![Value::Int(1)] };
        let some_two = Value::Variant { name: "Some".into(), args: vec![Value::Int(2)] };
        let none = Value::Variant { name: "None".into(), args: vec![] };

        let key = hash_call_args(std::slice::from_ref(&some_one));
        assert_eq!(key, hash_call_args(std::slice::from_ref(&some_one_again)));
        assert_ne!(key, hash_call_args(std::slice::from_ref(&some_two)));
        assert_ne!(key, hash_call_args(std::slice::from_ref(&none)));
    }

    #[test]
    fn float_bit_pattern_keys() {
        let one_and_half = [Value::Float(1.5)];
        assert_eq!(hash_call_args(&one_and_half), hash_call_args(&[Value::Float(1.5)]));
        assert_ne!(hash_call_args(&one_and_half), hash_call_args(&[Value::Float(2.5)]));

        // The same NaN bit pattern yields the same key. That is
        // harmless: a pure callee is deterministic on bit-identical
        // arguments.
        let nan = f64::NAN;
        assert_eq!(hash_call_args(&[Value::Float(nan)]), hash_call_args(&[Value::Float(nan)]));
    }
}