Skip to main content

harn_vm/vm/
execution.rs

1use std::rc::Rc;
2use std::time::{Duration, Instant};
3
4use crate::chunk::{Chunk, ChunkRef, Op};
5use crate::value::{ModuleFunctionRegistry, VmError, VmValue};
6
7use super::{CallFrame, LocalSlot, Vm};
8
/// Grace period an in-flight op is given to finish once the cancel token has
/// been observed as set. `execute_op_with_scope_interrupts` races the op
/// against a sleep of this length and resolves to its `CancelTimedOut`
/// outcome if the sleep wins.
const CANCEL_GRACE_ASYNC_OP: Duration = Duration::from_millis(250);
10
/// Identifies which of the two deadline sources produced the deadline
/// selected by [`next_deadline`].
///
/// `Debug`, `PartialEq` and `Eq` are derived so the kind can be logged and
/// compared directly instead of only being pattern-matched.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum DeadlineKind {
    /// The deadline came from the scope deadline argument (the top of the
    /// VM's `deadlines` stack at the call site in this file).
    Scope,
    /// The deadline came from the interrupt-handler deadline argument
    /// (`interrupt_handler_deadline` at the call site in this file).
    InterruptHandler,
}
16
17impl Vm {
18    /// Returns true when no scope-level async machinery is armed. The hot
19    /// interpreter loop uses this to skip both the `pending_scope_interrupt`
20    /// future and the `execute_op_with_scope_interrupts` `tokio::select!`
21    /// wrapper on every dispatch — both are necessary for cancellable /
22    /// deadlined VMs but pure overhead in the common case (benchmarks,
23    /// background script execution, etc.).
24    #[inline]
25    pub(crate) fn scope_interrupts_clean(&self) -> bool {
26        self.cancel_token.is_none()
27            && self.interrupt_signal_token.is_none()
28            && self.pending_interrupt_signal.is_none()
29            && self.interrupt_handler_deadline.is_none()
30            && self.deadlines.is_empty()
31    }
32
33    /// Execute a compiled chunk.
34    pub async fn execute(&mut self, chunk: &Chunk) -> Result<VmValue, VmError> {
35        let span_id = crate::tracing::span_start(crate::tracing::SpanKind::Pipeline, "main".into());
36        let result = self.run_chunk(chunk).await;
37        let result = match result {
38            Ok(value) => self.run_pipeline_finish_lifecycle(value).await,
39            Err(error) => {
40                crate::orchestration::clear_pipeline_on_finish();
41                Err(error)
42            }
43        };
44        crate::tracing::span_end(span_id);
45        result
46    }
47
48    /// Run the pipeline-finish lifecycle: `PreFinish`, optional
49    /// `OnUnsettledDetected`, the `on_finish` callback, `PostFinish`. The
50    /// callback (if registered) may transform the return value; everything
51    /// else is advisory.
52    ///
53    /// Tracked: <https://github.com/burin-labs/harn/issues/1854>.
54    async fn run_pipeline_finish_lifecycle(&mut self, value: VmValue) -> Result<VmValue, VmError> {
55        use crate::orchestration::{
56            take_pipeline_on_finish, unsettled_state_snapshot_async, HookEvent,
57        };
58        let _tape_phase =
59            crate::testbench::tape::enter_phase(crate::testbench::tape::TapePhase::RuntimeFinalize);
60
61        let on_finish = take_pipeline_on_finish();
62        let unsettled = unsettled_state_snapshot_async().await;
63
64        let pre_payload = serde_json::json!({
65            "event": HookEvent::PreFinish.as_str(),
66            "return_value": crate::llm::vm_value_to_json(&value),
67            "unsettled": unsettled.to_json(),
68            "has_on_finish": on_finish.is_some(),
69        });
70        self.fire_finish_lifecycle_event(HookEvent::PreFinish, &pre_payload)
71            .await?;
72
73        if !unsettled.is_empty() {
74            let payload = serde_json::json!({
75                "event": HookEvent::OnUnsettledDetected.as_str(),
76                "unsettled": unsettled.to_json(),
77            });
78            self.fire_finish_lifecycle_event(HookEvent::OnUnsettledDetected, &payload)
79                .await?;
80        }
81
82        let final_value = if let Some(closure) = on_finish {
83            let harness_value = crate::harness::Harness::real().into_vm_value();
84            self.call_closure_pub(&closure, &[harness_value, value])
85                .await?
86        } else {
87            value
88        };
89
90        let post_payload = serde_json::json!({
91            "event": HookEvent::PostFinish.as_str(),
92            "return_value": crate::llm::vm_value_to_json(&final_value),
93            "unsettled": unsettled.to_json(),
94        });
95        self.fire_finish_lifecycle_event(HookEvent::PostFinish, &post_payload)
96            .await?;
97
98        Ok(final_value)
99    }
100
101    /// Dispatch a pipeline-finish lifecycle event by invoking matching
102    /// hook closures directly on `self`. The shared `run_lifecycle_hooks`
103    /// path clones a fresh child VM per call and discards its stdout —
104    /// fine for the agent-loop boundaries where hooks are advisory side-
105    /// channels, but the pipeline-finish boundary is the script's last
106    /// chance to print before `vm.output()` is captured, so the closures
107    /// run on `self` to keep their output visible.
108    ///
109    /// Honors the lifecycle control contract (harn#1859):
110    ///   * `PreFinish` rejects `Block` outright — surfaces a runtime
111    ///     error pointing the user at `OnFinish.block_until_settled`.
112    ///     `PostFinish` ignores any control return (advisory only).
113    ///   * `OnUnsettledDetected` honors `Block` to abort the finish
114    ///     lifecycle until the unsettled work clears.
115    ///   * Modify returns are recorded but not consumed at this boundary
116    ///     (the dispatcher already replays subsequent hooks with the
117    ///     post-modify payload via `run_lifecycle_hooks_with_control`).
118    async fn fire_finish_lifecycle_event(
119        &mut self,
120        event: crate::orchestration::HookEvent,
121        payload: &serde_json::Value,
122    ) -> Result<(), VmError> {
123        use crate::orchestration::{HookControl, HookEvent};
124        let invocations = crate::orchestration::matching_vm_lifecycle_hooks(event, payload);
125        if invocations.is_empty() {
126            return Ok(());
127        }
128        let mut current_payload = payload.clone();
129        for invocation in invocations {
130            let arg = crate::stdlib::json_to_vm_value(&current_payload);
131            let raw = self.call_closure_pub(&invocation.closure, &[arg]).await?;
132            let (action, effects) = crate::orchestration::collect_hook_effects_and_action(
133                event,
134                raw,
135                crate::value::VmValue::Nil,
136            )?;
137            crate::orchestration::inject_hook_effects_into_current_session(effects)?;
138            let control = crate::orchestration::parse_hook_control_for_finish(event, &action)?;
139            match control {
140                HookControl::Allow => {}
141                HookControl::Block { reason } => {
142                    if matches!(event, HookEvent::PreFinish) {
143                        return Err(VmError::Runtime(format!(
144                            "PreFinish hook returned block, which is not a valid control: {reason}. \
145                             To delay pipeline finish until unsettled work clears, use \
146                             OnFinish.block_until_settled (std/lifecycle) or return Modify/Allow \
147                             from PreFinish."
148                        )));
149                    }
150                    if matches!(event, HookEvent::PostFinish) {
151                        // Advisory only; ignore block returns from PostFinish.
152                        continue;
153                    }
154                    // OnUnsettledDetected: block aborts the finish lifecycle.
155                    return Err(VmError::Runtime(format!(
156                        "{} hook blocked pipeline finish: {reason}",
157                        event.as_str()
158                    )));
159                }
160                HookControl::Modify { payload: modified } => {
161                    current_payload = modified;
162                }
163                HookControl::Decision { .. } => {}
164            }
165        }
166        Ok(())
167    }
168
169    /// Convert a VmError into either a handled exception (returning Ok) or a propagated error.
170    pub(crate) fn handle_error(&mut self, error: VmError) -> Result<Option<VmValue>, VmError> {
171        let thrown_value = match &error {
172            VmError::Thrown(v) => v.clone(),
173            other => VmValue::String(Rc::from(other.to_string())),
174        };
175
176        if let Some(handler) = self.exception_handlers.pop() {
177            if let Some(error_type) = handler.error_type.as_deref() {
178                // Typed catch: only match when the thrown enum's type equals the declared type.
179                let matches = match &thrown_value {
180                    VmValue::EnumVariant(enum_variant) => enum_variant.has_enum_name(error_type),
181                    _ => false,
182                };
183                if !matches {
184                    return self.handle_error(error);
185                }
186            }
187
188            self.release_sync_guards_after_unwind(handler.frame_depth, handler.env_scope_depth);
189
190            while self.frames.len() > handler.frame_depth {
191                if let Some(frame) = self.frames.pop() {
192                    if let Some(ref dir) = frame.saved_source_dir {
193                        crate::stdlib::set_thread_source_dir(dir);
194                    }
195                    self.iterators.truncate(frame.saved_iterator_depth);
196                    self.env = frame.saved_env;
197                }
198            }
199            crate::step_runtime::prune_below_frame(self.frames.len());
200
201            // Drop deadlines that belonged to unwound frames.
202            while self
203                .deadlines
204                .last()
205                .is_some_and(|d| d.1 > handler.frame_depth)
206            {
207                self.deadlines.pop();
208            }
209
210            self.env.truncate_scopes(handler.env_scope_depth);
211
212            self.stack.truncate(handler.stack_depth);
213            self.stack.push(thrown_value);
214
215            if let Some(frame) = self.frames.last_mut() {
216                frame.ip = handler.catch_ip;
217            }
218
219            Ok(None)
220        } else {
221            Err(error)
222        }
223    }
224
225    pub(crate) async fn run_chunk(&mut self, chunk: &Chunk) -> Result<VmValue, VmError> {
226        self.run_chunk_entry(chunk, 0, None, None, None, None).await
227    }
228
229    pub(crate) async fn run_chunk_entry(
230        &mut self,
231        chunk: &Chunk,
232        argc: usize,
233        saved_source_dir: Option<std::path::PathBuf>,
234        module_functions: Option<ModuleFunctionRegistry>,
235        module_state: Option<crate::value::ModuleState>,
236        local_slots: Option<Vec<LocalSlot>>,
237    ) -> Result<VmValue, VmError> {
238        self.run_chunk_ref(
239            Rc::new(chunk.clone()),
240            argc,
241            saved_source_dir,
242            module_functions,
243            module_state,
244            local_slots,
245        )
246        .await
247    }
248
249    pub(crate) async fn run_chunk_ref(
250        &mut self,
251        chunk: ChunkRef,
252        argc: usize,
253        saved_source_dir: Option<std::path::PathBuf>,
254        module_functions: Option<ModuleFunctionRegistry>,
255        module_state: Option<crate::value::ModuleState>,
256        local_slots: Option<Vec<LocalSlot>>,
257    ) -> Result<VmValue, VmError> {
258        let debugger = self.debugger_attached();
259        let local_slots = local_slots.unwrap_or_else(|| Self::fresh_local_slots(&chunk));
260        let initial_env = if debugger {
261            Some(self.env.clone())
262        } else {
263            None
264        };
265        let initial_local_slots = if debugger {
266            Some(local_slots.clone())
267        } else {
268            None
269        };
270        self.frames.push(CallFrame {
271            chunk,
272            ip: 0,
273            stack_base: self.stack.len(),
274            saved_env: self.env.clone(),
275            initial_env,
276            initial_local_slots,
277            saved_iterator_depth: self.iterators.len(),
278            fn_name: String::new(),
279            argc,
280            saved_source_dir,
281            module_functions,
282            module_state,
283            local_slots,
284            local_scope_base: self.env.scope_depth().saturating_sub(1),
285            local_scope_depth: 0,
286        });
287
288        self.drive_dispatch_loop(0, false).await
289    }
290
291    /// Sub-execution entrypoint used by [`Vm::call_closure`]: runs the
292    /// dispatch loop until the topmost frame pops back to `target_depth`,
293    /// restoring env/iterators/stack on that final pop so the caller's
294    /// state is intact. Distinct from the entrypoint-mode call in
295    /// [`Vm::run_chunk_ref`] (which preserves the script's top-level scope
296    /// for the module-init capture in `modules.rs`).
297    pub(crate) async fn drive_until_frame_depth(
298        &mut self,
299        target_depth: usize,
300    ) -> Result<VmValue, VmError> {
301        self.drive_dispatch_loop(target_depth, true).await
302    }
303
    /// Dispatch loop body, parameterized on a target frame depth at which
    /// the loop should return and whether to restore the caller's
    /// env/iterators/stack on the final pop.
    ///
    /// `restore_on_final_pop = false` is the entrypoint mode used by
    /// `run_chunk_ref` (leaves the script's top-level state in place so the
    /// caller can capture it — see `modules.rs`).
    ///
    /// `restore_on_final_pop = true` is the sub-execution mode used by
    /// `call_closure`: the closure's frame is pushed onto the caller's
    /// frame stack and the loop drains it back to `target_depth`, so the
    /// per-invocation `Box::pin` heap allocation a recursive async
    /// `call_closure` would require is avoided.
    ///
    /// Each iteration: (1) polls for a pending scope interrupt when any
    /// interrupt machinery is armed, (2) pops the top frame if its `ip` has
    /// run past the end of its chunk, otherwise (3) fetches and executes one
    /// opcode and handles a `Return` or an error from it.
    ///
    /// Returns `Ok(value)` when a frame pop brings the frame stack to
    /// `target_depth` or below, or when the frame stack is already empty.
    /// Returns `Err` when an error finds no handler; on those paths frames
    /// are first unwound to `target_depth`.
    ///
    /// NOTE(review): two exits return `Err` without calling
    /// `unwind_frames_to_depth` — the `?` on
    /// `run_step_post_hooks_for_current_frame` and the fast-path
    /// `InvalidInstruction` return. Confirm that is intended.
    async fn drive_dispatch_loop(
        &mut self,
        target_depth: usize,
        restore_on_final_pop: bool,
    ) -> Result<VmValue, VmError> {
        loop {
            // Slow path only: the interrupt-handler future, deadline check,
            // and host-signal poll inside `pending_scope_interrupt` are all
            // no-ops when no cancel/interrupt/deadline machinery is armed
            // (the common case for unsupervised execution), so guard them
            // with a sync check that avoids the per-iteration future
            // state-machine allocation.
            if !self.scope_interrupts_clean() {
                if let Some(err) = self.pending_scope_interrupt().await {
                    match self.handle_error(err) {
                        // A handler took the interrupt; resume at its catch target.
                        Ok(None) => continue,
                        Ok(Some(val)) => return Ok(val),
                        Err(e) => {
                            self.unwind_frames_to_depth(target_depth);
                            return Err(e);
                        }
                    }
                }
            }

            // No frames left: the result is whatever sits on top of the stack.
            let frame = match self.frames.last_mut() {
                Some(f) => f,
                None => return Ok(self.stack.pop().unwrap_or(VmValue::Nil)),
            };

            // Implicit return: the frame ran past the end of its chunk.
            if frame.ip >= frame.chunk.code.len() {
                let val = self.stack.pop().unwrap_or(VmValue::Nil);
                let val = self.run_step_post_hooks_for_current_frame(val).await?;
                // Guards are released against the depth the frame had before the pop.
                self.release_sync_guards_for_frame(self.frames.len());
                let popped_frame = self.frames.pop().unwrap();
                if let Some(ref dir) = popped_frame.saved_source_dir {
                    crate::stdlib::set_thread_source_dir(dir);
                }
                let current_depth = self.frames.len();
                crate::step_runtime::prune_below_frame(current_depth);
                // Drop any deadlines owned by the popped frame so the
                // caller doesn't inherit them (an early `return` from
                // inside `deadline(d) { ... }` would otherwise leave the
                // deadline live across the function boundary).
                while self.deadlines.last().is_some_and(|d| d.1 > current_depth) {
                    self.deadlines.pop();
                }

                let reached_target = current_depth <= target_depth;
                if reached_target && !restore_on_final_pop {
                    // Entrypoint mode: leave env / iterators / stack in place
                    // so the caller can observe the script's top-level scope.
                    return Ok(val);
                }
                // Restore the state the popped frame saved on entry.
                self.iterators.truncate(popped_frame.saved_iterator_depth);
                self.env = popped_frame.saved_env;
                self.stack.truncate(popped_frame.stack_base);
                if reached_target {
                    return Ok(val);
                }
                // Hand the return value to the calling frame and keep going.
                self.stack.push(val);
                continue;
            }

            // Fetch the next opcode byte and advance past it.
            let op_byte = frame.chunk.code[frame.ip];
            frame.ip += 1;

            // Sync/async split dispatch: skip the `execute_op` async fn
            // (a per-iteration `Future` state machine over ~80 match arms)
            // and the `tokio::select!` deadline/cancel wrapper whenever
            // the VM has no interrupt machinery armed. Only call/iter/
            // pipe/parallel/import/yield opcodes still need an `.await`.
            let op_result: Result<(), VmError> = if self.scope_interrupts_clean() {
                let op = match Op::from_byte(op_byte) {
                    Some(op) => op,
                    None => return Err(VmError::InvalidInstruction(op_byte)),
                };
                // `execute_op_sync` yields `None` for ops it does not handle;
                // those fall through to the async executor.
                if let Some(result) = self.execute_op_sync(op) {
                    result
                } else {
                    self.execute_op_async(op).await
                }
            } else {
                match self.execute_op_with_scope_interrupts(op_byte).await {
                    Ok(Some(val)) => return Ok(val),
                    Ok(None) => Ok(()),
                    Err(e) => Err(e),
                }
            };

            match op_result {
                Ok(()) => continue,
                // Explicit return: same pop bookkeeping as the implicit
                // return above, plus dropping exception handlers that
                // belonged to the popped frame.
                Err(VmError::Return(val)) => {
                    let val = self.run_step_post_hooks_for_current_frame(val).await?;
                    if let Some(popped_frame) = self.frames.pop() {
                        // `len() + 1` is the depth the frame had before the pop.
                        self.release_sync_guards_for_frame(self.frames.len() + 1);
                        if let Some(ref dir) = popped_frame.saved_source_dir {
                            crate::stdlib::set_thread_source_dir(dir);
                        }
                        let current_depth = self.frames.len();
                        self.exception_handlers
                            .retain(|h| h.frame_depth <= current_depth);
                        crate::step_runtime::prune_below_frame(current_depth);
                        while self.deadlines.last().is_some_and(|d| d.1 > current_depth) {
                            self.deadlines.pop();
                        }

                        let reached_target = current_depth <= target_depth;
                        if reached_target && !restore_on_final_pop {
                            return Ok(val);
                        }
                        self.iterators.truncate(popped_frame.saved_iterator_depth);
                        self.env = popped_frame.saved_env;
                        self.stack.truncate(popped_frame.stack_base);
                        if reached_target {
                            return Ok(val);
                        }
                        self.stack.push(val);
                    } else {
                        return Ok(val);
                    }
                }
                Err(e) => {
                    // Capture stack trace before error handling unwinds frames.
                    if self.error_stack_trace.is_empty() {
                        self.error_stack_trace = self.capture_stack_trace();
                    }
                    // Honor `@step(error_boundary: ...)` if a step-budget
                    // exhaustion error is propagating out of the step's
                    // own frame. `continue` swaps the throw for a Nil
                    // return; `escalate` re-tags the error as a handoff
                    // escalation and lets the existing exception
                    // handlers route it.
                    let e = match self.apply_step_error_boundary(e) {
                        StepBoundaryOutcome::Returned(val) => {
                            self.error_stack_trace.clear();
                            // The boundary already popped the step's frame.
                            if self.frames.len() <= target_depth {
                                return Ok(val);
                            }
                            self.stack.push(val);
                            continue;
                        }
                        StepBoundaryOutcome::Throw(err) => err,
                    };
                    match self.handle_error(e) {
                        Ok(None) => {
                            // Caught: the captured trace is no longer needed.
                            self.error_stack_trace.clear();
                            continue;
                        }
                        Ok(Some(val)) => return Ok(val),
                        Err(e) => {
                            self.unwind_frames_to_depth(target_depth);
                            return Err(self.enrich_error_with_line(e));
                        }
                    }
                }
            }
        }
    }
476
477    /// Pop frames until `self.frames.len() <= target_depth`, restoring env,
478    /// iterators, stack, source-dir thread-locals, and releasing per-frame
479    /// sync guards for each popped frame. Used by [`drive_until_frame_depth`]
480    /// on the error path so a closure sub-execution leaves caller-visible
481    /// state at the same depth it found when an unhandled error propagates
482    /// out.
483    fn unwind_frames_to_depth(&mut self, target_depth: usize) {
484        while self.frames.len() > target_depth {
485            let frame_depth = self.frames.len();
486            if let Some(frame) = self.frames.pop() {
487                self.release_sync_guards_for_frame(frame_depth);
488                if let Some(ref dir) = frame.saved_source_dir {
489                    crate::stdlib::set_thread_source_dir(dir);
490                }
491                self.iterators.truncate(frame.saved_iterator_depth);
492                self.env = frame.saved_env;
493                self.stack.truncate(frame.stack_base);
494            }
495        }
496        let current_depth = self.frames.len();
497        crate::step_runtime::prune_below_frame(current_depth);
498        while self.deadlines.last().is_some_and(|d| d.1 > current_depth) {
499            self.deadlines.pop();
500        }
501    }
502
503    /// Inspect a thrown error against the topmost active step's
504    /// `error_boundary`. Called from the main step loop before
505    /// `handle_error` so that a step's own budget-exhaustion error can be
506    /// short-circuited (`continue`) or annotated (`escalate`) before the
507    /// generic try/catch machinery sees it.
508    pub(crate) fn apply_step_error_boundary(&mut self, error: VmError) -> StepBoundaryOutcome {
509        use crate::step_runtime;
510        if !step_runtime::is_step_budget_exhausted(&error) {
511            return StepBoundaryOutcome::Throw(error);
512        }
513        let Some(step_depth) = step_runtime::active_step_frame_depth() else {
514            return StepBoundaryOutcome::Throw(error);
515        };
516        // The step's frame is the topmost on the call stack iff its
517        // recorded frame_depth equals `frames.len()`. If the throw is
518        // coming from a deeper frame we let it bubble up — the boundary
519        // still applies later when the step's own frame is reached.
520        if step_depth != self.frames.len() {
521            return StepBoundaryOutcome::Throw(error);
522        }
523        let boundary = step_runtime::with_active_step(|step| step.definition.boundary())
524            .unwrap_or(step_runtime::StepErrorBoundary::Fail);
525        match boundary {
526            step_runtime::StepErrorBoundary::Continue => {
527                // Mimic VmError::Return(Nil) for the step's frame: pop
528                // the frame, restore its env/iterators/stack, and feed a
529                // Nil return value back to the caller.
530                if let Some(popped) = self.frames.pop() {
531                    self.release_sync_guards_for_frame(self.frames.len() + 1);
532                    if let Some(ref dir) = popped.saved_source_dir {
533                        crate::stdlib::set_thread_source_dir(dir);
534                    }
535                    let current_depth = self.frames.len();
536                    self.exception_handlers
537                        .retain(|h| h.frame_depth <= current_depth);
538                    step_runtime::pop_and_record(
539                        current_depth + 1,
540                        "skipped",
541                        Some(step_runtime_error_message(&error)),
542                    );
543                    if self.frames.is_empty() {
544                        return StepBoundaryOutcome::Returned(VmValue::Nil);
545                    }
546                    self.iterators.truncate(popped.saved_iterator_depth);
547                    self.env = popped.saved_env;
548                    self.stack.truncate(popped.stack_base);
549                }
550                StepBoundaryOutcome::Returned(VmValue::Nil)
551            }
552            step_runtime::StepErrorBoundary::Escalate => {
553                let identity = step_runtime::with_active_step(|step| {
554                    (
555                        step.definition.name.clone(),
556                        step.definition.function.clone(),
557                    )
558                });
559                step_runtime::pop_and_record(
560                    step_depth,
561                    "escalated",
562                    Some(step_runtime_error_message(&error)),
563                );
564                let (step_name, function) = identity.unzip();
565                StepBoundaryOutcome::Throw(step_runtime::mark_escalated(
566                    error,
567                    step_name.as_deref(),
568                    function.as_deref(),
569                ))
570            }
571            step_runtime::StepErrorBoundary::Fail => {
572                step_runtime::pop_and_record(
573                    step_depth,
574                    "failed",
575                    Some(step_runtime_error_message(&error)),
576                );
577                StepBoundaryOutcome::Throw(error)
578            }
579        }
580    }
581}
582
583fn next_deadline(
584    scope_deadline: Option<Instant>,
585    interrupt_handler_deadline: Option<Instant>,
586) -> (Option<Instant>, Option<DeadlineKind>) {
587    match (scope_deadline, interrupt_handler_deadline) {
588        (Some(scope), Some(interrupt)) if interrupt < scope => {
589            (Some(interrupt), Some(DeadlineKind::InterruptHandler))
590        }
591        (Some(scope), _) => (Some(scope), Some(DeadlineKind::Scope)),
592        (None, Some(interrupt)) => (Some(interrupt), Some(DeadlineKind::InterruptHandler)),
593        (None, None) => (None, None),
594    }
595}
596
597fn step_runtime_error_message(error: &VmError) -> String {
598    match error {
599        VmError::Thrown(VmValue::Dict(dict)) => dict
600            .get("message")
601            .map(|v| v.display())
602            .unwrap_or_else(|| error.to_string()),
603        _ => error.to_string(),
604    }
605}
606
/// Result of `Vm::apply_step_error_boundary`: tells the dispatch loop
/// whether a step's error boundary consumed the error or whether an error
/// must still be routed through `handle_error`.
pub(crate) enum StepBoundaryOutcome {
    /// The boundary swallowed the error and the step's frame has been
    /// popped; the carried value is used as that frame's return value.
    Returned(VmValue),
    /// The carried error (the original, or an escalation-tagged version of
    /// it) must still go through normal error handling.
    Throw(VmError),
}
611
612impl crate::vm::Vm {
613    pub(crate) async fn execute_one_cycle(&mut self) -> Result<Option<(VmValue, bool)>, VmError> {
614        if let Some(err) = self.pending_scope_interrupt().await {
615            match self.handle_error(err) {
616                Ok(None) => return Ok(None),
617                Ok(Some(val)) => return Ok(Some((val, false))),
618                Err(e) => return Err(e),
619            }
620        }
621
622        let frame = match self.frames.last_mut() {
623            Some(f) => f,
624            None => {
625                let val = self.stack.pop().unwrap_or(VmValue::Nil);
626                return Ok(Some((val, false)));
627            }
628        };
629
630        if frame.ip >= frame.chunk.code.len() {
631            let val = self.stack.pop().unwrap_or(VmValue::Nil);
632            self.release_sync_guards_for_frame(self.frames.len());
633            let popped_frame = self.frames.pop().unwrap();
634            if self.frames.is_empty() {
635                return Ok(Some((val, false)));
636            }
637            self.iterators.truncate(popped_frame.saved_iterator_depth);
638            self.env = popped_frame.saved_env;
639            self.stack.truncate(popped_frame.stack_base);
640            self.stack.push(val);
641            return Ok(None);
642        }
643
644        let op = frame.chunk.code[frame.ip];
645        frame.ip += 1;
646
647        match self.execute_op_with_scope_interrupts(op).await {
648            Ok(Some(val)) => Ok(Some((val, false))),
649            Ok(None) => Ok(None),
650            Err(VmError::Return(val)) => {
651                if let Some(popped_frame) = self.frames.pop() {
652                    self.release_sync_guards_for_frame(self.frames.len() + 1);
653                    if let Some(ref dir) = popped_frame.saved_source_dir {
654                        crate::stdlib::set_thread_source_dir(dir);
655                    }
656                    let current_depth = self.frames.len();
657                    self.exception_handlers
658                        .retain(|h| h.frame_depth <= current_depth);
659                    if self.frames.is_empty() {
660                        return Ok(Some((val, false)));
661                    }
662                    self.iterators.truncate(popped_frame.saved_iterator_depth);
663                    self.env = popped_frame.saved_env;
664                    self.stack.truncate(popped_frame.stack_base);
665                    self.stack.push(val);
666                    Ok(None)
667                } else {
668                    Ok(Some((val, false)))
669                }
670            }
671            Err(e) => {
672                if self.error_stack_trace.is_empty() {
673                    self.error_stack_trace = self.capture_stack_trace();
674                }
675                match self.handle_error(e) {
676                    Ok(None) => {
677                        self.error_stack_trace.clear();
678                        Ok(None)
679                    }
680                    Ok(Some(val)) => Ok(Some((val, false))),
681                    Err(e) => Err(self.enrich_error_with_line(e)),
682                }
683            }
684        }
685    }
686
687    async fn execute_op_with_scope_interrupts(
688        &mut self,
689        op: u8,
690    ) -> Result<Option<VmValue>, VmError> {
691        enum ScopeInterruptResult {
692            Op(Result<Option<VmValue>, VmError>),
693            Deadline(DeadlineKind),
694            CancelTimedOut,
695        }
696
697        let (deadline, deadline_kind) = next_deadline(
698            self.deadlines.last().map(|(deadline, _)| *deadline),
699            self.interrupt_handler_deadline,
700        );
701        let cancel_token = self.cancel_token.clone();
702
703        if deadline.is_none() && cancel_token.is_none() {
704            return self.execute_op(op).await;
705        }
706
707        let has_deadline = deadline.is_some();
708        let cancel_requested_at_start = cancel_token
709            .as_ref()
710            .is_some_and(|token| token.load(std::sync::atomic::Ordering::SeqCst));
711        let has_cancel = cancel_token.is_some() && !cancel_requested_at_start;
712        let deadline_sleep = async move {
713            if let Some(deadline) = deadline {
714                tokio::time::sleep_until(tokio::time::Instant::from_std(deadline)).await;
715            } else {
716                std::future::pending::<()>().await;
717            }
718        };
719        let cancel_sleep = async move {
720            if let Some(token) = cancel_token {
721                while !token.load(std::sync::atomic::Ordering::SeqCst) {
722                    tokio::time::sleep(Duration::from_millis(10)).await;
723                }
724            } else {
725                std::future::pending::<()>().await;
726            }
727        };
728
729        let result = {
730            let op_future = self.execute_op(op);
731            tokio::pin!(op_future);
732            tokio::select! {
733                result = &mut op_future => ScopeInterruptResult::Op(result),
734                _ = deadline_sleep, if has_deadline => {
735                    ScopeInterruptResult::Deadline(deadline_kind.unwrap_or(DeadlineKind::Scope))
736                },
737                _ = cancel_sleep, if has_cancel => {
738                    let grace = tokio::time::sleep(CANCEL_GRACE_ASYNC_OP);
739                    tokio::pin!(grace);
740                    tokio::select! {
741                        result = &mut op_future => ScopeInterruptResult::Op(result),
742                        _ = &mut grace => ScopeInterruptResult::CancelTimedOut,
743                    }
744                }
745            }
746        };
747
748        match result {
749            ScopeInterruptResult::Op(result) => result,
750            ScopeInterruptResult::Deadline(DeadlineKind::Scope) => {
751                self.deadlines.pop();
752                self.cancel_spawned_tasks();
753                Err(Self::deadline_exceeded_error())
754            }
755            ScopeInterruptResult::Deadline(DeadlineKind::InterruptHandler) => {
756                Err(Self::interrupt_handler_timeout_error())
757            }
758            ScopeInterruptResult::CancelTimedOut => {
759                self.cancel_spawned_tasks();
760                let signal = self
761                    .take_host_interrupt_signal()
762                    .unwrap_or_else(|| "SIGINT".to_string());
763                if self.has_interrupt_handler_for(&signal) {
764                    self.dispatch_interrupt_handlers(&signal).await?;
765                }
766                Err(Self::cancelled_error())
767            }
768        }
769    }
770
771    pub(crate) fn deadline_exceeded_error() -> VmError {
772        VmError::Thrown(VmValue::String(Rc::from("Deadline exceeded")))
773    }
774
775    pub(crate) fn cancelled_error() -> VmError {
776        VmError::Thrown(VmValue::String(Rc::from(
777            "kind:cancelled:VM cancelled by host",
778        )))
779    }
780
781    /// Capture the current call stack as (fn_name, line, col, source_file) tuples.
782    pub(crate) fn capture_stack_trace(&self) -> Vec<(String, usize, usize, Option<String>)> {
783        self.frames
784            .iter()
785            .map(|f| {
786                let idx = if f.ip > 0 { f.ip - 1 } else { 0 };
787                let line = f.chunk.lines.get(idx).copied().unwrap_or(0) as usize;
788                let col = f.chunk.columns.get(idx).copied().unwrap_or(0) as usize;
789                (f.fn_name.clone(), line, col, f.chunk.source_file.clone())
790            })
791            .collect()
792    }
793
794    /// Enrich a VmError with source line information from the captured stack
795    /// trace. Appends ` (line N)` to error variants whose messages don't
796    /// already carry location context.
797    pub(crate) fn enrich_error_with_line(&self, error: VmError) -> VmError {
798        // Determine the line from the captured stack trace (innermost frame).
799        let line = self
800            .error_stack_trace
801            .last()
802            .map(|(_, l, _, _)| *l)
803            .unwrap_or_else(|| self.current_line());
804        if line == 0 {
805            return error;
806        }
807        let suffix = format!(" (line {line})");
808        match error {
809            VmError::Runtime(msg) => VmError::Runtime(format!("{msg}{suffix}")),
810            VmError::TypeError(msg) => VmError::TypeError(format!("{msg}{suffix}")),
811            VmError::DivisionByZero => VmError::Runtime(format!("Division by zero{suffix}")),
812            VmError::UndefinedVariable(name) => {
813                VmError::Runtime(format!("Undefined variable: {name}{suffix}"))
814            }
815            VmError::UndefinedBuiltin(name) => {
816                VmError::Runtime(format!("Undefined builtin: {name}{suffix}"))
817            }
818            VmError::ImmutableAssignment(name) => VmError::Runtime(format!(
819                "Cannot assign to immutable binding: {name}{suffix}"
820            )),
821            VmError::StackOverflow => {
822                VmError::Runtime(format!("Stack overflow: too many nested calls{suffix}"))
823            }
824            // Leave these untouched:
825            // - Thrown: user-thrown errors should not be silently modified
826            // - CategorizedError: structured errors for agent orchestration
827            // - Return: control flow, not a real error
828            // - StackUnderflow / InvalidInstruction: internal VM bugs
829            other => other,
830        }
831    }
832}