Skip to main content

harn_vm/vm/
execution.rs

1use std::rc::Rc;
2use std::time::{Duration, Instant};
3
4use crate::chunk::{Chunk, ChunkRef, Op};
5use crate::value::{ModuleFunctionRegistry, VmError, VmValue};
6
7use super::{CallFrame, LocalSlot, Vm};
8
/// Grace period granted to an in-flight op after the cancel token is
/// observed as set: the op is raced against a sleep of this length and
/// only reported as timed out on cancel if it has not completed by then.
const CANCEL_GRACE_ASYNC_OP: Duration = Duration::from_millis(250);
10
/// Identifies which of the two deadline sources produced the deadline
/// selected by `next_deadline`.
#[derive(Clone, Copy)]
enum DeadlineKind {
    /// The deadline came from the scope deadline argument (the top of the
    /// VM's `deadlines` stack at the call site in this file).
    Scope,
    /// The deadline came from the interrupt-handler deadline argument
    /// (`interrupt_handler_deadline` on the VM).
    InterruptHandler,
}
16
17impl Vm {
18    /// Returns true when no scope-level async machinery is armed. The hot
19    /// interpreter loop uses this to skip both the `pending_scope_interrupt`
20    /// future and the `execute_op_with_scope_interrupts` `tokio::select!`
21    /// wrapper on every dispatch — both are necessary for cancellable /
22    /// deadlined VMs but pure overhead in the common case (benchmarks,
23    /// background script execution, etc.).
24    #[inline]
25    pub(crate) fn scope_interrupts_clean(&self) -> bool {
26        self.cancel_token.is_none()
27            && self.interrupt_signal_token.is_none()
28            && self.pending_interrupt_signal.is_none()
29            && self.interrupt_handler_deadline.is_none()
30            && self.deadlines.is_empty()
31    }
32
33    /// Execute a compiled chunk.
34    pub async fn execute(&mut self, chunk: &Chunk) -> Result<VmValue, VmError> {
35        let span_id = crate::tracing::span_start(crate::tracing::SpanKind::Pipeline, "main".into());
36        let result = self.run_chunk(chunk).await;
37        let result = match result {
38            Ok(value) => self.run_pipeline_finish_lifecycle(value).await,
39            Err(error) => {
40                crate::orchestration::clear_pipeline_on_finish();
41                Err(error)
42            }
43        };
44        crate::tracing::span_end(span_id);
45        result
46    }
47
48    /// Run the pipeline-finish lifecycle: `PreFinish`, optional
49    /// `OnUnsettledDetected`, the `on_finish` callback, `PostFinish`. The
50    /// callback (if registered) may transform the return value; everything
51    /// else is advisory.
52    ///
53    /// Tracked: <https://github.com/burin-labs/harn/issues/1854>.
54    async fn run_pipeline_finish_lifecycle(&mut self, value: VmValue) -> Result<VmValue, VmError> {
55        use crate::orchestration::{
56            take_pipeline_on_finish, unsettled_state_snapshot_async, HookEvent,
57        };
58        let _tape_phase =
59            crate::testbench::tape::enter_phase(crate::testbench::tape::TapePhase::RuntimeFinalize);
60
61        let on_finish = take_pipeline_on_finish();
62        let unsettled = unsettled_state_snapshot_async().await;
63
64        let pre_payload = serde_json::json!({
65            "event": HookEvent::PreFinish.as_str(),
66            "return_value": crate::llm::vm_value_to_json(&value),
67            "unsettled": unsettled.to_json(),
68            "has_on_finish": on_finish.is_some(),
69        });
70        self.fire_finish_lifecycle_event(HookEvent::PreFinish, &pre_payload)
71            .await?;
72
73        if !unsettled.is_empty() {
74            let payload = serde_json::json!({
75                "event": HookEvent::OnUnsettledDetected.as_str(),
76                "unsettled": unsettled.to_json(),
77            });
78            self.fire_finish_lifecycle_event(HookEvent::OnUnsettledDetected, &payload)
79                .await?;
80        }
81
82        let final_value = if let Some(closure) = on_finish {
83            let harness_value = crate::harness::Harness::real().into_vm_value();
84            self.call_closure_pub(&closure, &[harness_value, value])
85                .await?
86        } else {
87            value
88        };
89
90        let post_payload = serde_json::json!({
91            "event": HookEvent::PostFinish.as_str(),
92            "return_value": crate::llm::vm_value_to_json(&final_value),
93            "unsettled": unsettled.to_json(),
94        });
95        self.fire_finish_lifecycle_event(HookEvent::PostFinish, &post_payload)
96            .await?;
97
98        Ok(final_value)
99    }
100
101    /// Dispatch a pipeline-finish lifecycle event by invoking matching
102    /// hook closures directly on `self`. The shared `run_lifecycle_hooks`
103    /// path clones a fresh child VM per call and discards its stdout —
104    /// fine for the agent-loop boundaries where hooks are advisory side-
105    /// channels, but the pipeline-finish boundary is the script's last
106    /// chance to print before `vm.output()` is captured, so the closures
107    /// run on `self` to keep their output visible.
108    ///
109    /// Honors the lifecycle control contract (harn#1859):
110    ///   * `PreFinish` rejects `Block` outright — surfaces a runtime
111    ///     error pointing the user at `OnFinish.block_until_settled`.
112    ///     `PostFinish` ignores any control return (advisory only).
113    ///   * `OnUnsettledDetected` honors `Block` to abort the finish
114    ///     lifecycle until the unsettled work clears.
115    ///   * Modify returns are recorded but not consumed at this boundary
116    ///     (the dispatcher already replays subsequent hooks with the
117    ///     post-modify payload via `run_lifecycle_hooks_with_control`).
118    async fn fire_finish_lifecycle_event(
119        &mut self,
120        event: crate::orchestration::HookEvent,
121        payload: &serde_json::Value,
122    ) -> Result<(), VmError> {
123        use crate::orchestration::{HookControl, HookEvent};
124        let invocations = crate::orchestration::matching_vm_lifecycle_hooks(event, payload);
125        if invocations.is_empty() {
126            return Ok(());
127        }
128        let mut current_payload = payload.clone();
129        for invocation in invocations {
130            let arg = crate::stdlib::json_to_vm_value(&current_payload);
131            let raw = self.call_closure_pub(&invocation.closure, &[arg]).await?;
132            let (action, effects) = crate::orchestration::collect_hook_effects_and_action(
133                event,
134                raw,
135                crate::value::VmValue::Nil,
136            )?;
137            crate::orchestration::inject_hook_effects_into_current_session(effects)?;
138            let control = crate::orchestration::parse_hook_control_for_finish(event, &action)?;
139            match control {
140                HookControl::Allow => {}
141                HookControl::Block { reason } => {
142                    if matches!(event, HookEvent::PreFinish) {
143                        return Err(VmError::Runtime(format!(
144                            "PreFinish hook returned block, which is not a valid control: {reason}. \
145                             To delay pipeline finish until unsettled work clears, use \
146                             OnFinish.block_until_settled (std/lifecycle) or return Modify/Allow \
147                             from PreFinish."
148                        )));
149                    }
150                    if matches!(event, HookEvent::PostFinish) {
151                        // Advisory only; ignore block returns from PostFinish.
152                        continue;
153                    }
154                    // OnUnsettledDetected: block aborts the finish lifecycle.
155                    return Err(VmError::Runtime(format!(
156                        "{} hook blocked pipeline finish: {reason}",
157                        event.as_str()
158                    )));
159                }
160                HookControl::Modify { payload: modified } => {
161                    current_payload = modified;
162                }
163                HookControl::Decision { .. } => {}
164            }
165        }
166        Ok(())
167    }
168
169    /// Convert a VmError into either a handled exception (returning Ok) or a propagated error.
170    pub(crate) fn handle_error(&mut self, error: VmError) -> Result<Option<VmValue>, VmError> {
171        let thrown_value = match &error {
172            VmError::Thrown(v) => v.clone(),
173            other => VmValue::String(Rc::from(other.to_string())),
174        };
175
176        if let Some(handler) = self.exception_handlers.pop() {
177            if !handler.error_type.is_empty() {
178                // Typed catch: only match when the thrown enum's type equals the declared type.
179                let matches = match &thrown_value {
180                    VmValue::EnumVariant(enum_variant) => {
181                        enum_variant.has_enum_name(&handler.error_type)
182                    }
183                    _ => false,
184                };
185                if !matches {
186                    return self.handle_error(error);
187                }
188            }
189
190            self.release_sync_guards_after_unwind(handler.frame_depth, handler.env_scope_depth);
191
192            while self.frames.len() > handler.frame_depth {
193                if let Some(frame) = self.frames.pop() {
194                    if let Some(ref dir) = frame.saved_source_dir {
195                        crate::stdlib::set_thread_source_dir(dir);
196                    }
197                    self.iterators.truncate(frame.saved_iterator_depth);
198                    self.env = frame.saved_env;
199                }
200            }
201            crate::step_runtime::prune_below_frame(self.frames.len());
202
203            // Drop deadlines that belonged to unwound frames.
204            while self
205                .deadlines
206                .last()
207                .is_some_and(|d| d.1 > handler.frame_depth)
208            {
209                self.deadlines.pop();
210            }
211
212            self.env.truncate_scopes(handler.env_scope_depth);
213
214            self.stack.truncate(handler.stack_depth);
215            self.stack.push(thrown_value);
216
217            if let Some(frame) = self.frames.last_mut() {
218                frame.ip = handler.catch_ip;
219            }
220
221            Ok(None)
222        } else {
223            Err(error)
224        }
225    }
226
    /// Run `chunk` as an entrypoint with no arguments (`argc = 0`) and no
    /// saved source dir, module functions, module state, or pre-built
    /// local slots. Delegates to `run_chunk_entry`.
    pub(crate) async fn run_chunk(&mut self, chunk: &Chunk) -> Result<VmValue, VmError> {
        self.run_chunk_entry(chunk, 0, None, None, None, None).await
    }
230
    /// Borrowed-chunk convenience wrapper around `run_chunk_ref`.
    ///
    /// Clones `chunk` into a fresh `Rc` and forwards every other argument
    /// unchanged.
    pub(crate) async fn run_chunk_entry(
        &mut self,
        chunk: &Chunk,
        argc: usize,
        saved_source_dir: Option<std::path::PathBuf>,
        module_functions: Option<ModuleFunctionRegistry>,
        module_state: Option<crate::value::ModuleState>,
        local_slots: Option<Vec<LocalSlot>>,
    ) -> Result<VmValue, VmError> {
        self.run_chunk_ref(
            // `run_chunk_ref` takes a `ChunkRef`, so the borrowed chunk is
            // copied into a new `Rc` here.
            Rc::new(chunk.clone()),
            argc,
            saved_source_dir,
            module_functions,
            module_state,
            local_slots,
        )
        .await
    }
250
251    pub(crate) async fn run_chunk_ref(
252        &mut self,
253        chunk: ChunkRef,
254        argc: usize,
255        saved_source_dir: Option<std::path::PathBuf>,
256        module_functions: Option<ModuleFunctionRegistry>,
257        module_state: Option<crate::value::ModuleState>,
258        local_slots: Option<Vec<LocalSlot>>,
259    ) -> Result<VmValue, VmError> {
260        let debugger = self.debugger_attached();
261        let local_slots = local_slots.unwrap_or_else(|| Self::fresh_local_slots(&chunk));
262        let initial_env = if debugger {
263            Some(self.env.clone())
264        } else {
265            None
266        };
267        let initial_local_slots = if debugger {
268            Some(local_slots.clone())
269        } else {
270            None
271        };
272        self.frames.push(CallFrame {
273            chunk,
274            ip: 0,
275            stack_base: self.stack.len(),
276            saved_env: self.env.clone(),
277            initial_env,
278            initial_local_slots,
279            saved_iterator_depth: self.iterators.len(),
280            fn_name: String::new(),
281            argc,
282            saved_source_dir,
283            module_functions,
284            module_state,
285            local_slots,
286            local_scope_base: self.env.scope_depth().saturating_sub(1),
287            local_scope_depth: 0,
288        });
289
290        self.drive_dispatch_loop(0, false).await
291    }
292
    /// Sub-execution entrypoint used by [`Vm::call_closure`]: runs the
    /// dispatch loop until the topmost frame pops back to `target_depth`,
    /// restoring env/iterators/stack on that final pop so the caller's
    /// state is intact. Distinct from the entrypoint-mode call in
    /// [`Vm::run_chunk_ref`] (which preserves the script's top-level scope
    /// for the module-init capture in `modules.rs`).
    ///
    /// Returns the value produced by the frame whose pop reaches
    /// `target_depth`, or the error that propagated out unhandled.
    pub(crate) async fn drive_until_frame_depth(
        &mut self,
        target_depth: usize,
    ) -> Result<VmValue, VmError> {
        // `true` selects restore-on-final-pop (sub-execution) mode.
        self.drive_dispatch_loop(target_depth, true).await
    }
305
306    /// Dispatch loop body, parameterized on a target frame depth at which
307    /// the loop should return and whether to restore the caller's
308    /// env/iterators/stack on the final pop.
309    ///
310    /// `restore_on_final_pop = false` is the entrypoint mode used by
311    /// `run_chunk_ref` (leaves the script's top-level state in place so the
312    /// caller can capture it — see `modules.rs`).
313    ///
314    /// `restore_on_final_pop = true` is the sub-execution mode used by
315    /// `call_closure`: the closure's frame is pushed onto the caller's
316    /// frame stack and the loop drains it back to `target_depth`, so the
317    /// per-invocation `Box::pin` heap allocation a recursive async
318    /// `call_closure` would require is avoided.
319    async fn drive_dispatch_loop(
320        &mut self,
321        target_depth: usize,
322        restore_on_final_pop: bool,
323    ) -> Result<VmValue, VmError> {
324        loop {
325            // Slow path only: the interrupt-handler future, deadline check,
326            // and host-signal poll inside `pending_scope_interrupt` are all
327            // no-ops when no cancel/interrupt/deadline machinery is armed
328            // (the common case for unsupervised execution), so guard them
329            // with a sync check that avoids the per-iteration future
330            // state-machine allocation.
331            if !self.scope_interrupts_clean() {
332                if let Some(err) = self.pending_scope_interrupt().await {
333                    match self.handle_error(err) {
334                        Ok(None) => continue,
335                        Ok(Some(val)) => return Ok(val),
336                        Err(e) => {
337                            self.unwind_frames_to_depth(target_depth);
338                            return Err(e);
339                        }
340                    }
341                }
342            }
343
344            let frame = match self.frames.last_mut() {
345                Some(f) => f,
346                None => return Ok(self.stack.pop().unwrap_or(VmValue::Nil)),
347            };
348
349            if frame.ip >= frame.chunk.code.len() {
350                let val = self.stack.pop().unwrap_or(VmValue::Nil);
351                let val = self.run_step_post_hooks_for_current_frame(val).await?;
352                self.release_sync_guards_for_frame(self.frames.len());
353                let popped_frame = self.frames.pop().unwrap();
354                if let Some(ref dir) = popped_frame.saved_source_dir {
355                    crate::stdlib::set_thread_source_dir(dir);
356                }
357                let current_depth = self.frames.len();
358                crate::step_runtime::prune_below_frame(current_depth);
359                // Drop any deadlines owned by the popped frame so the
360                // caller doesn't inherit them (an early `return` from
361                // inside `deadline(d) { ... }` would otherwise leave the
362                // deadline live across the function boundary).
363                while self.deadlines.last().is_some_and(|d| d.1 > current_depth) {
364                    self.deadlines.pop();
365                }
366
367                let reached_target = current_depth <= target_depth;
368                if reached_target && !restore_on_final_pop {
369                    // Entrypoint mode: leave env / iterators / stack in place
370                    // so the caller can observe the script's top-level scope.
371                    return Ok(val);
372                }
373                self.iterators.truncate(popped_frame.saved_iterator_depth);
374                self.env = popped_frame.saved_env;
375                self.stack.truncate(popped_frame.stack_base);
376                if reached_target {
377                    return Ok(val);
378                }
379                self.stack.push(val);
380                continue;
381            }
382
383            let op_byte = frame.chunk.code[frame.ip];
384            frame.ip += 1;
385
386            // Sync/async split dispatch: skip the `execute_op` async fn
387            // (a per-iteration `Future` state machine over ~80 match arms)
388            // and the `tokio::select!` deadline/cancel wrapper whenever
389            // the VM has no interrupt machinery armed. Only call/iter/
390            // pipe/parallel/import/yield opcodes still need an `.await`.
391            let op_result: Result<(), VmError> = if self.scope_interrupts_clean() {
392                let op = match Op::from_byte(op_byte) {
393                    Some(op) => op,
394                    None => return Err(VmError::InvalidInstruction(op_byte)),
395                };
396                if let Some(result) = self.execute_op_sync(op) {
397                    result
398                } else {
399                    self.execute_op_async(op).await
400                }
401            } else {
402                match self.execute_op_with_scope_interrupts(op_byte).await {
403                    Ok(Some(val)) => return Ok(val),
404                    Ok(None) => Ok(()),
405                    Err(e) => Err(e),
406                }
407            };
408
409            match op_result {
410                Ok(()) => continue,
411                Err(VmError::Return(val)) => {
412                    let val = self.run_step_post_hooks_for_current_frame(val).await?;
413                    if let Some(popped_frame) = self.frames.pop() {
414                        self.release_sync_guards_for_frame(self.frames.len() + 1);
415                        if let Some(ref dir) = popped_frame.saved_source_dir {
416                            crate::stdlib::set_thread_source_dir(dir);
417                        }
418                        let current_depth = self.frames.len();
419                        self.exception_handlers
420                            .retain(|h| h.frame_depth <= current_depth);
421                        crate::step_runtime::prune_below_frame(current_depth);
422                        while self.deadlines.last().is_some_and(|d| d.1 > current_depth) {
423                            self.deadlines.pop();
424                        }
425
426                        let reached_target = current_depth <= target_depth;
427                        if reached_target && !restore_on_final_pop {
428                            return Ok(val);
429                        }
430                        self.iterators.truncate(popped_frame.saved_iterator_depth);
431                        self.env = popped_frame.saved_env;
432                        self.stack.truncate(popped_frame.stack_base);
433                        if reached_target {
434                            return Ok(val);
435                        }
436                        self.stack.push(val);
437                    } else {
438                        return Ok(val);
439                    }
440                }
441                Err(e) => {
442                    // Capture stack trace before error handling unwinds frames.
443                    if self.error_stack_trace.is_empty() {
444                        self.error_stack_trace = self.capture_stack_trace();
445                    }
446                    // Honor `@step(error_boundary: ...)` if a step-budget
447                    // exhaustion error is propagating out of the step's
448                    // own frame. `continue` swaps the throw for a Nil
449                    // return; `escalate` re-tags the error as a handoff
450                    // escalation and lets the existing exception
451                    // handlers route it.
452                    let e = match self.apply_step_error_boundary(e) {
453                        StepBoundaryOutcome::Returned(val) => {
454                            self.error_stack_trace.clear();
455                            if self.frames.len() <= target_depth {
456                                return Ok(val);
457                            }
458                            self.stack.push(val);
459                            continue;
460                        }
461                        StepBoundaryOutcome::Throw(err) => err,
462                    };
463                    match self.handle_error(e) {
464                        Ok(None) => {
465                            self.error_stack_trace.clear();
466                            continue;
467                        }
468                        Ok(Some(val)) => return Ok(val),
469                        Err(e) => {
470                            self.unwind_frames_to_depth(target_depth);
471                            return Err(self.enrich_error_with_line(e));
472                        }
473                    }
474                }
475            }
476        }
477    }
478
479    /// Pop frames until `self.frames.len() <= target_depth`, restoring env,
480    /// iterators, stack, source-dir thread-locals, and releasing per-frame
481    /// sync guards for each popped frame. Used by [`drive_until_frame_depth`]
482    /// on the error path so a closure sub-execution leaves caller-visible
483    /// state at the same depth it found when an unhandled error propagates
484    /// out.
485    fn unwind_frames_to_depth(&mut self, target_depth: usize) {
486        while self.frames.len() > target_depth {
487            let frame_depth = self.frames.len();
488            if let Some(frame) = self.frames.pop() {
489                self.release_sync_guards_for_frame(frame_depth);
490                if let Some(ref dir) = frame.saved_source_dir {
491                    crate::stdlib::set_thread_source_dir(dir);
492                }
493                self.iterators.truncate(frame.saved_iterator_depth);
494                self.env = frame.saved_env;
495                self.stack.truncate(frame.stack_base);
496            }
497        }
498        let current_depth = self.frames.len();
499        crate::step_runtime::prune_below_frame(current_depth);
500        while self.deadlines.last().is_some_and(|d| d.1 > current_depth) {
501            self.deadlines.pop();
502        }
503    }
504
505    /// Inspect a thrown error against the topmost active step's
506    /// `error_boundary`. Called from the main step loop before
507    /// `handle_error` so that a step's own budget-exhaustion error can be
508    /// short-circuited (`continue`) or annotated (`escalate`) before the
509    /// generic try/catch machinery sees it.
510    pub(crate) fn apply_step_error_boundary(&mut self, error: VmError) -> StepBoundaryOutcome {
511        use crate::step_runtime;
512        if !step_runtime::is_step_budget_exhausted(&error) {
513            return StepBoundaryOutcome::Throw(error);
514        }
515        let Some(step_depth) = step_runtime::active_step_frame_depth() else {
516            return StepBoundaryOutcome::Throw(error);
517        };
518        // The step's frame is the topmost on the call stack iff its
519        // recorded frame_depth equals `frames.len()`. If the throw is
520        // coming from a deeper frame we let it bubble up — the boundary
521        // still applies later when the step's own frame is reached.
522        if step_depth != self.frames.len() {
523            return StepBoundaryOutcome::Throw(error);
524        }
525        let boundary = step_runtime::with_active_step(|step| step.definition.boundary())
526            .unwrap_or(step_runtime::StepErrorBoundary::Fail);
527        match boundary {
528            step_runtime::StepErrorBoundary::Continue => {
529                // Mimic VmError::Return(Nil) for the step's frame: pop
530                // the frame, restore its env/iterators/stack, and feed a
531                // Nil return value back to the caller.
532                if let Some(popped) = self.frames.pop() {
533                    self.release_sync_guards_for_frame(self.frames.len() + 1);
534                    if let Some(ref dir) = popped.saved_source_dir {
535                        crate::stdlib::set_thread_source_dir(dir);
536                    }
537                    let current_depth = self.frames.len();
538                    self.exception_handlers
539                        .retain(|h| h.frame_depth <= current_depth);
540                    step_runtime::pop_and_record(
541                        current_depth + 1,
542                        "skipped",
543                        Some(step_runtime_error_message(&error)),
544                    );
545                    if self.frames.is_empty() {
546                        return StepBoundaryOutcome::Returned(VmValue::Nil);
547                    }
548                    self.iterators.truncate(popped.saved_iterator_depth);
549                    self.env = popped.saved_env;
550                    self.stack.truncate(popped.stack_base);
551                }
552                StepBoundaryOutcome::Returned(VmValue::Nil)
553            }
554            step_runtime::StepErrorBoundary::Escalate => {
555                let identity = step_runtime::with_active_step(|step| {
556                    (
557                        step.definition.name.clone(),
558                        step.definition.function.clone(),
559                    )
560                });
561                step_runtime::pop_and_record(
562                    step_depth,
563                    "escalated",
564                    Some(step_runtime_error_message(&error)),
565                );
566                let (step_name, function) = identity.unzip();
567                StepBoundaryOutcome::Throw(step_runtime::mark_escalated(
568                    error,
569                    step_name.as_deref(),
570                    function.as_deref(),
571                ))
572            }
573            step_runtime::StepErrorBoundary::Fail => {
574                step_runtime::pop_and_record(
575                    step_depth,
576                    "failed",
577                    Some(step_runtime_error_message(&error)),
578                );
579                StepBoundaryOutcome::Throw(error)
580            }
581        }
582    }
583}
584
585fn next_deadline(
586    scope_deadline: Option<Instant>,
587    interrupt_handler_deadline: Option<Instant>,
588) -> (Option<Instant>, Option<DeadlineKind>) {
589    match (scope_deadline, interrupt_handler_deadline) {
590        (Some(scope), Some(interrupt)) if interrupt < scope => {
591            (Some(interrupt), Some(DeadlineKind::InterruptHandler))
592        }
593        (Some(scope), _) => (Some(scope), Some(DeadlineKind::Scope)),
594        (None, Some(interrupt)) => (Some(interrupt), Some(DeadlineKind::InterruptHandler)),
595        (None, None) => (None, None),
596    }
597}
598
599fn step_runtime_error_message(error: &VmError) -> String {
600    match error {
601        VmError::Thrown(VmValue::Dict(dict)) => dict
602            .get("message")
603            .map(|v| v.display())
604            .unwrap_or_else(|| error.to_string()),
605        _ => error.to_string(),
606    }
607}
608
/// Result of `Vm::apply_step_error_boundary`.
pub(crate) enum StepBoundaryOutcome {
    /// The boundary absorbed the error; the carried value is treated by
    /// the dispatch loop as the value the step's frame returned.
    Returned(VmValue),
    /// The error (possibly re-tagged) must continue through the regular
    /// exception-handling path.
    Throw(VmError),
}
613
614impl crate::vm::Vm {
615    pub(crate) async fn execute_one_cycle(&mut self) -> Result<Option<(VmValue, bool)>, VmError> {
616        if let Some(err) = self.pending_scope_interrupt().await {
617            match self.handle_error(err) {
618                Ok(None) => return Ok(None),
619                Ok(Some(val)) => return Ok(Some((val, false))),
620                Err(e) => return Err(e),
621            }
622        }
623
624        let frame = match self.frames.last_mut() {
625            Some(f) => f,
626            None => {
627                let val = self.stack.pop().unwrap_or(VmValue::Nil);
628                return Ok(Some((val, false)));
629            }
630        };
631
632        if frame.ip >= frame.chunk.code.len() {
633            let val = self.stack.pop().unwrap_or(VmValue::Nil);
634            self.release_sync_guards_for_frame(self.frames.len());
635            let popped_frame = self.frames.pop().unwrap();
636            if self.frames.is_empty() {
637                return Ok(Some((val, false)));
638            } else {
639                self.iterators.truncate(popped_frame.saved_iterator_depth);
640                self.env = popped_frame.saved_env;
641                self.stack.truncate(popped_frame.stack_base);
642                self.stack.push(val);
643                return Ok(None);
644            }
645        }
646
647        let op = frame.chunk.code[frame.ip];
648        frame.ip += 1;
649
650        match self.execute_op_with_scope_interrupts(op).await {
651            Ok(Some(val)) => Ok(Some((val, false))),
652            Ok(None) => Ok(None),
653            Err(VmError::Return(val)) => {
654                if let Some(popped_frame) = self.frames.pop() {
655                    self.release_sync_guards_for_frame(self.frames.len() + 1);
656                    if let Some(ref dir) = popped_frame.saved_source_dir {
657                        crate::stdlib::set_thread_source_dir(dir);
658                    }
659                    let current_depth = self.frames.len();
660                    self.exception_handlers
661                        .retain(|h| h.frame_depth <= current_depth);
662                    if self.frames.is_empty() {
663                        return Ok(Some((val, false)));
664                    }
665                    self.iterators.truncate(popped_frame.saved_iterator_depth);
666                    self.env = popped_frame.saved_env;
667                    self.stack.truncate(popped_frame.stack_base);
668                    self.stack.push(val);
669                    Ok(None)
670                } else {
671                    Ok(Some((val, false)))
672                }
673            }
674            Err(e) => {
675                if self.error_stack_trace.is_empty() {
676                    self.error_stack_trace = self.capture_stack_trace();
677                }
678                match self.handle_error(e) {
679                    Ok(None) => {
680                        self.error_stack_trace.clear();
681                        Ok(None)
682                    }
683                    Ok(Some(val)) => Ok(Some((val, false))),
684                    Err(e) => Err(self.enrich_error_with_line(e)),
685                }
686            }
687        }
688    }
689
690    async fn execute_op_with_scope_interrupts(
691        &mut self,
692        op: u8,
693    ) -> Result<Option<VmValue>, VmError> {
694        enum ScopeInterruptResult {
695            Op(Result<Option<VmValue>, VmError>),
696            Deadline(DeadlineKind),
697            CancelTimedOut,
698        }
699
700        let (deadline, deadline_kind) = next_deadline(
701            self.deadlines.last().map(|(deadline, _)| *deadline),
702            self.interrupt_handler_deadline,
703        );
704        let cancel_token = self.cancel_token.clone();
705
706        if deadline.is_none() && cancel_token.is_none() {
707            return self.execute_op(op).await;
708        }
709
710        let has_deadline = deadline.is_some();
711        let cancel_requested_at_start = cancel_token
712            .as_ref()
713            .is_some_and(|token| token.load(std::sync::atomic::Ordering::SeqCst));
714        let has_cancel = cancel_token.is_some() && !cancel_requested_at_start;
715        let deadline_sleep = async move {
716            if let Some(deadline) = deadline {
717                tokio::time::sleep_until(tokio::time::Instant::from_std(deadline)).await;
718            } else {
719                std::future::pending::<()>().await;
720            }
721        };
722        let cancel_sleep = async move {
723            if let Some(token) = cancel_token {
724                while !token.load(std::sync::atomic::Ordering::SeqCst) {
725                    tokio::time::sleep(Duration::from_millis(10)).await;
726                }
727            } else {
728                std::future::pending::<()>().await;
729            }
730        };
731
732        let result = {
733            let op_future = self.execute_op(op);
734            tokio::pin!(op_future);
735            tokio::select! {
736                result = &mut op_future => ScopeInterruptResult::Op(result),
737                _ = deadline_sleep, if has_deadline => {
738                    ScopeInterruptResult::Deadline(deadline_kind.unwrap_or(DeadlineKind::Scope))
739                },
740                _ = cancel_sleep, if has_cancel => {
741                    let grace = tokio::time::sleep(CANCEL_GRACE_ASYNC_OP);
742                    tokio::pin!(grace);
743                    tokio::select! {
744                        result = &mut op_future => ScopeInterruptResult::Op(result),
745                        _ = &mut grace => ScopeInterruptResult::CancelTimedOut,
746                    }
747                }
748            }
749        };
750
751        match result {
752            ScopeInterruptResult::Op(result) => result,
753            ScopeInterruptResult::Deadline(DeadlineKind::Scope) => {
754                self.deadlines.pop();
755                self.cancel_spawned_tasks();
756                Err(Self::deadline_exceeded_error())
757            }
758            ScopeInterruptResult::Deadline(DeadlineKind::InterruptHandler) => {
759                Err(Self::interrupt_handler_timeout_error())
760            }
761            ScopeInterruptResult::CancelTimedOut => {
762                self.cancel_spawned_tasks();
763                let signal = self
764                    .take_host_interrupt_signal()
765                    .unwrap_or_else(|| "SIGINT".to_string());
766                if self.has_interrupt_handler_for(&signal) {
767                    self.dispatch_interrupt_handlers(&signal).await?;
768                }
769                Err(Self::cancelled_error())
770            }
771        }
772    }
773
774    pub(crate) fn deadline_exceeded_error() -> VmError {
775        VmError::Thrown(VmValue::String(Rc::from("Deadline exceeded")))
776    }
777
778    pub(crate) fn cancelled_error() -> VmError {
779        VmError::Thrown(VmValue::String(Rc::from(
780            "kind:cancelled:VM cancelled by host",
781        )))
782    }
783
784    /// Capture the current call stack as (fn_name, line, col, source_file) tuples.
785    pub(crate) fn capture_stack_trace(&self) -> Vec<(String, usize, usize, Option<String>)> {
786        self.frames
787            .iter()
788            .map(|f| {
789                let idx = if f.ip > 0 { f.ip - 1 } else { 0 };
790                let line = f.chunk.lines.get(idx).copied().unwrap_or(0) as usize;
791                let col = f.chunk.columns.get(idx).copied().unwrap_or(0) as usize;
792                (f.fn_name.clone(), line, col, f.chunk.source_file.clone())
793            })
794            .collect()
795    }
796
797    /// Enrich a VmError with source line information from the captured stack
798    /// trace. Appends ` (line N)` to error variants whose messages don't
799    /// already carry location context.
800    pub(crate) fn enrich_error_with_line(&self, error: VmError) -> VmError {
801        // Determine the line from the captured stack trace (innermost frame).
802        let line = self
803            .error_stack_trace
804            .last()
805            .map(|(_, l, _, _)| *l)
806            .unwrap_or_else(|| self.current_line());
807        if line == 0 {
808            return error;
809        }
810        let suffix = format!(" (line {line})");
811        match error {
812            VmError::Runtime(msg) => VmError::Runtime(format!("{msg}{suffix}")),
813            VmError::TypeError(msg) => VmError::TypeError(format!("{msg}{suffix}")),
814            VmError::DivisionByZero => VmError::Runtime(format!("Division by zero{suffix}")),
815            VmError::UndefinedVariable(name) => {
816                VmError::Runtime(format!("Undefined variable: {name}{suffix}"))
817            }
818            VmError::UndefinedBuiltin(name) => {
819                VmError::Runtime(format!("Undefined builtin: {name}{suffix}"))
820            }
821            VmError::ImmutableAssignment(name) => VmError::Runtime(format!(
822                "Cannot assign to immutable binding: {name}{suffix}"
823            )),
824            VmError::StackOverflow => {
825                VmError::Runtime(format!("Stack overflow: too many nested calls{suffix}"))
826            }
827            // Leave these untouched:
828            // - Thrown: user-thrown errors should not be silently modified
829            // - CategorizedError: structured errors for agent orchestration
830            // - Return: control flow, not a real error
831            // - StackUnderflow / InvalidInstruction: internal VM bugs
832            other => other,
833        }
834    }
835}