Skip to main content

harn_vm/vm/
execution.rs

1use std::sync::Arc;
2use std::time::{Duration, Instant};
3
4use crate::chunk::{Chunk, ChunkRef, Op};
5use crate::value::{ModuleFunctionRegistry, VmError, VmValue};
6
7use super::{CallFrame, LocalSlot, Vm};
8
/// A 250 ms duration.
/// NOTE(review): not referenced in the visible part of this file; judging by
/// the name it is presumably the grace period granted to an in-flight async
/// op once cancellation has been requested — confirm against the cancel
/// branch of `execute_op_with_scope_interrupts`.
const CANCEL_GRACE_ASYNC_OP: Duration = Duration::from_millis(250);
10
/// Tags which deadline source `next_deadline` picked as the one to wait on.
#[derive(Clone, Copy)]
enum DeadlineKind {
    /// The deadline passed as `scope_deadline` (the call site supplies the
    /// top entry of the VM's `deadlines` stack).
    Scope,
    /// The deadline passed as `interrupt_handler_deadline`.
    InterruptHandler,
}
16
17impl Vm {
18    /// Returns true when no scope-level async machinery is armed. The hot
19    /// interpreter loop uses this to skip both the `pending_scope_interrupt`
20    /// future and the `execute_op_with_scope_interrupts` `tokio::select!`
21    /// wrapper on every dispatch — both are necessary for cancellable /
22    /// deadlined VMs but pure overhead in the common case (benchmarks,
23    /// background script execution, etc.).
24    #[inline]
25    pub(crate) fn scope_interrupts_clean(&self) -> bool {
26        self.cancel_token.is_none()
27            && self.interrupt_signal_token.is_none()
28            && self.pending_interrupt_signal.is_none()
29            && self.interrupt_handler_deadline.is_none()
30            && self.deadlines.is_empty()
31    }
32
33    /// Execute a compiled chunk.
34    pub async fn execute(&mut self, chunk: &Chunk) -> Result<VmValue, VmError> {
35        let registry = self.pool_registry.clone();
36        crate::stdlib::pool::with_pool_registry_scope(registry, async {
37            self.execute_scoped(chunk).await
38        })
39        .await
40    }
41
42    async fn execute_scoped(&mut self, chunk: &Chunk) -> Result<VmValue, VmError> {
43        let _execution_activity = self
44            .wait_for_graph
45            .register_task(self.runtime_context.task_id.clone());
46        let span_id = crate::tracing::span_start(crate::tracing::SpanKind::Pipeline, "main".into());
47        let result = self.run_chunk(chunk).await;
48        let result = match result {
49            Ok(value) => self.run_pipeline_finish_lifecycle(value).await,
50            Err(error) => {
51                crate::orchestration::clear_pipeline_on_finish();
52                Err(error)
53            }
54        };
55        crate::tracing::span_end(span_id);
56        result
57    }
58
59    /// Run the pipeline-finish lifecycle: `PreFinish`, optional
60    /// `OnUnsettledDetected`, the `on_finish` callback, `PostFinish`. The
61    /// callback (if registered) may transform the return value; everything
62    /// else is advisory.
63    ///
64    /// Tracked: <https://github.com/burin-labs/harn/issues/1854>.
65    async fn run_pipeline_finish_lifecycle(&mut self, value: VmValue) -> Result<VmValue, VmError> {
66        use crate::orchestration::{
67            take_pipeline_on_finish, unsettled_state_snapshot_async, HookEvent,
68        };
69        let _tape_phase =
70            crate::testbench::tape::enter_phase(crate::testbench::tape::TapePhase::RuntimeFinalize);
71
72        let on_finish = take_pipeline_on_finish();
73        let unsettled = unsettled_state_snapshot_async().await;
74
75        let pre_payload = serde_json::json!({
76            "event": HookEvent::PreFinish.as_str(),
77            "return_value": crate::llm::vm_value_to_json(&value),
78            "unsettled": unsettled.to_json(),
79            "has_on_finish": on_finish.is_some(),
80        });
81        self.fire_finish_lifecycle_event(HookEvent::PreFinish, &pre_payload)
82            .await?;
83
84        if !unsettled.is_empty() {
85            let payload = serde_json::json!({
86                "event": HookEvent::OnUnsettledDetected.as_str(),
87                "unsettled": unsettled.to_json(),
88            });
89            self.fire_finish_lifecycle_event(HookEvent::OnUnsettledDetected, &payload)
90                .await?;
91        }
92
93        let final_value = if let Some(closure) = on_finish {
94            let harness_value = crate::harness::Harness::real().into_vm_value();
95            self.call_closure_pub(&closure, &[harness_value, value])
96                .await?
97        } else {
98            value
99        };
100
101        let post_payload = serde_json::json!({
102            "event": HookEvent::PostFinish.as_str(),
103            "return_value": crate::llm::vm_value_to_json(&final_value),
104            "unsettled": unsettled.to_json(),
105        });
106        self.fire_finish_lifecycle_event(HookEvent::PostFinish, &post_payload)
107            .await?;
108
109        Ok(final_value)
110    }
111
112    /// Dispatch a pipeline-finish lifecycle event by invoking matching
113    /// hook closures directly on `self`. The shared `run_lifecycle_hooks`
114    /// path clones a fresh child VM per call and discards its stdout —
115    /// fine for the agent-loop boundaries where hooks are advisory side-
116    /// channels, but the pipeline-finish boundary is the script's last
117    /// chance to print before `vm.output()` is captured, so the closures
118    /// run on `self` to keep their output visible.
119    ///
120    /// Honors the lifecycle control contract (harn#1859):
121    ///   * `PreFinish` rejects `Block` outright — surfaces a runtime
122    ///     error pointing the user at `OnFinish.block_until_settled`.
123    ///     `PostFinish` ignores any control return (advisory only).
124    ///   * `OnUnsettledDetected` honors `Block` to abort the finish
125    ///     lifecycle until the unsettled work clears.
126    ///   * Modify returns are recorded but not consumed at this boundary
127    ///     (the dispatcher already replays subsequent hooks with the
128    ///     post-modify payload via `run_lifecycle_hooks_with_control`).
129    async fn fire_finish_lifecycle_event(
130        &mut self,
131        event: crate::orchestration::HookEvent,
132        payload: &serde_json::Value,
133    ) -> Result<(), VmError> {
134        use crate::orchestration::{HookControl, HookEvent};
135        let invocations = crate::orchestration::matching_vm_lifecycle_hooks(event, payload);
136        if invocations.is_empty() {
137            return Ok(());
138        }
139        let mut current_payload = payload.clone();
140        for invocation in invocations {
141            let arg = crate::stdlib::json_to_vm_value(&current_payload);
142            let closure = invocation.resolve(self).await?;
143            let raw = self.call_closure_pub(&closure, &[arg]).await?;
144            let (action, effects) = crate::orchestration::collect_hook_effects_and_action(
145                event,
146                raw,
147                crate::value::VmValue::Nil,
148            )?;
149            crate::orchestration::inject_hook_effects_into_current_session(effects)?;
150            let control = crate::orchestration::parse_hook_control_for_finish(event, &action)?;
151            match control {
152                HookControl::Allow => {}
153                HookControl::Block { reason } => {
154                    if matches!(event, HookEvent::PreFinish) {
155                        return Err(VmError::Runtime(format!(
156                            "PreFinish hook returned block, which is not a valid control: {reason}. \
157                             To delay pipeline finish until unsettled work clears, use \
158                             OnFinish.block_until_settled (std/lifecycle) or return Modify/Allow \
159                             from PreFinish."
160                        )));
161                    }
162                    if matches!(event, HookEvent::PostFinish) {
163                        // Advisory only; ignore block returns from PostFinish.
164                        continue;
165                    }
166                    // OnUnsettledDetected: block aborts the finish lifecycle.
167                    return Err(VmError::Runtime(format!(
168                        "{} hook blocked pipeline finish: {reason}",
169                        event.as_str()
170                    )));
171                }
172                HookControl::Modify { payload: modified } => {
173                    current_payload = modified;
174                }
175                HookControl::Decision { .. } => {}
176            }
177        }
178        Ok(())
179    }
180
181    /// Convert a VmError into either a handled exception (returning Ok) or a propagated error.
182    pub(crate) fn handle_error(&mut self, error: VmError) -> Result<Option<VmValue>, VmError> {
183        let thrown_value = match &error {
184            VmError::Thrown(v) => v.clone(),
185            other => VmValue::String(std::sync::Arc::from(other.to_string())),
186        };
187
188        if let Some(handler) = self.exception_handlers.pop() {
189            if let Some(error_type) = handler.error_type.as_deref() {
190                // Typed catch: only match when the thrown enum's type equals the declared type.
191                let matches = match &thrown_value {
192                    VmValue::EnumVariant(enum_variant) => enum_variant.has_enum_name(error_type),
193                    _ => false,
194                };
195                if !matches {
196                    return self.handle_error(error);
197                }
198            }
199
200            self.release_sync_guards_after_unwind(handler.frame_depth, handler.env_scope_depth);
201
202            while self.frames.len() > handler.frame_depth {
203                if let Some(frame) = self.frames.pop() {
204                    if let Some(ref dir) = frame.saved_source_dir {
205                        crate::stdlib::set_thread_source_dir(dir);
206                    }
207                    self.iterators.truncate(frame.saved_iterator_depth);
208                    self.env = frame.saved_env;
209                }
210            }
211            crate::step_runtime::prune_below_frame(self.frames.len());
212
213            // Drop deadlines that belonged to unwound frames.
214            while self
215                .deadlines
216                .last()
217                .is_some_and(|d| d.1 > handler.frame_depth)
218            {
219                self.deadlines.pop();
220            }
221
222            self.env.truncate_scopes(handler.env_scope_depth);
223
224            self.stack.truncate(handler.stack_depth);
225            self.stack.push(thrown_value);
226
227            if let Some(frame) = self.frames.last_mut() {
228                frame.ip = handler.catch_ip;
229            }
230
231            Ok(None)
232        } else {
233            Err(error)
234        }
235    }
236
237    pub(crate) async fn run_chunk(&mut self, chunk: &Chunk) -> Result<VmValue, VmError> {
238        self.run_chunk_entry(chunk, 0, None, None, None, None).await
239    }
240
241    pub(crate) async fn run_chunk_entry(
242        &mut self,
243        chunk: &Chunk,
244        argc: usize,
245        saved_source_dir: Option<std::path::PathBuf>,
246        module_functions: Option<ModuleFunctionRegistry>,
247        module_state: Option<crate::value::ModuleState>,
248        local_slots: Option<Vec<LocalSlot>>,
249    ) -> Result<VmValue, VmError> {
250        self.run_chunk_ref(
251            Arc::new(chunk.clone()),
252            argc,
253            saved_source_dir,
254            module_functions,
255            module_state,
256            local_slots,
257        )
258        .await
259    }
260
261    pub(crate) async fn run_chunk_ref(
262        &mut self,
263        chunk: ChunkRef,
264        argc: usize,
265        saved_source_dir: Option<std::path::PathBuf>,
266        module_functions: Option<ModuleFunctionRegistry>,
267        module_state: Option<crate::value::ModuleState>,
268        local_slots: Option<Vec<LocalSlot>>,
269    ) -> Result<VmValue, VmError> {
270        let debugger = self.debugger_attached();
271        let local_slots = local_slots.unwrap_or_else(|| Self::fresh_local_slots(&chunk));
272        let initial_env = if debugger {
273            Some(self.env.clone())
274        } else {
275            None
276        };
277        let initial_local_slots = if debugger {
278            Some(local_slots.clone())
279        } else {
280            None
281        };
282        self.frames.push(CallFrame {
283            chunk,
284            ip: 0,
285            stack_base: self.stack.len(),
286            saved_env: self.env.clone(),
287            initial_env,
288            initial_local_slots,
289            saved_iterator_depth: self.iterators.len(),
290            fn_name: String::new(),
291            argc,
292            saved_source_dir,
293            module_functions,
294            module_state,
295            local_slots,
296            local_scope_base: self.env.scope_depth().saturating_sub(1),
297            local_scope_depth: 0,
298        });
299
300        self.drive_dispatch_loop(0, false).await
301    }
302
303    /// Sub-execution entrypoint used by [`Vm::call_closure`]: runs the
304    /// dispatch loop until the topmost frame pops back to `target_depth`,
305    /// restoring env/iterators/stack on that final pop so the caller's
306    /// state is intact. Distinct from the entrypoint-mode call in
307    /// [`Vm::run_chunk_ref`] (which preserves the script's top-level scope
308    /// for the module-init capture in `modules.rs`).
309    pub(crate) async fn drive_until_frame_depth(
310        &mut self,
311        target_depth: usize,
312    ) -> Result<VmValue, VmError> {
313        self.drive_dispatch_loop(target_depth, true).await
314    }
315
    /// Dispatch loop body, parameterized on a target frame depth at which
    /// the loop should return and whether to restore the caller's
    /// env/iterators/stack on the final pop.
    ///
    /// `restore_on_final_pop = false` is the entrypoint mode used by
    /// `run_chunk_ref` (leaves the script's top-level state in place so the
    /// caller can capture it — see `modules.rs`).
    ///
    /// `restore_on_final_pop = true` is the sub-execution mode used by
    /// `call_closure`: the closure's frame is pushed onto the caller's
    /// frame stack and the loop drains it back to `target_depth`, so the
    /// per-invocation `Box::pin` heap allocation a recursive async
    /// `call_closure` would require is avoided.
    ///
    /// Each iteration: (1) polls for a pending scope interrupt when any
    /// interrupt state is armed, (2) pops the current frame if its `ip` has
    /// run past the end of its chunk, otherwise (3) fetches and executes one
    /// op, then (4) handles `VmError::Return` as a frame pop and routes every
    /// other error through the step error boundary and `handle_error`.
    ///
    /// Returns the value produced when the frame stack drops to
    /// `target_depth` (or empties). Returns `Err` when an error is not caught
    /// by any exception handler; on those paths frames above `target_depth`
    /// are unwound first via `unwind_frames_to_depth`.
    async fn drive_dispatch_loop(
        &mut self,
        target_depth: usize,
        restore_on_final_pop: bool,
    ) -> Result<VmValue, VmError> {
        // The registration value is held until this function returns.
        let _task_activity = self
            .wait_for_graph
            .register_task(self.runtime_context.task_id.clone());
        loop {
            // Slow path only: the interrupt-handler future, deadline check,
            // and host-signal poll inside `pending_scope_interrupt` are all
            // no-ops when no cancel/interrupt/deadline machinery is armed
            // (the common case for unsupervised execution), so guard them
            // with a sync check that avoids the per-iteration future
            // state-machine allocation.
            if !self.scope_interrupts_clean() {
                if let Some(err) = self.pending_scope_interrupt().await {
                    match self.handle_error(err) {
                        // A handler caught the interrupt; resume at its catch ip.
                        Ok(None) => continue,
                        Ok(Some(val)) => return Ok(val),
                        Err(e) => {
                            self.unwind_frames_to_depth(target_depth);
                            return Err(e);
                        }
                    }
                }
            }

            // No frames left: the result is the top of the stack, or Nil.
            let frame = match self.frames.last_mut() {
                Some(f) => f,
                None => return Ok(self.stack.pop().unwrap_or(VmValue::Nil)),
            };

            // The frame ran off the end of its chunk: treat the top of the
            // stack (or Nil) as its return value and pop the frame.
            if frame.ip >= frame.chunk.code.len() {
                let val = self.stack.pop().unwrap_or(VmValue::Nil);
                // NOTE(review): an error from the post hooks propagates via
                // `?` without calling `unwind_frames_to_depth` — confirm this
                // is intended for sub-execution mode.
                let val = self.run_step_post_hooks_for_current_frame(val).await?;
                self.release_sync_guards_for_frame(self.frames.len());
                let popped_frame = self.frames.pop().unwrap();
                if let Some(ref dir) = popped_frame.saved_source_dir {
                    crate::stdlib::set_thread_source_dir(dir);
                }
                let current_depth = self.frames.len();
                crate::step_runtime::prune_below_frame(current_depth);
                // Drop any deadlines owned by the popped frame so the
                // caller doesn't inherit them (an early `return` from
                // inside `deadline(d) { ... }` would otherwise leave the
                // deadline live across the function boundary).
                while self.deadlines.last().is_some_and(|d| d.1 > current_depth) {
                    self.deadlines.pop();
                }

                let reached_target = current_depth <= target_depth;
                if reached_target && !restore_on_final_pop {
                    // Entrypoint mode: leave env / iterators / stack in place
                    // so the caller can observe the script's top-level scope.
                    return Ok(val);
                }
                self.iterators.truncate(popped_frame.saved_iterator_depth);
                self.env = popped_frame.saved_env;
                self.stack.truncate(popped_frame.stack_base);
                if reached_target {
                    return Ok(val);
                }
                // Still above the target: hand the value to the calling frame.
                self.stack.push(val);
                continue;
            }

            // Fetch the next op byte and advance past it before executing.
            let op_byte = frame.chunk.code[frame.ip];
            frame.ip += 1;

            // Sync/async split dispatch: skip the `execute_op` async fn
            // (a per-iteration `Future` state machine over ~80 match arms)
            // and the `tokio::select!` deadline/cancel wrapper whenever
            // the VM has no interrupt machinery armed. Only call/iter/
            // pipe/parallel/import/yield opcodes still need an `.await`.
            let op_result: Result<(), VmError> = if self.scope_interrupts_clean() {
                let op = match Op::from_byte(op_byte) {
                    Some(op) => op,
                    // NOTE(review): this returns directly, bypassing both
                    // `handle_error` and `unwind_frames_to_depth`, unlike
                    // every other error produced by an op — confirm intended.
                    None => return Err(VmError::InvalidInstruction(op_byte)),
                };
                // `execute_op_sync` yields `None` for ops it does not handle;
                // those fall through to the async executor.
                if let Some(result) = self.execute_op_sync(op) {
                    result
                } else {
                    self.execute_op_async(op).await
                }
            } else {
                match self.execute_op_with_scope_interrupts(op_byte).await {
                    Ok(Some(val)) => return Ok(val),
                    Ok(None) => Ok(()),
                    Err(e) => Err(e),
                }
            };

            match op_result {
                Ok(()) => continue,
                // Explicit return: same frame-pop bookkeeping as the
                // end-of-chunk case above, plus dropping exception handlers
                // that belonged to the popped frame.
                Err(VmError::Return(val)) => {
                    let val = self.run_step_post_hooks_for_current_frame(val).await?;
                    if let Some(popped_frame) = self.frames.pop() {
                        // The frame is already popped, so its depth was
                        // `frames.len() + 1`.
                        self.release_sync_guards_for_frame(self.frames.len() + 1);
                        if let Some(ref dir) = popped_frame.saved_source_dir {
                            crate::stdlib::set_thread_source_dir(dir);
                        }
                        let current_depth = self.frames.len();
                        self.exception_handlers
                            .retain(|h| h.frame_depth <= current_depth);
                        crate::step_runtime::prune_below_frame(current_depth);
                        while self.deadlines.last().is_some_and(|d| d.1 > current_depth) {
                            self.deadlines.pop();
                        }

                        let reached_target = current_depth <= target_depth;
                        if reached_target && !restore_on_final_pop {
                            return Ok(val);
                        }
                        self.iterators.truncate(popped_frame.saved_iterator_depth);
                        self.env = popped_frame.saved_env;
                        self.stack.truncate(popped_frame.stack_base);
                        if reached_target {
                            return Ok(val);
                        }
                        self.stack.push(val);
                    } else {
                        return Ok(val);
                    }
                }
                Err(e) => {
                    // Capture stack trace before error handling unwinds frames.
                    if self.error_stack_trace.is_empty() {
                        self.error_stack_trace = self.capture_stack_trace();
                    }
                    // Honor `@step(error_boundary: ...)` if a step-budget
                    // exhaustion error is propagating out of the step's
                    // own frame. `continue` swaps the throw for a Nil
                    // return; `escalate` re-tags the error as a handoff
                    // escalation and lets the existing exception
                    // handlers route it.
                    let e = match self.apply_step_error_boundary(e) {
                        StepBoundaryOutcome::Returned(val) => {
                            self.error_stack_trace.clear();
                            if self.frames.len() <= target_depth {
                                return Ok(val);
                            }
                            self.stack.push(val);
                            continue;
                        }
                        StepBoundaryOutcome::Throw(err) => err,
                    };
                    match self.handle_error(e) {
                        // Caught: the trace captured above is no longer needed.
                        Ok(None) => {
                            self.error_stack_trace.clear();
                            continue;
                        }
                        Ok(Some(val)) => return Ok(val),
                        // Uncaught: unwind to the target depth and surface the
                        // error after `enrich_error_with_line`.
                        Err(e) => {
                            self.unwind_frames_to_depth(target_depth);
                            return Err(self.enrich_error_with_line(e));
                        }
                    }
                }
            }
        }
    }
491
492    /// Pop frames until `self.frames.len() <= target_depth`, restoring env,
493    /// iterators, stack, source-dir thread-locals, and releasing per-frame
494    /// sync guards for each popped frame. Used by [`drive_until_frame_depth`]
495    /// on the error path so a closure sub-execution leaves caller-visible
496    /// state at the same depth it found when an unhandled error propagates
497    /// out.
498    fn unwind_frames_to_depth(&mut self, target_depth: usize) {
499        while self.frames.len() > target_depth {
500            let frame_depth = self.frames.len();
501            if let Some(frame) = self.frames.pop() {
502                self.release_sync_guards_for_frame(frame_depth);
503                if let Some(ref dir) = frame.saved_source_dir {
504                    crate::stdlib::set_thread_source_dir(dir);
505                }
506                self.iterators.truncate(frame.saved_iterator_depth);
507                self.env = frame.saved_env;
508                self.stack.truncate(frame.stack_base);
509            }
510        }
511        let current_depth = self.frames.len();
512        crate::step_runtime::prune_below_frame(current_depth);
513        while self.deadlines.last().is_some_and(|d| d.1 > current_depth) {
514            self.deadlines.pop();
515        }
516    }
517
518    /// Inspect a thrown error against the topmost active step's
519    /// `error_boundary`. Called from the main step loop before
520    /// `handle_error` so that a step's own budget-exhaustion error can be
521    /// short-circuited (`continue`) or annotated (`escalate`) before the
522    /// generic try/catch machinery sees it.
523    pub(crate) fn apply_step_error_boundary(&mut self, error: VmError) -> StepBoundaryOutcome {
524        use crate::step_runtime;
525        if !step_runtime::is_step_budget_exhausted(&error) {
526            return StepBoundaryOutcome::Throw(error);
527        }
528        let Some(step_depth) = step_runtime::active_step_frame_depth() else {
529            return StepBoundaryOutcome::Throw(error);
530        };
531        // The step's frame is the topmost on the call stack iff its
532        // recorded frame_depth equals `frames.len()`. If the throw is
533        // coming from a deeper frame we let it bubble up — the boundary
534        // still applies later when the step's own frame is reached.
535        if step_depth != self.frames.len() {
536            return StepBoundaryOutcome::Throw(error);
537        }
538        let boundary = step_runtime::with_active_step(|step| step.definition.boundary())
539            .unwrap_or(step_runtime::StepErrorBoundary::Fail);
540        match boundary {
541            step_runtime::StepErrorBoundary::Continue => {
542                // Mimic VmError::Return(Nil) for the step's frame: pop
543                // the frame, restore its env/iterators/stack, and feed a
544                // Nil return value back to the caller.
545                if let Some(popped) = self.frames.pop() {
546                    self.release_sync_guards_for_frame(self.frames.len() + 1);
547                    if let Some(ref dir) = popped.saved_source_dir {
548                        crate::stdlib::set_thread_source_dir(dir);
549                    }
550                    let current_depth = self.frames.len();
551                    self.exception_handlers
552                        .retain(|h| h.frame_depth <= current_depth);
553                    step_runtime::pop_and_record(
554                        current_depth + 1,
555                        "skipped",
556                        Some(step_runtime_error_message(&error)),
557                    );
558                    if self.frames.is_empty() {
559                        return StepBoundaryOutcome::Returned(VmValue::Nil);
560                    }
561                    self.iterators.truncate(popped.saved_iterator_depth);
562                    self.env = popped.saved_env;
563                    self.stack.truncate(popped.stack_base);
564                }
565                StepBoundaryOutcome::Returned(VmValue::Nil)
566            }
567            step_runtime::StepErrorBoundary::Escalate => {
568                let identity = step_runtime::with_active_step(|step| {
569                    (
570                        step.definition.name.clone(),
571                        step.definition.function.clone(),
572                    )
573                });
574                step_runtime::pop_and_record(
575                    step_depth,
576                    "escalated",
577                    Some(step_runtime_error_message(&error)),
578                );
579                let (step_name, function) = identity.unzip();
580                StepBoundaryOutcome::Throw(step_runtime::mark_escalated(
581                    error,
582                    step_name.as_deref(),
583                    function.as_deref(),
584                ))
585            }
586            step_runtime::StepErrorBoundary::Fail => {
587                step_runtime::pop_and_record(
588                    step_depth,
589                    "failed",
590                    Some(step_runtime_error_message(&error)),
591                );
592                StepBoundaryOutcome::Throw(error)
593            }
594        }
595    }
596}
597
598fn next_deadline(
599    scope_deadline: Option<Instant>,
600    interrupt_handler_deadline: Option<Instant>,
601) -> (Option<Instant>, Option<DeadlineKind>) {
602    match (scope_deadline, interrupt_handler_deadline) {
603        (Some(scope), Some(interrupt)) if interrupt < scope => {
604            (Some(interrupt), Some(DeadlineKind::InterruptHandler))
605        }
606        (Some(scope), _) => (Some(scope), Some(DeadlineKind::Scope)),
607        (None, Some(interrupt)) => (Some(interrupt), Some(DeadlineKind::InterruptHandler)),
608        (None, None) => (None, None),
609    }
610}
611
612fn step_runtime_error_message(error: &VmError) -> String {
613    match error {
614        VmError::Thrown(VmValue::Dict(dict)) => dict
615            .get("message")
616            .map(|v| v.display())
617            .unwrap_or_else(|| error.to_string()),
618        _ => error.to_string(),
619    }
620}
621
/// Result of `Vm::apply_step_error_boundary`, telling the dispatch loop how
/// to proceed with an error that was checked against the active step's
/// error boundary.
pub(crate) enum StepBoundaryOutcome {
    /// The error was absorbed: the step's frame has been popped and the
    /// carried value stands in as that frame's return value.
    Returned(VmValue),
    /// The carried error (either the original or one re-tagged as escalated)
    /// must continue through regular error handling.
    Throw(VmError),
}
626
627impl crate::vm::Vm {
628    pub(crate) async fn execute_one_cycle(&mut self) -> Result<Option<(VmValue, bool)>, VmError> {
629        if let Some(err) = self.pending_scope_interrupt().await {
630            match self.handle_error(err) {
631                Ok(None) => return Ok(None),
632                Ok(Some(val)) => return Ok(Some((val, false))),
633                Err(e) => return Err(e),
634            }
635        }
636
637        let frame = match self.frames.last_mut() {
638            Some(f) => f,
639            None => {
640                let val = self.stack.pop().unwrap_or(VmValue::Nil);
641                return Ok(Some((val, false)));
642            }
643        };
644
645        if frame.ip >= frame.chunk.code.len() {
646            let val = self.stack.pop().unwrap_or(VmValue::Nil);
647            self.release_sync_guards_for_frame(self.frames.len());
648            let popped_frame = self.frames.pop().unwrap();
649            if self.frames.is_empty() {
650                return Ok(Some((val, false)));
651            }
652            self.iterators.truncate(popped_frame.saved_iterator_depth);
653            self.env = popped_frame.saved_env;
654            self.stack.truncate(popped_frame.stack_base);
655            self.stack.push(val);
656            return Ok(None);
657        }
658
659        let op = frame.chunk.code[frame.ip];
660        frame.ip += 1;
661
662        match self.execute_op_with_scope_interrupts(op).await {
663            Ok(Some(val)) => Ok(Some((val, false))),
664            Ok(None) => Ok(None),
665            Err(VmError::Return(val)) => {
666                if let Some(popped_frame) = self.frames.pop() {
667                    self.release_sync_guards_for_frame(self.frames.len() + 1);
668                    if let Some(ref dir) = popped_frame.saved_source_dir {
669                        crate::stdlib::set_thread_source_dir(dir);
670                    }
671                    let current_depth = self.frames.len();
672                    self.exception_handlers
673                        .retain(|h| h.frame_depth <= current_depth);
674                    if self.frames.is_empty() {
675                        return Ok(Some((val, false)));
676                    }
677                    self.iterators.truncate(popped_frame.saved_iterator_depth);
678                    self.env = popped_frame.saved_env;
679                    self.stack.truncate(popped_frame.stack_base);
680                    self.stack.push(val);
681                    Ok(None)
682                } else {
683                    Ok(Some((val, false)))
684                }
685            }
686            Err(e) => {
687                if self.error_stack_trace.is_empty() {
688                    self.error_stack_trace = self.capture_stack_trace();
689                }
690                match self.handle_error(e) {
691                    Ok(None) => {
692                        self.error_stack_trace.clear();
693                        Ok(None)
694                    }
695                    Ok(Some(val)) => Ok(Some((val, false))),
696                    Err(e) => Err(self.enrich_error_with_line(e)),
697                }
698            }
699        }
700    }
701
702    async fn execute_op_with_scope_interrupts(
703        &mut self,
704        op: u8,
705    ) -> Result<Option<VmValue>, VmError> {
706        enum ScopeInterruptResult {
707            Op(Result<Option<VmValue>, VmError>),
708            Deadline(DeadlineKind),
709            CancelTimedOut,
710        }
711
712        let (deadline, deadline_kind) = next_deadline(
713            self.deadlines.last().map(|(deadline, _)| *deadline),
714            self.interrupt_handler_deadline,
715        );
716        let cancel_token = self.cancel_token.clone();
717
718        if deadline.is_none() && cancel_token.is_none() {
719            return self.execute_op(op).await;
720        }
721
722        let has_deadline = deadline.is_some();
723        let cancel_requested_at_start = cancel_token
724            .as_ref()
725            .is_some_and(|token| token.load(std::sync::atomic::Ordering::SeqCst));
726        let has_cancel = cancel_token.is_some() && !cancel_requested_at_start;
727        let deadline_sleep = async move {
728            if let Some(deadline) = deadline {
729                tokio::time::sleep_until(tokio::time::Instant::from_std(deadline)).await;
730            } else {
731                std::future::pending::<()>().await;
732            }
733        };
734        let cancel_sleep = async move {
735            if let Some(token) = cancel_token {
736                while !token.load(std::sync::atomic::Ordering::SeqCst) {
737                    tokio::time::sleep(Duration::from_millis(10)).await;
738                }
739            } else {
740                std::future::pending::<()>().await;
741            }
742        };
743
744        let result = {
745            let op_future = self.execute_op(op);
746            tokio::pin!(op_future);
747            tokio::select! {
748                result = &mut op_future => ScopeInterruptResult::Op(result),
749                _ = deadline_sleep, if has_deadline => {
750                    ScopeInterruptResult::Deadline(deadline_kind.unwrap_or(DeadlineKind::Scope))
751                },
752                _ = cancel_sleep, if has_cancel => {
753                    let grace = tokio::time::sleep(CANCEL_GRACE_ASYNC_OP);
754                    tokio::pin!(grace);
755                    tokio::select! {
756                        result = &mut op_future => ScopeInterruptResult::Op(result),
757                        _ = &mut grace => ScopeInterruptResult::CancelTimedOut,
758                    }
759                }
760            }
761        };
762
763        match result {
764            ScopeInterruptResult::Op(result) => result,
765            ScopeInterruptResult::Deadline(DeadlineKind::Scope) => {
766                self.deadlines.pop();
767                self.cancel_spawned_tasks();
768                Err(Self::deadline_exceeded_error())
769            }
770            ScopeInterruptResult::Deadline(DeadlineKind::InterruptHandler) => {
771                Err(Self::interrupt_handler_timeout_error())
772            }
773            ScopeInterruptResult::CancelTimedOut => {
774                self.cancel_spawned_tasks();
775                let signal = self
776                    .take_host_interrupt_signal()
777                    .unwrap_or_else(|| "SIGINT".to_string());
778                if self.has_interrupt_handler_for(&signal) {
779                    self.dispatch_interrupt_handlers(&signal).await?;
780                }
781                Err(Self::cancelled_error())
782            }
783        }
784    }
785
786    pub(crate) fn deadline_exceeded_error() -> VmError {
787        VmError::Thrown(VmValue::String(std::sync::Arc::from("Deadline exceeded")))
788    }
789
790    pub(crate) fn cancelled_error() -> VmError {
791        VmError::Thrown(VmValue::String(std::sync::Arc::from(
792            "kind:cancelled:VM cancelled by host",
793        )))
794    }
795
796    /// Capture the current call stack as (fn_name, line, col, source_file) tuples.
797    pub(crate) fn capture_stack_trace(&self) -> Vec<(String, usize, usize, Option<String>)> {
798        self.frames
799            .iter()
800            .map(|f| {
801                let idx = if f.ip > 0 { f.ip - 1 } else { 0 };
802                let line = f.chunk.lines.get(idx).copied().unwrap_or(0) as usize;
803                let col = f.chunk.columns.get(idx).copied().unwrap_or(0) as usize;
804                (f.fn_name.clone(), line, col, f.chunk.source_file.clone())
805            })
806            .collect()
807    }
808
809    /// Enrich a VmError with source line information from the captured stack
810    /// trace. Appends ` (line N)` to error variants whose messages don't
811    /// already carry location context.
812    pub(crate) fn enrich_error_with_line(&self, error: VmError) -> VmError {
813        // Determine the line from the captured stack trace (innermost frame).
814        let line = self
815            .error_stack_trace
816            .last()
817            .map(|(_, l, _, _)| *l)
818            .unwrap_or_else(|| self.current_line());
819        if line == 0 {
820            return error;
821        }
822        let suffix = format!(" (line {line})");
823        match error {
824            VmError::Runtime(msg) => VmError::Runtime(format!("{msg}{suffix}")),
825            VmError::TypeError(msg) => VmError::TypeError(format!("{msg}{suffix}")),
826            VmError::DivisionByZero => VmError::Runtime(format!("Division by zero{suffix}")),
827            VmError::UndefinedVariable(name) => {
828                VmError::Runtime(format!("Undefined variable: {name}{suffix}"))
829            }
830            VmError::UndefinedBuiltin(name) => {
831                VmError::Runtime(format!("Undefined builtin: {name}{suffix}"))
832            }
833            VmError::ImmutableAssignment(name) => VmError::Runtime(format!(
834                "Cannot assign to immutable binding: {name}{suffix}"
835            )),
836            VmError::StackOverflow => {
837                VmError::Runtime(format!("Stack overflow: too many nested calls{suffix}"))
838            }
839            // Leave these untouched:
840            // - Thrown: user-thrown errors should not be silently modified
841            // - CategorizedError: structured errors for agent orchestration
842            // - Return: control flow, not a real error
843            // - StackUnderflow / InvalidInstruction: internal VM bugs
844            other => other,
845        }
846    }
847}