Skip to main content

harn_vm/vm/
execution.rs

1use std::rc::Rc;
2use std::time::{Duration, Instant};
3
4use crate::chunk::{Chunk, ChunkRef};
5use crate::value::{ModuleFunctionRegistry, VmError, VmValue};
6
7use super::{CallFrame, LocalSlot, Vm};
8
/// Extra time an in-flight op is given to finish on its own after the host
/// cancel token has been observed as set, before the op is given up on.
const CANCEL_GRACE_ASYNC_OP: Duration = Duration::from_millis(250);

/// Identifies which deadline source produced the deadline an op is raced against.
///
/// `Debug`, `PartialEq` and `Eq` are derived in addition to `Clone`/`Copy` so
/// the kind can be logged and compared directly (e.g. in assertions).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum DeadlineKind {
    /// The deadline came from the top of the VM's `deadlines` stack.
    Scope,
    /// The deadline came from the VM's `interrupt_handler_deadline`.
    InterruptHandler,
}
16
17impl Vm {
18    /// Execute a compiled chunk.
19    pub async fn execute(&mut self, chunk: &Chunk) -> Result<VmValue, VmError> {
20        let span_id = crate::tracing::span_start(crate::tracing::SpanKind::Pipeline, "main".into());
21        let result = self.run_chunk(chunk).await;
22        crate::tracing::span_end(span_id);
23        result
24    }
25
26    /// Convert a VmError into either a handled exception (returning Ok) or a propagated error.
27    pub(crate) fn handle_error(&mut self, error: VmError) -> Result<Option<VmValue>, VmError> {
28        let thrown_value = match &error {
29            VmError::Thrown(v) => v.clone(),
30            other => VmValue::String(Rc::from(other.to_string())),
31        };
32
33        if let Some(handler) = self.exception_handlers.pop() {
34            if !handler.error_type.is_empty() {
35                // Typed catch: only match when the thrown enum's type equals the declared type.
36                let matches = match &thrown_value {
37                    VmValue::EnumVariant { enum_name, .. } => {
38                        enum_name.as_ref() == handler.error_type
39                    }
40                    _ => false,
41                };
42                if !matches {
43                    return self.handle_error(error);
44                }
45            }
46
47            self.release_sync_guards_after_unwind(handler.frame_depth, handler.env_scope_depth);
48
49            while self.frames.len() > handler.frame_depth {
50                if let Some(frame) = self.frames.pop() {
51                    if let Some(ref dir) = frame.saved_source_dir {
52                        crate::stdlib::set_thread_source_dir(dir);
53                    }
54                    self.iterators.truncate(frame.saved_iterator_depth);
55                    self.env = frame.saved_env;
56                }
57            }
58            crate::step_runtime::prune_below_frame(self.frames.len());
59
60            // Drop deadlines that belonged to unwound frames.
61            while self
62                .deadlines
63                .last()
64                .is_some_and(|d| d.1 > handler.frame_depth)
65            {
66                self.deadlines.pop();
67            }
68
69            self.env.truncate_scopes(handler.env_scope_depth);
70
71            self.stack.truncate(handler.stack_depth);
72            self.stack.push(thrown_value);
73
74            if let Some(frame) = self.frames.last_mut() {
75                frame.ip = handler.catch_ip;
76            }
77
78            Ok(None)
79        } else {
80            Err(error)
81        }
82    }
83
84    pub(crate) async fn run_chunk(&mut self, chunk: &Chunk) -> Result<VmValue, VmError> {
85        self.run_chunk_entry(chunk, 0, None, None, None, None).await
86    }
87
88    pub(crate) async fn run_chunk_entry(
89        &mut self,
90        chunk: &Chunk,
91        argc: usize,
92        saved_source_dir: Option<std::path::PathBuf>,
93        module_functions: Option<ModuleFunctionRegistry>,
94        module_state: Option<crate::value::ModuleState>,
95        local_slots: Option<Vec<LocalSlot>>,
96    ) -> Result<VmValue, VmError> {
97        self.run_chunk_ref(
98            Rc::new(chunk.clone()),
99            argc,
100            saved_source_dir,
101            module_functions,
102            module_state,
103            local_slots,
104        )
105        .await
106    }
107
108    pub(crate) async fn run_chunk_ref(
109        &mut self,
110        chunk: ChunkRef,
111        argc: usize,
112        saved_source_dir: Option<std::path::PathBuf>,
113        module_functions: Option<ModuleFunctionRegistry>,
114        module_state: Option<crate::value::ModuleState>,
115        local_slots: Option<Vec<LocalSlot>>,
116    ) -> Result<VmValue, VmError> {
117        let initial_env = self.env.clone();
118        let local_slots = local_slots.unwrap_or_else(|| Self::fresh_local_slots(&chunk));
119        let initial_local_slots = local_slots.clone();
120        self.frames.push(CallFrame {
121            chunk,
122            ip: 0,
123            stack_base: self.stack.len(),
124            saved_env: self.env.clone(),
125            initial_env: Some(initial_env),
126            initial_local_slots: Some(initial_local_slots),
127            saved_iterator_depth: self.iterators.len(),
128            fn_name: String::new(),
129            argc,
130            saved_source_dir,
131            module_functions,
132            module_state,
133            local_slots,
134            local_scope_base: self.env.scope_depth().saturating_sub(1),
135            local_scope_depth: 0,
136        });
137
138        loop {
139            if let Some(err) = self.pending_scope_interrupt().await {
140                match self.handle_error(err) {
141                    Ok(None) => continue,
142                    Ok(Some(val)) => return Ok(val),
143                    Err(e) => return Err(e),
144                }
145            }
146
147            let frame = match self.frames.last_mut() {
148                Some(f) => f,
149                None => return Ok(self.stack.pop().unwrap_or(VmValue::Nil)),
150            };
151
152            if frame.ip >= frame.chunk.code.len() {
153                let val = self.stack.pop().unwrap_or(VmValue::Nil);
154                let val = self.run_step_post_hooks_for_current_frame(val).await?;
155                self.release_sync_guards_for_frame(self.frames.len());
156                let popped_frame = self.frames.pop().unwrap();
157                if let Some(ref dir) = popped_frame.saved_source_dir {
158                    crate::stdlib::set_thread_source_dir(dir);
159                }
160                crate::step_runtime::prune_below_frame(self.frames.len());
161
162                if self.frames.is_empty() {
163                    return Ok(val);
164                } else {
165                    self.iterators.truncate(popped_frame.saved_iterator_depth);
166                    self.env = popped_frame.saved_env;
167                    self.stack.truncate(popped_frame.stack_base);
168                    self.stack.push(val);
169                    continue;
170                }
171            }
172
173            let op = frame.chunk.code[frame.ip];
174            frame.ip += 1;
175
176            match self.execute_op_with_scope_interrupts(op).await {
177                Ok(Some(val)) => return Ok(val),
178                Ok(None) => continue,
179                Err(VmError::Return(val)) => {
180                    let val = self.run_step_post_hooks_for_current_frame(val).await?;
181                    if let Some(popped_frame) = self.frames.pop() {
182                        self.release_sync_guards_for_frame(self.frames.len() + 1);
183                        if let Some(ref dir) = popped_frame.saved_source_dir {
184                            crate::stdlib::set_thread_source_dir(dir);
185                        }
186                        let current_depth = self.frames.len();
187                        self.exception_handlers
188                            .retain(|h| h.frame_depth <= current_depth);
189                        crate::step_runtime::prune_below_frame(current_depth);
190
191                        if self.frames.is_empty() {
192                            return Ok(val);
193                        }
194                        self.iterators.truncate(popped_frame.saved_iterator_depth);
195                        self.env = popped_frame.saved_env;
196                        self.stack.truncate(popped_frame.stack_base);
197                        self.stack.push(val);
198                    } else {
199                        return Ok(val);
200                    }
201                }
202                Err(e) => {
203                    // Capture stack trace before error handling unwinds frames.
204                    if self.error_stack_trace.is_empty() {
205                        self.error_stack_trace = self.capture_stack_trace();
206                    }
207                    // Honor `@step(error_boundary: ...)` if a step-budget
208                    // exhaustion error is propagating out of the step's
209                    // own frame. `continue` swaps the throw for a Nil
210                    // return; `escalate` re-tags the error as a handoff
211                    // escalation and lets the existing exception
212                    // handlers route it.
213                    let e = match self.apply_step_error_boundary(e) {
214                        StepBoundaryOutcome::Returned(val) => {
215                            self.error_stack_trace.clear();
216                            self.stack.push(val);
217                            continue;
218                        }
219                        StepBoundaryOutcome::Throw(err) => err,
220                    };
221                    match self.handle_error(e) {
222                        Ok(None) => {
223                            self.error_stack_trace.clear();
224                            continue;
225                        }
226                        Ok(Some(val)) => return Ok(val),
227                        Err(e) => return Err(self.enrich_error_with_line(e)),
228                    }
229                }
230            }
231        }
232    }
233
234    /// Inspect a thrown error against the topmost active step's
235    /// `error_boundary`. Called from the main step loop before
236    /// `handle_error` so that a step's own budget-exhaustion error can be
237    /// short-circuited (`continue`) or annotated (`escalate`) before the
238    /// generic try/catch machinery sees it.
239    pub(crate) fn apply_step_error_boundary(&mut self, error: VmError) -> StepBoundaryOutcome {
240        use crate::step_runtime;
241        if !step_runtime::is_step_budget_exhausted(&error) {
242            return StepBoundaryOutcome::Throw(error);
243        }
244        let Some(step_depth) = step_runtime::active_step_frame_depth() else {
245            return StepBoundaryOutcome::Throw(error);
246        };
247        // The step's frame is the topmost on the call stack iff its
248        // recorded frame_depth equals `frames.len()`. If the throw is
249        // coming from a deeper frame we let it bubble up — the boundary
250        // still applies later when the step's own frame is reached.
251        if step_depth != self.frames.len() {
252            return StepBoundaryOutcome::Throw(error);
253        }
254        let boundary = step_runtime::with_active_step(|step| step.definition.boundary())
255            .unwrap_or(step_runtime::StepErrorBoundary::Fail);
256        match boundary {
257            step_runtime::StepErrorBoundary::Continue => {
258                // Mimic VmError::Return(Nil) for the step's frame: pop
259                // the frame, restore its env/iterators/stack, and feed a
260                // Nil return value back to the caller.
261                if let Some(popped) = self.frames.pop() {
262                    self.release_sync_guards_for_frame(self.frames.len() + 1);
263                    if let Some(ref dir) = popped.saved_source_dir {
264                        crate::stdlib::set_thread_source_dir(dir);
265                    }
266                    let current_depth = self.frames.len();
267                    self.exception_handlers
268                        .retain(|h| h.frame_depth <= current_depth);
269                    step_runtime::pop_and_record(
270                        current_depth + 1,
271                        "skipped",
272                        Some(step_runtime_error_message(&error)),
273                    );
274                    if self.frames.is_empty() {
275                        return StepBoundaryOutcome::Returned(VmValue::Nil);
276                    }
277                    self.iterators.truncate(popped.saved_iterator_depth);
278                    self.env = popped.saved_env;
279                    self.stack.truncate(popped.stack_base);
280                }
281                StepBoundaryOutcome::Returned(VmValue::Nil)
282            }
283            step_runtime::StepErrorBoundary::Escalate => {
284                let identity = step_runtime::with_active_step(|step| {
285                    (
286                        step.definition.name.clone(),
287                        step.definition.function.clone(),
288                    )
289                });
290                step_runtime::pop_and_record(
291                    step_depth,
292                    "escalated",
293                    Some(step_runtime_error_message(&error)),
294                );
295                let (step_name, function) = identity.unzip();
296                StepBoundaryOutcome::Throw(step_runtime::mark_escalated(
297                    error,
298                    step_name.as_deref(),
299                    function.as_deref(),
300                ))
301            }
302            step_runtime::StepErrorBoundary::Fail => {
303                step_runtime::pop_and_record(
304                    step_depth,
305                    "failed",
306                    Some(step_runtime_error_message(&error)),
307                );
308                StepBoundaryOutcome::Throw(error)
309            }
310        }
311    }
312}
313
314fn next_deadline(
315    scope_deadline: Option<Instant>,
316    interrupt_handler_deadline: Option<Instant>,
317) -> (Option<Instant>, Option<DeadlineKind>) {
318    match (scope_deadline, interrupt_handler_deadline) {
319        (Some(scope), Some(interrupt)) if interrupt < scope => {
320            (Some(interrupt), Some(DeadlineKind::InterruptHandler))
321        }
322        (Some(scope), _) => (Some(scope), Some(DeadlineKind::Scope)),
323        (None, Some(interrupt)) => (Some(interrupt), Some(DeadlineKind::InterruptHandler)),
324        (None, None) => (None, None),
325    }
326}
327
328fn step_runtime_error_message(error: &VmError) -> String {
329    match error {
330        VmError::Thrown(VmValue::Dict(dict)) => dict
331            .get("message")
332            .map(|v| v.display())
333            .unwrap_or_else(|| error.to_string()),
334        _ => error.to_string(),
335    }
336}
337
/// Result of `Vm::apply_step_error_boundary`: tells the run loop whether a
/// propagating error was absorbed by a step boundary or must keep propagating.
338pub(crate) enum StepBoundaryOutcome {
    /// The error was absorbed. The run loop pushes the carried value onto the
    /// stack and continues executing.
339    Returned(VmValue),
    /// The carried error (possibly re-tagged) must still be thrown. The run
    /// loop passes it on to `handle_error`.
340    Throw(VmError),
341}
342
343impl crate::vm::Vm {
344    pub(crate) async fn execute_one_cycle(&mut self) -> Result<Option<(VmValue, bool)>, VmError> {
345        if let Some(err) = self.pending_scope_interrupt().await {
346            match self.handle_error(err) {
347                Ok(None) => return Ok(None),
348                Ok(Some(val)) => return Ok(Some((val, false))),
349                Err(e) => return Err(e),
350            }
351        }
352
353        let frame = match self.frames.last_mut() {
354            Some(f) => f,
355            None => {
356                let val = self.stack.pop().unwrap_or(VmValue::Nil);
357                return Ok(Some((val, false)));
358            }
359        };
360
361        if frame.ip >= frame.chunk.code.len() {
362            let val = self.stack.pop().unwrap_or(VmValue::Nil);
363            self.release_sync_guards_for_frame(self.frames.len());
364            let popped_frame = self.frames.pop().unwrap();
365            if self.frames.is_empty() {
366                return Ok(Some((val, false)));
367            } else {
368                self.iterators.truncate(popped_frame.saved_iterator_depth);
369                self.env = popped_frame.saved_env;
370                self.stack.truncate(popped_frame.stack_base);
371                self.stack.push(val);
372                return Ok(None);
373            }
374        }
375
376        let op = frame.chunk.code[frame.ip];
377        frame.ip += 1;
378
379        match self.execute_op_with_scope_interrupts(op).await {
380            Ok(Some(val)) => Ok(Some((val, false))),
381            Ok(None) => Ok(None),
382            Err(VmError::Return(val)) => {
383                if let Some(popped_frame) = self.frames.pop() {
384                    self.release_sync_guards_for_frame(self.frames.len() + 1);
385                    if let Some(ref dir) = popped_frame.saved_source_dir {
386                        crate::stdlib::set_thread_source_dir(dir);
387                    }
388                    let current_depth = self.frames.len();
389                    self.exception_handlers
390                        .retain(|h| h.frame_depth <= current_depth);
391                    if self.frames.is_empty() {
392                        return Ok(Some((val, false)));
393                    }
394                    self.iterators.truncate(popped_frame.saved_iterator_depth);
395                    self.env = popped_frame.saved_env;
396                    self.stack.truncate(popped_frame.stack_base);
397                    self.stack.push(val);
398                    Ok(None)
399                } else {
400                    Ok(Some((val, false)))
401                }
402            }
403            Err(e) => {
404                if self.error_stack_trace.is_empty() {
405                    self.error_stack_trace = self.capture_stack_trace();
406                }
407                match self.handle_error(e) {
408                    Ok(None) => {
409                        self.error_stack_trace.clear();
410                        Ok(None)
411                    }
412                    Ok(Some(val)) => Ok(Some((val, false))),
413                    Err(e) => Err(self.enrich_error_with_line(e)),
414                }
415            }
416        }
417    }
418
    /// Execute one op while racing it against the nearest deadline and the
    /// host cancel token.
    ///
    /// Returns the op's own result when the op finishes first. Otherwise
    /// returns the deadline-exceeded error, the interrupt-handler timeout
    /// error, or the cancelled error, depending on which race the op lost.
419    async fn execute_op_with_scope_interrupts(
420        &mut self,
421        op: u8,
422    ) -> Result<Option<VmValue>, VmError> {
        // Which branch of the race below completed first.
423        enum ScopeInterruptResult {
424            Op(Result<Option<VmValue>, VmError>),
425            Deadline(DeadlineKind),
426            CancelTimedOut,
427        }
428
        // Candidate deadlines: the top of the `deadlines` stack and the
        // interrupt-handler deadline. `next_deadline` picks between them.
429        let (deadline, deadline_kind) = next_deadline(
430            self.deadlines.last().map(|(deadline, _)| *deadline),
431            self.interrupt_handler_deadline,
432        );
433        let cancel_token = self.cancel_token.clone();
434
        // Fast path: nothing to race against, so run the op directly.
435        if deadline.is_none() && cancel_token.is_none() {
436            return self.execute_op(op).await;
437        }
438
439        let has_deadline = deadline.is_some();
        // If cancellation is already flagged when the op starts, the cancel
        // branch is disabled and the op is raced against the deadline only.
        // NOTE(review): presumably the already-set token is handled by
        // `pending_scope_interrupt` on the next cycle; confirm.
440        let cancel_requested_at_start = cancel_token
441            .as_ref()
442            .is_some_and(|token| token.load(std::sync::atomic::Ordering::SeqCst));
443        let has_cancel = cancel_token.is_some() && !cancel_requested_at_start;
        // Completes at the deadline; never completes when there is none.
444        let deadline_sleep = async move {
445            if let Some(deadline) = deadline {
446                tokio::time::sleep_until(tokio::time::Instant::from_std(deadline)).await;
447            } else {
448                std::future::pending::<()>().await;
449            }
450        };
        // Polls the cancel token every 10 ms and completes once it is set;
        // never completes when there is no token.
451        let cancel_sleep = async move {
452            if let Some(token) = cancel_token {
453                while !token.load(std::sync::atomic::Ordering::SeqCst) {
454                    tokio::time::sleep(Duration::from_millis(10)).await;
455                }
456            } else {
457                std::future::pending::<()>().await;
458            }
459        };
460
        // `op_future` borrows `self` mutably, so the race lives in its own
        // block: the future is dropped at the end of the block, before `self`
        // is used again in the `match` below.
461        let result = {
462            let op_future = self.execute_op(op);
463            tokio::pin!(op_future);
464            tokio::select! {
465                result = &mut op_future => ScopeInterruptResult::Op(result),
                // `next_deadline` returns a kind whenever it returns a
                // deadline, so the `Scope` fallback is purely defensive.
466                _ = deadline_sleep, if has_deadline => {
467                    ScopeInterruptResult::Deadline(deadline_kind.unwrap_or(DeadlineKind::Scope))
468                },
                // Cancellation observed mid-op: the op gets
                // `CANCEL_GRACE_ASYNC_OP` to finish on its own. The deadline
                // is not raced during this grace window.
469                _ = cancel_sleep, if has_cancel => {
470                    let grace = tokio::time::sleep(CANCEL_GRACE_ASYNC_OP);
471                    tokio::pin!(grace);
472                    tokio::select! {
473                        result = &mut op_future => ScopeInterruptResult::Op(result),
474                        _ = &mut grace => ScopeInterruptResult::CancelTimedOut,
475                    }
476                }
477            }
478        };
479
480        match result {
481            ScopeInterruptResult::Op(result) => result,
            // Scope deadline hit: drop the top deadline, cancel spawned tasks,
            // and raise the deadline-exceeded error.
482            ScopeInterruptResult::Deadline(DeadlineKind::Scope) => {
483                self.deadlines.pop();
484                self.cancel_spawned_tasks();
485                Err(Self::deadline_exceeded_error())
486            }
            // Interrupt-handler deadline hit: only the timeout error is
            // raised; the `deadlines` stack is left untouched.
487            ScopeInterruptResult::Deadline(DeadlineKind::InterruptHandler) => {
488                Err(Self::interrupt_handler_timeout_error())
489            }
            // The op did not finish within the grace window after cancellation.
490            ScopeInterruptResult::CancelTimedOut => {
491                self.cancel_spawned_tasks();
                // Use the host-provided signal name, defaulting to "SIGINT".
492                let signal = self
493                    .take_host_interrupt_signal()
494                    .unwrap_or_else(|| "SIGINT".to_string());
                // A failure while dispatching interrupt handlers propagates
                // via `?` instead of the cancelled error.
495                if self.has_interrupt_handler_for(&signal) {
496                    self.dispatch_interrupt_handlers(&signal).await?;
497                }
498                Err(Self::cancelled_error())
499            }
500        }
501    }
502
503    pub(crate) fn deadline_exceeded_error() -> VmError {
504        VmError::Thrown(VmValue::String(Rc::from("Deadline exceeded")))
505    }
506
507    pub(crate) fn cancelled_error() -> VmError {
508        VmError::Thrown(VmValue::String(Rc::from(
509            "kind:cancelled:VM cancelled by host",
510        )))
511    }
512
513    /// Capture the current call stack as (fn_name, line, col, source_file) tuples.
514    pub(crate) fn capture_stack_trace(&self) -> Vec<(String, usize, usize, Option<String>)> {
515        self.frames
516            .iter()
517            .map(|f| {
518                let idx = if f.ip > 0 { f.ip - 1 } else { 0 };
519                let line = f.chunk.lines.get(idx).copied().unwrap_or(0) as usize;
520                let col = f.chunk.columns.get(idx).copied().unwrap_or(0) as usize;
521                (f.fn_name.clone(), line, col, f.chunk.source_file.clone())
522            })
523            .collect()
524    }
525
526    /// Enrich a VmError with source line information from the captured stack
527    /// trace. Appends ` (line N)` to error variants whose messages don't
528    /// already carry location context.
529    pub(crate) fn enrich_error_with_line(&self, error: VmError) -> VmError {
530        // Determine the line from the captured stack trace (innermost frame).
531        let line = self
532            .error_stack_trace
533            .last()
534            .map(|(_, l, _, _)| *l)
535            .unwrap_or_else(|| self.current_line());
536        if line == 0 {
537            return error;
538        }
539        let suffix = format!(" (line {line})");
540        match error {
541            VmError::Runtime(msg) => VmError::Runtime(format!("{msg}{suffix}")),
542            VmError::TypeError(msg) => VmError::TypeError(format!("{msg}{suffix}")),
543            VmError::DivisionByZero => VmError::Runtime(format!("Division by zero{suffix}")),
544            VmError::UndefinedVariable(name) => {
545                VmError::Runtime(format!("Undefined variable: {name}{suffix}"))
546            }
547            VmError::UndefinedBuiltin(name) => {
548                VmError::Runtime(format!("Undefined builtin: {name}{suffix}"))
549            }
550            VmError::ImmutableAssignment(name) => VmError::Runtime(format!(
551                "Cannot assign to immutable binding: {name}{suffix}"
552            )),
553            VmError::StackOverflow => {
554                VmError::Runtime(format!("Stack overflow: too many nested calls{suffix}"))
555            }
556            // Leave these untouched:
557            // - Thrown: user-thrown errors should not be silently modified
558            // - CategorizedError: structured errors for agent orchestration
559            // - Return: control flow, not a real error
560            // - StackUnderflow / InvalidInstruction: internal VM bugs
561            other => other,
562        }
563    }
564}