Skip to main content

harn_vm/vm/
execution.rs

1use std::sync::Arc;
2use std::time::{Duration, Instant};
3
4use crate::chunk::{Chunk, ChunkRef, Op};
5use crate::value::{ModuleFunctionRegistry, VmError, VmValue};
6
7use super::{CallFrame, LocalSlot, Vm};
8
/// Fixed 250 ms grace window associated with cancellation of async ops.
// NOTE(review): the use site is outside the visible part of this file —
// presumably the time an in-flight async op is given to finish after a cancel
// request before it is abandoned; confirm against
// `execute_op_with_scope_interrupts`.
const CANCEL_GRACE_ASYNC_OP: Duration = Duration::from_millis(250);
10
/// Tags which of the two tracked deadlines was selected by `next_deadline`.
#[derive(Clone, Copy)]
enum DeadlineKind {
    /// The deadline taken from the top of the VM's `deadlines` stack.
    Scope,
    /// The VM's `interrupt_handler_deadline`.
    InterruptHandler,
}
16
17impl Vm {
18    /// Returns true when no scope-level async machinery is armed. The hot
19    /// interpreter loop uses this to skip both the `pending_scope_interrupt`
20    /// future and the `execute_op_with_scope_interrupts` `tokio::select!`
21    /// wrapper on every dispatch — both are necessary for cancellable /
22    /// deadlined VMs but pure overhead in the common case (benchmarks,
23    /// background script execution, etc.).
24    #[inline]
25    pub(crate) fn scope_interrupts_clean(&self) -> bool {
26        self.cancel_token.is_none()
27            && self.interrupt_signal_token.is_none()
28            && self.pending_interrupt_signal.is_none()
29            && self.interrupt_handler_deadline.is_none()
30            && self.deadlines.is_empty()
31    }
32
33    /// Execute a compiled chunk.
34    ///
35    /// Convenience entry point for callers that hold a borrowed [`Chunk`] and
36    /// run it once (tests, one-shot CLI invocations). It clones the chunk once
37    /// to obtain the owned [`ChunkRef`] the call frame requires. Callers that
38    /// re-run the same compiled chunk (servers, record filters, triggers)
39    /// should hold a [`ChunkRef`] and call [`Vm::execute_arc`] to skip the
40    /// per-execution deep copy of the bytecode + constant pool.
41    pub async fn execute(&mut self, chunk: &Chunk) -> Result<VmValue, VmError> {
42        self.execute_arc(Arc::new(chunk.clone())).await
43    }
44
45    /// Execute a shared compiled chunk without cloning its bytecode.
46    ///
47    /// Threads the existing [`ChunkRef`] straight into the call frame, so
48    /// re-running the same chunk is a refcount bump rather than an
49    /// `O(code + constants)` copy.
50    pub async fn execute_arc(&mut self, chunk: ChunkRef) -> Result<VmValue, VmError> {
51        let registry = self.pool_registry.clone();
52        crate::stdlib::pool::with_pool_registry_scope(registry, async {
53            self.execute_scoped(chunk).await
54        })
55        .await
56    }
57
58    async fn execute_scoped(&mut self, chunk: ChunkRef) -> Result<VmValue, VmError> {
59        let _execution_activity = self
60            .wait_for_graph
61            .register_task(self.runtime_context.task_id.clone());
62        let span_id = crate::tracing::span_start(crate::tracing::SpanKind::Pipeline, "main".into());
63        let result = self.run_chunk(chunk).await;
64        let result = match result {
65            Ok(value) => self.run_pipeline_finish_lifecycle(value).await,
66            Err(error) => {
67                crate::orchestration::clear_pipeline_on_finish();
68                Err(error)
69            }
70        };
71        crate::tracing::span_end(span_id);
72        result
73    }
74
75    /// Run the pipeline-finish lifecycle: `PreFinish`, optional
76    /// `OnUnsettledDetected`, the `on_finish` callback, `PostFinish`. The
77    /// callback (if registered) may transform the return value; everything
78    /// else is advisory.
79    ///
80    /// Tracked: <https://github.com/burin-labs/harn/issues/1854>.
81    async fn run_pipeline_finish_lifecycle(&mut self, value: VmValue) -> Result<VmValue, VmError> {
82        use crate::orchestration::{
83            take_pipeline_on_finish, unsettled_state_snapshot_async, HookEvent,
84        };
85        let _tape_phase =
86            crate::testbench::tape::enter_phase(crate::testbench::tape::TapePhase::RuntimeFinalize);
87
88        let on_finish = take_pipeline_on_finish();
89        let unsettled = unsettled_state_snapshot_async().await;
90
91        let pre_payload = serde_json::json!({
92            "event": HookEvent::PreFinish.as_str(),
93            "return_value": crate::llm::vm_value_to_json(&value),
94            "unsettled": unsettled.to_json(),
95            "has_on_finish": on_finish.is_some(),
96        });
97        self.fire_finish_lifecycle_event(HookEvent::PreFinish, &pre_payload)
98            .await?;
99
100        if !unsettled.is_empty() {
101            let payload = serde_json::json!({
102                "event": HookEvent::OnUnsettledDetected.as_str(),
103                "unsettled": unsettled.to_json(),
104            });
105            self.fire_finish_lifecycle_event(HookEvent::OnUnsettledDetected, &payload)
106                .await?;
107        }
108
109        let final_value = if let Some(closure) = on_finish {
110            let harness_value = crate::harness::Harness::real().into_vm_value();
111            self.call_closure_pub(&closure, &[harness_value, value])
112                .await?
113        } else {
114            value
115        };
116
117        let post_payload = serde_json::json!({
118            "event": HookEvent::PostFinish.as_str(),
119            "return_value": crate::llm::vm_value_to_json(&final_value),
120            "unsettled": unsettled.to_json(),
121        });
122        self.fire_finish_lifecycle_event(HookEvent::PostFinish, &post_payload)
123            .await?;
124
125        Ok(final_value)
126    }
127
128    /// Dispatch a pipeline-finish lifecycle event by invoking matching
129    /// hook closures directly on `self`. The shared `run_lifecycle_hooks`
130    /// path clones a fresh child VM per call and discards its stdout —
131    /// fine for the agent-loop boundaries where hooks are advisory side-
132    /// channels, but the pipeline-finish boundary is the script's last
133    /// chance to print before `vm.output()` is captured, so the closures
134    /// run on `self` to keep their output visible.
135    ///
136    /// Honors the lifecycle control contract (harn#1859):
137    ///   * `PreFinish` rejects `Block` outright — surfaces a runtime
138    ///     error pointing the user at `OnFinish.block_until_settled`.
139    ///     `PostFinish` ignores any control return (advisory only).
140    ///   * `OnUnsettledDetected` honors `Block` to abort the finish
141    ///     lifecycle until the unsettled work clears.
142    ///   * Modify returns are recorded but not consumed at this boundary
143    ///     (the dispatcher already replays subsequent hooks with the
144    ///     post-modify payload via `run_lifecycle_hooks_with_control`).
145    async fn fire_finish_lifecycle_event(
146        &mut self,
147        event: crate::orchestration::HookEvent,
148        payload: &serde_json::Value,
149    ) -> Result<(), VmError> {
150        use crate::orchestration::{HookControl, HookEvent};
151        let invocations = crate::orchestration::matching_vm_lifecycle_hooks(event, payload);
152        if invocations.is_empty() {
153            return Ok(());
154        }
155        let mut current_payload = payload.clone();
156        for invocation in invocations {
157            let arg = crate::stdlib::json_to_vm_value(&current_payload);
158            let closure = invocation.resolve(self).await?;
159            let raw = self.call_closure_pub(&closure, &[arg]).await?;
160            let (action, effects) = crate::orchestration::collect_hook_effects_and_action(
161                event,
162                raw,
163                crate::value::VmValue::Nil,
164            )?;
165            crate::orchestration::inject_hook_effects_into_current_session(effects)?;
166            let control = crate::orchestration::parse_hook_control_for_finish(event, &action)?;
167            match control {
168                HookControl::Allow => {}
169                HookControl::Block { reason } => {
170                    if matches!(event, HookEvent::PreFinish) {
171                        return Err(VmError::Runtime(format!(
172                            "PreFinish hook returned block, which is not a valid control: {reason}. \
173                             To delay pipeline finish until unsettled work clears, use \
174                             OnFinish.block_until_settled (std/lifecycle) or return Modify/Allow \
175                             from PreFinish."
176                        )));
177                    }
178                    if matches!(event, HookEvent::PostFinish) {
179                        // Advisory only; ignore block returns from PostFinish.
180                        continue;
181                    }
182                    // OnUnsettledDetected: block aborts the finish lifecycle.
183                    return Err(VmError::Runtime(format!(
184                        "{} hook blocked pipeline finish: {reason}",
185                        event.as_str()
186                    )));
187                }
188                HookControl::Modify { payload: modified } => {
189                    current_payload = modified;
190                }
191                HookControl::Decision { .. } => {}
192            }
193        }
194        Ok(())
195    }
196
197    /// Convert a VmError into either a handled exception (returning Ok) or a propagated error.
198    pub(crate) fn handle_error(&mut self, error: VmError) -> Result<Option<VmValue>, VmError> {
199        let thrown_value = error.thrown_value();
200
201        if let Some(handler) = self.exception_handlers.pop() {
202            if let Some(error_type) = handler.error_type.as_deref() {
203                // Typed catch: only match when the thrown enum's type equals the declared type.
204                let matches = match &thrown_value {
205                    VmValue::EnumVariant(enum_variant) => enum_variant.has_enum_name(error_type),
206                    _ => false,
207                };
208                if !matches {
209                    return self.handle_error(error);
210                }
211            }
212
213            self.release_sync_guards_after_unwind(handler.frame_depth, handler.env_scope_depth);
214
215            while self.frames.len() > handler.frame_depth {
216                if let Some(frame) = self.frames.pop() {
217                    if let Some(ref dir) = frame.saved_source_dir {
218                        crate::stdlib::set_thread_source_dir(dir);
219                    }
220                    self.iterators.truncate(frame.saved_iterator_depth);
221                    self.env = frame.saved_env;
222                }
223            }
224            crate::step_runtime::prune_below_frame(self.frames.len());
225
226            // Drop deadlines that belonged to unwound frames.
227            while self
228                .deadlines
229                .last()
230                .is_some_and(|d| d.1 > handler.frame_depth)
231            {
232                self.deadlines.pop();
233            }
234
235            self.env.truncate_scopes(handler.env_scope_depth);
236
237            self.stack.truncate(handler.stack_depth);
238            self.stack.push(thrown_value);
239
240            if let Some(frame) = self.frames.last_mut() {
241                frame.ip = handler.catch_ip;
242            }
243
244            Ok(None)
245        } else {
246            Err(error)
247        }
248    }
249
250    pub(crate) async fn run_chunk(&mut self, chunk: ChunkRef) -> Result<VmValue, VmError> {
251        self.run_chunk_ref(chunk, 0, None, None, None, None).await
252    }
253
254    pub(crate) async fn run_chunk_ref(
255        &mut self,
256        chunk: ChunkRef,
257        argc: usize,
258        saved_source_dir: Option<std::path::PathBuf>,
259        module_functions: Option<ModuleFunctionRegistry>,
260        module_state: Option<crate::value::ModuleState>,
261        local_slots: Option<Vec<LocalSlot>>,
262    ) -> Result<VmValue, VmError> {
263        let debugger = self.debugger_attached();
264        let local_slots = local_slots.unwrap_or_else(|| Self::fresh_local_slots(&chunk));
265        let initial_env = if debugger {
266            Some(self.env.clone())
267        } else {
268            None
269        };
270        let initial_local_slots = if debugger {
271            Some(local_slots.clone())
272        } else {
273            None
274        };
275        let inline_cache_set = self.inline_cache_set_index_for_chunk(&chunk);
276        self.frames.push(CallFrame {
277            chunk,
278            inline_cache_set,
279            ip: 0,
280            stack_base: self.stack.len(),
281            saved_env: self.env.clone(),
282            initial_env,
283            initial_local_slots,
284            saved_iterator_depth: self.iterators.len(),
285            fn_name: String::new(),
286            argc,
287            saved_source_dir,
288            module_functions,
289            module_state,
290            local_slots,
291            local_scope_base: self.env.scope_depth().saturating_sub(1),
292            local_scope_depth: 0,
293        });
294
295        self.drive_dispatch_loop(0, false).await
296    }
297
298    /// Sub-execution entrypoint used by [`Vm::call_closure`]: runs the
299    /// dispatch loop until the topmost frame pops back to `target_depth`,
300    /// restoring env/iterators/stack on that final pop so the caller's
301    /// state is intact. Distinct from the entrypoint-mode call in
302    /// [`Vm::run_chunk_ref`] (which preserves the script's top-level scope
303    /// for the module-init capture in `modules.rs`).
304    pub(crate) async fn drive_until_frame_depth(
305        &mut self,
306        target_depth: usize,
307    ) -> Result<VmValue, VmError> {
308        self.drive_dispatch_loop(target_depth, true).await
309    }
310
    /// Dispatch loop body, parameterized on a target frame depth at which
    /// the loop should return and whether to restore the caller's
    /// env/iterators/stack on the final pop.
    ///
    /// `restore_on_final_pop = false` is the entrypoint mode used by
    /// `run_chunk_ref` (leaves the script's top-level state in place so the
    /// caller can capture it — see `modules.rs`).
    ///
    /// `restore_on_final_pop = true` is the sub-execution mode used by
    /// `call_closure`: the closure's frame is pushed onto the caller's
    /// frame stack and the loop drains it back to `target_depth`, so the
    /// per-invocation `Box::pin` heap allocation a recursive async
    /// `call_closure` would require is avoided.
    ///
    /// Each iteration does, in order: (1) an interrupt poll when any
    /// interrupt machinery is armed, (2) an implicit return when the current
    /// frame has run off the end of its bytecode, (3) decode + execute of one
    /// opcode, and (4) handling of the opcode's result — plain continue,
    /// explicit `Return`, or an error routed through the step error boundary
    /// and then `handle_error`.
    ///
    /// Returns the value produced when the frame stack drops to
    /// `target_depth` (or `Nil`/top-of-stack when there are no frames at
    /// all), or the first error no handler accepted.
    async fn drive_dispatch_loop(
        &mut self,
        target_depth: usize,
        restore_on_final_pop: bool,
    ) -> Result<VmValue, VmError> {
        // The registration value is kept in a named binding so it lives
        // until this function returns.
        let _task_activity = self
            .wait_for_graph
            .register_task(self.runtime_context.task_id.clone());
        loop {
            // Slow path only: the interrupt-handler future, deadline check,
            // and host-signal poll inside `pending_scope_interrupt` are all
            // no-ops when no cancel/interrupt/deadline machinery is armed
            // (the common case for unsupervised execution), so guard them
            // with a sync check that avoids the per-iteration future
            // state-machine allocation.
            if !self.scope_interrupts_clean() {
                if let Some(err) = self.pending_scope_interrupt().await {
                    match self.handle_error(err) {
                        // A handler caught the interrupt; resume at its catch block.
                        Ok(None) => continue,
                        Ok(Some(val)) => return Ok(val),
                        Err(e) => {
                            self.unwind_frames_to_depth(target_depth);
                            return Err(e);
                        }
                    }
                }
            }

            let frame = match self.frames.last_mut() {
                Some(f) => f,
                // No frames left: the result is whatever is on top of the stack.
                None => return Ok(self.stack.pop().unwrap_or(VmValue::Nil)),
            };

            // Implicit return: the frame ran past the end of its bytecode.
            if frame.ip >= frame.chunk.code.len() {
                let val = self.stack.pop().unwrap_or(VmValue::Nil);
                // NOTE(review): a post-hook error propagates via `?` without
                // calling `unwind_frames_to_depth` — confirm that is intended.
                let val = self.run_step_post_hooks_for_current_frame(val).await?;
                self.release_sync_guards_for_frame(self.frames.len());
                let popped_frame = self.frames.pop().unwrap();
                if let Some(ref dir) = popped_frame.saved_source_dir {
                    crate::stdlib::set_thread_source_dir(dir);
                }
                let current_depth = self.frames.len();
                crate::step_runtime::prune_below_frame(current_depth);
                // Drop any deadlines owned by the popped frame so the
                // caller doesn't inherit them (an early `return` from
                // inside `deadline(d) { ... }` would otherwise leave the
                // deadline live across the function boundary).
                while self.deadlines.last().is_some_and(|d| d.1 > current_depth) {
                    self.deadlines.pop();
                }

                let reached_target = current_depth <= target_depth;
                if reached_target && !restore_on_final_pop {
                    // Entrypoint mode: leave env / iterators / stack in place
                    // so the caller can observe the script's top-level scope.
                    return Ok(val);
                }
                self.iterators.truncate(popped_frame.saved_iterator_depth);
                self.env = popped_frame.saved_env;
                self.stack.truncate(popped_frame.stack_base);
                if reached_target {
                    return Ok(val);
                }
                // Returning into a caller frame: its result goes on the stack.
                self.stack.push(val);
                continue;
            }

            let op_byte = frame.chunk.code[frame.ip];
            // Line-coverage hit. `self.coverage` is `None` unless a coverage
            // session is active, so this is a single predictable branch on the
            // hot path (and a disjoint-field borrow from `frame`, which holds
            // `self.frames`). `frame.ip` is still the index of the instruction
            // we just read, before the increment below.
            if let Some(coverage) = self.coverage.as_mut() {
                coverage.record(&frame.chunk, frame.ip);
            }
            frame.ip += 1;

            // Sync/async split dispatch: skip the `execute_op` async fn
            // (a per-iteration `Future` state machine over ~80 match arms)
            // and the `tokio::select!` deadline/cancel wrapper whenever
            // the VM has no interrupt machinery armed. Only call/iter/
            // pipe/parallel/import/yield opcodes still need an `.await`.
            let op_result: Result<(), VmError> = if self.scope_interrupts_clean() {
                let op = match Op::from_byte(op_byte) {
                    Some(op) => op,
                    // NOTE(review): this returns directly, so an undecodable
                    // opcode on the fast path skips `handle_error`,
                    // `unwind_frames_to_depth`, and `enrich_error_with_line`
                    // — confirm that is intended.
                    None => return Err(VmError::InvalidInstruction(op_byte)),
                };
                // `execute_op_sync` yields `None` for opcodes it does not
                // handle; those fall through to the async executor.
                if let Some(result) = self.execute_op_sync(op) {
                    result
                } else {
                    self.execute_op_async(op).await
                }
            } else {
                match self.execute_op_with_scope_interrupts(op_byte).await {
                    Ok(Some(val)) => return Ok(val),
                    Ok(None) => Ok(()),
                    Err(e) => Err(e),
                }
            };

            match op_result {
                Ok(()) => continue,
                // Explicit return is signalled through the error channel.
                Err(VmError::Return(val)) => {
                    let val = self.run_step_post_hooks_for_current_frame(val).await?;
                    if let Some(popped_frame) = self.frames.pop() {
                        // `+ 1` recovers the depth the frame had before the pop.
                        self.release_sync_guards_for_frame(self.frames.len() + 1);
                        if let Some(ref dir) = popped_frame.saved_source_dir {
                            crate::stdlib::set_thread_source_dir(dir);
                        }
                        let current_depth = self.frames.len();
                        // Handlers installed by the returning frame are no longer reachable.
                        self.exception_handlers
                            .retain(|h| h.frame_depth <= current_depth);
                        crate::step_runtime::prune_below_frame(current_depth);
                        while self.deadlines.last().is_some_and(|d| d.1 > current_depth) {
                            self.deadlines.pop();
                        }

                        let reached_target = current_depth <= target_depth;
                        if reached_target && !restore_on_final_pop {
                            return Ok(val);
                        }
                        self.iterators.truncate(popped_frame.saved_iterator_depth);
                        self.env = popped_frame.saved_env;
                        self.stack.truncate(popped_frame.stack_base);
                        if reached_target {
                            return Ok(val);
                        }
                        self.stack.push(val);
                    } else {
                        return Ok(val);
                    }
                }
                Err(e) => {
                    // Capture stack trace before error handling unwinds frames.
                    if self.error_stack_trace.is_empty() {
                        self.error_stack_trace = self.capture_stack_trace();
                    }
                    // Honor `@step(error_boundary: ...)` if a step-budget
                    // exhaustion error is propagating out of the step's
                    // own frame. `continue` swaps the throw for a Nil
                    // return; `escalate` re-tags the error as a handoff
                    // escalation and lets the existing exception
                    // handlers route it.
                    let e = match self.apply_step_error_boundary(e) {
                        StepBoundaryOutcome::Returned(val) => {
                            self.error_stack_trace.clear();
                            if self.frames.len() <= target_depth {
                                return Ok(val);
                            }
                            self.stack.push(val);
                            continue;
                        }
                        StepBoundaryOutcome::Throw(err) => err,
                    };
                    match self.handle_error(e) {
                        Ok(None) => {
                            // Caught: the captured trace is no longer relevant.
                            self.error_stack_trace.clear();
                            continue;
                        }
                        Ok(Some(val)) => return Ok(val),
                        Err(e) => {
                            self.unwind_frames_to_depth(target_depth);
                            return Err(self.enrich_error_with_line(e));
                        }
                    }
                }
            }
        }
    }
494
495    /// Pop frames until `self.frames.len() <= target_depth`, restoring env,
496    /// iterators, stack, source-dir thread-locals, and releasing per-frame
497    /// sync guards for each popped frame. Used by [`drive_until_frame_depth`]
498    /// on the error path so a closure sub-execution leaves caller-visible
499    /// state at the same depth it found when an unhandled error propagates
500    /// out.
501    fn unwind_frames_to_depth(&mut self, target_depth: usize) {
502        while self.frames.len() > target_depth {
503            let frame_depth = self.frames.len();
504            if let Some(frame) = self.frames.pop() {
505                self.release_sync_guards_for_frame(frame_depth);
506                if let Some(ref dir) = frame.saved_source_dir {
507                    crate::stdlib::set_thread_source_dir(dir);
508                }
509                self.iterators.truncate(frame.saved_iterator_depth);
510                self.env = frame.saved_env;
511                self.stack.truncate(frame.stack_base);
512            }
513        }
514        let current_depth = self.frames.len();
515        crate::step_runtime::prune_below_frame(current_depth);
516        while self.deadlines.last().is_some_and(|d| d.1 > current_depth) {
517            self.deadlines.pop();
518        }
519    }
520
521    /// Inspect a thrown error against the topmost active step's
522    /// `error_boundary`. Called from the main step loop before
523    /// `handle_error` so that a step's own budget-exhaustion error can be
524    /// short-circuited (`continue`) or annotated (`escalate`) before the
525    /// generic try/catch machinery sees it.
526    pub(crate) fn apply_step_error_boundary(&mut self, error: VmError) -> StepBoundaryOutcome {
527        use crate::step_runtime;
528        if !step_runtime::is_step_budget_exhausted(&error) {
529            return StepBoundaryOutcome::Throw(error);
530        }
531        let Some(step_depth) = step_runtime::active_step_frame_depth() else {
532            return StepBoundaryOutcome::Throw(error);
533        };
534        // The step's frame is the topmost on the call stack iff its
535        // recorded frame_depth equals `frames.len()`. If the throw is
536        // coming from a deeper frame we let it bubble up — the boundary
537        // still applies later when the step's own frame is reached.
538        if step_depth != self.frames.len() {
539            return StepBoundaryOutcome::Throw(error);
540        }
541        let boundary = step_runtime::with_active_step(|step| step.definition.boundary())
542            .unwrap_or(step_runtime::StepErrorBoundary::Fail);
543        match boundary {
544            step_runtime::StepErrorBoundary::Continue => {
545                // Mimic VmError::Return(Nil) for the step's frame: pop
546                // the frame, restore its env/iterators/stack, and feed a
547                // Nil return value back to the caller.
548                if let Some(popped) = self.frames.pop() {
549                    self.release_sync_guards_for_frame(self.frames.len() + 1);
550                    if let Some(ref dir) = popped.saved_source_dir {
551                        crate::stdlib::set_thread_source_dir(dir);
552                    }
553                    let current_depth = self.frames.len();
554                    self.exception_handlers
555                        .retain(|h| h.frame_depth <= current_depth);
556                    step_runtime::pop_and_record(
557                        current_depth + 1,
558                        "skipped",
559                        Some(step_runtime_error_message(&error)),
560                    );
561                    if self.frames.is_empty() {
562                        return StepBoundaryOutcome::Returned(VmValue::Nil);
563                    }
564                    self.iterators.truncate(popped.saved_iterator_depth);
565                    self.env = popped.saved_env;
566                    self.stack.truncate(popped.stack_base);
567                }
568                StepBoundaryOutcome::Returned(VmValue::Nil)
569            }
570            step_runtime::StepErrorBoundary::Escalate => {
571                let identity = step_runtime::with_active_step(|step| {
572                    (
573                        step.definition.name.clone(),
574                        step.definition.function.clone(),
575                    )
576                });
577                step_runtime::pop_and_record(
578                    step_depth,
579                    "escalated",
580                    Some(step_runtime_error_message(&error)),
581                );
582                let (step_name, function) = identity.unzip();
583                StepBoundaryOutcome::Throw(step_runtime::mark_escalated(
584                    error,
585                    step_name.as_deref(),
586                    function.as_deref(),
587                ))
588            }
589            step_runtime::StepErrorBoundary::Fail => {
590                step_runtime::pop_and_record(
591                    step_depth,
592                    "failed",
593                    Some(step_runtime_error_message(&error)),
594                );
595                StepBoundaryOutcome::Throw(error)
596            }
597        }
598    }
599}
600
601fn next_deadline(
602    scope_deadline: Option<Instant>,
603    interrupt_handler_deadline: Option<Instant>,
604) -> (Option<Instant>, Option<DeadlineKind>) {
605    match (scope_deadline, interrupt_handler_deadline) {
606        (Some(scope), Some(interrupt)) if interrupt < scope => {
607            (Some(interrupt), Some(DeadlineKind::InterruptHandler))
608        }
609        (Some(scope), _) => (Some(scope), Some(DeadlineKind::Scope)),
610        (None, Some(interrupt)) => (Some(interrupt), Some(DeadlineKind::InterruptHandler)),
611        (None, None) => (None, None),
612    }
613}
614
615fn step_runtime_error_message(error: &VmError) -> String {
616    match error {
617        VmError::Thrown(VmValue::Dict(dict)) => dict
618            .get("message")
619            .map(|v| v.display())
620            .unwrap_or_else(|| error.to_string()),
621        _ => error.to_string(),
622    }
623}
624
/// Result of `Vm::apply_step_error_boundary`: what the dispatch loop should
/// do with an error after the active step's error boundary has been applied.
pub(crate) enum StepBoundaryOutcome {
    /// The error was absorbed; the carried value is treated as the step
    /// frame's return value.
    Returned(VmValue),
    /// The (possibly re-tagged) error should continue to `handle_error`.
    Throw(VmError),
}
629
630impl crate::vm::Vm {
631    pub(crate) async fn execute_one_cycle(&mut self) -> Result<Option<(VmValue, bool)>, VmError> {
632        if let Some(err) = self.pending_scope_interrupt().await {
633            match self.handle_error(err) {
634                Ok(None) => return Ok(None),
635                Ok(Some(val)) => return Ok(Some((val, false))),
636                Err(e) => return Err(e),
637            }
638        }
639
640        let frame = match self.frames.last_mut() {
641            Some(f) => f,
642            None => {
643                let val = self.stack.pop().unwrap_or(VmValue::Nil);
644                return Ok(Some((val, false)));
645            }
646        };
647
648        if frame.ip >= frame.chunk.code.len() {
649            let val = self.stack.pop().unwrap_or(VmValue::Nil);
650            self.release_sync_guards_for_frame(self.frames.len());
651            let popped_frame = self.frames.pop().unwrap();
652            if self.frames.is_empty() {
653                return Ok(Some((val, false)));
654            }
655            self.iterators.truncate(popped_frame.saved_iterator_depth);
656            self.env = popped_frame.saved_env;
657            self.stack.truncate(popped_frame.stack_base);
658            self.stack.push(val);
659            return Ok(None);
660        }
661
662        let op = frame.chunk.code[frame.ip];
663        frame.ip += 1;
664
665        match self.execute_op_with_scope_interrupts(op).await {
666            Ok(Some(val)) => Ok(Some((val, false))),
667            Ok(None) => Ok(None),
668            Err(VmError::Return(val)) => {
669                if let Some(popped_frame) = self.frames.pop() {
670                    self.release_sync_guards_for_frame(self.frames.len() + 1);
671                    if let Some(ref dir) = popped_frame.saved_source_dir {
672                        crate::stdlib::set_thread_source_dir(dir);
673                    }
674                    let current_depth = self.frames.len();
675                    self.exception_handlers
676                        .retain(|h| h.frame_depth <= current_depth);
677                    if self.frames.is_empty() {
678                        return Ok(Some((val, false)));
679                    }
680                    self.iterators.truncate(popped_frame.saved_iterator_depth);
681                    self.env = popped_frame.saved_env;
682                    self.stack.truncate(popped_frame.stack_base);
683                    self.stack.push(val);
684                    Ok(None)
685                } else {
686                    Ok(Some((val, false)))
687                }
688            }
689            Err(e) => {
690                if self.error_stack_trace.is_empty() {
691                    self.error_stack_trace = self.capture_stack_trace();
692                }
693                match self.handle_error(e) {
694                    Ok(None) => {
695                        self.error_stack_trace.clear();
696                        Ok(None)
697                    }
698                    Ok(Some(val)) => Ok(Some((val, false))),
699                    Err(e) => Err(self.enrich_error_with_line(e)),
700                }
701            }
702        }
703    }
704
705    async fn execute_op_with_scope_interrupts(
706        &mut self,
707        op: u8,
708    ) -> Result<Option<VmValue>, VmError> {
709        enum ScopeInterruptResult {
710            Op(Result<Option<VmValue>, VmError>),
711            Deadline(DeadlineKind),
712            CancelTimedOut,
713        }
714
715        let (deadline, deadline_kind) = next_deadline(
716            self.deadlines.last().map(|(deadline, _)| *deadline),
717            self.interrupt_handler_deadline,
718        );
719        let cancel_token = self.cancel_token.clone();
720
721        if deadline.is_none() && cancel_token.is_none() {
722            return self.execute_op(op).await;
723        }
724
725        let has_deadline = deadline.is_some();
726        let cancel_requested_at_start = cancel_token
727            .as_ref()
728            .is_some_and(|token| token.load(std::sync::atomic::Ordering::SeqCst));
729        let has_cancel = cancel_token.is_some() && !cancel_requested_at_start;
730        let deadline_sleep = async move {
731            if let Some(deadline) = deadline {
732                tokio::time::sleep_until(tokio::time::Instant::from_std(deadline)).await;
733            } else {
734                std::future::pending::<()>().await;
735            }
736        };
737        let cancel_sleep = async move {
738            if let Some(token) = cancel_token {
739                while !token.load(std::sync::atomic::Ordering::SeqCst) {
740                    tokio::time::sleep(Duration::from_millis(10)).await;
741                }
742            } else {
743                std::future::pending::<()>().await;
744            }
745        };
746
747        let result = {
748            let op_future = self.execute_op(op);
749            tokio::pin!(op_future);
750            tokio::select! {
751                result = &mut op_future => ScopeInterruptResult::Op(result),
752                _ = deadline_sleep, if has_deadline => {
753                    ScopeInterruptResult::Deadline(deadline_kind.unwrap_or(DeadlineKind::Scope))
754                },
755                _ = cancel_sleep, if has_cancel => {
756                    let grace = tokio::time::sleep(CANCEL_GRACE_ASYNC_OP);
757                    tokio::pin!(grace);
758                    tokio::select! {
759                        result = &mut op_future => ScopeInterruptResult::Op(result),
760                        _ = &mut grace => ScopeInterruptResult::CancelTimedOut,
761                    }
762                }
763            }
764        };
765
766        match result {
767            ScopeInterruptResult::Op(result) => result,
768            ScopeInterruptResult::Deadline(DeadlineKind::Scope) => {
769                self.deadlines.pop();
770                self.cancel_spawned_tasks();
771                Err(Self::deadline_exceeded_error())
772            }
773            ScopeInterruptResult::Deadline(DeadlineKind::InterruptHandler) => {
774                Err(Self::interrupt_handler_timeout_error())
775            }
776            ScopeInterruptResult::CancelTimedOut => {
777                self.cancel_spawned_tasks();
778                let signal = self
779                    .take_host_interrupt_signal()
780                    .unwrap_or_else(|| "SIGINT".to_string());
781                if self.has_interrupt_handler_for(&signal) {
782                    self.dispatch_interrupt_handlers(&signal).await?;
783                }
784                Err(Self::cancelled_error())
785            }
786        }
787    }
788
789    pub(crate) fn deadline_exceeded_error() -> VmError {
790        VmError::Thrown(VmValue::String(arcstr::ArcStr::from("Deadline exceeded")))
791    }
792
793    pub(crate) fn cancelled_error() -> VmError {
794        VmError::Thrown(VmValue::String(arcstr::ArcStr::from(
795            "kind:cancelled:VM cancelled by host",
796        )))
797    }
798
799    /// Capture the current call stack as (fn_name, line, col, source_file) tuples.
800    pub(crate) fn capture_stack_trace(&self) -> Vec<(String, usize, usize, Option<String>)> {
801        self.frames
802            .iter()
803            .map(|f| {
804                let idx = if f.ip > 0 { f.ip - 1 } else { 0 };
805                let line = f.chunk.lines.get(idx).copied().unwrap_or(0) as usize;
806                let col = f.chunk.columns.get(idx).copied().unwrap_or(0) as usize;
807                (f.fn_name.clone(), line, col, f.chunk.source_file.clone())
808            })
809            .collect()
810    }
811
812    /// Enrich a VmError with source line information from the captured stack
813    /// trace. Appends ` (line N)` to error variants whose messages don't
814    /// already carry location context.
815    pub(crate) fn enrich_error_with_line(&self, error: VmError) -> VmError {
816        // Determine the line AND source file from the captured stack trace
817        // (innermost frame) so the error names the exact `.harn` it crashed in.
818        // A bare `(line N)` is ambiguous across 100+ stdlib files and forces a
819        // manual hunt; `(stall.harn:497)` pinpoints it immediately.
820        let (line, file) = self
821            .error_stack_trace
822            .last()
823            .map(|(_, l, _, f)| (*l, f.clone()))
824            .unwrap_or_else(|| (self.current_line(), None));
825        if line == 0 {
826            return error;
827        }
828        let suffix = match file.as_deref() {
829            Some(path) => {
830                let name = std::path::Path::new(path)
831                    .file_name()
832                    .and_then(|n| n.to_str())
833                    .unwrap_or(path);
834                format!(" ({name}:{line})")
835            }
836            None => format!(" (line {line})"),
837        };
838        match error {
839            VmError::Runtime(msg) => VmError::Runtime(format!("{msg}{suffix}")),
840            VmError::TypeError(msg) => VmError::TypeError(format!("{msg}{suffix}")),
841            VmError::DivisionByZero => VmError::Runtime(format!("Division by zero{suffix}")),
842            VmError::UndefinedVariable(name) => {
843                VmError::Runtime(format!("Undefined variable: {name}{suffix}"))
844            }
845            VmError::UndefinedBuiltin(name) => {
846                VmError::Runtime(format!("Undefined builtin: {name}{suffix}"))
847            }
848            VmError::ImmutableAssignment(name) => VmError::Runtime(format!(
849                "Cannot assign to immutable binding: {name}{suffix}"
850            )),
851            VmError::StackOverflow => {
852                VmError::Runtime(format!("Stack overflow: too many nested calls{suffix}"))
853            }
854            // Leave these untouched:
855            // - Thrown: user-thrown errors should not be silently modified
856            // - CategorizedError: structured errors for agent orchestration
857            // - Return: control flow, not a real error
858            // - StackUnderflow / InvalidInstruction: internal VM bugs
859            other => other,
860        }
861    }
862}