1use std::rc::Rc;
2use std::time::{Duration, Instant};
3
4use crate::chunk::{Chunk, ChunkRef};
5use crate::value::{ModuleFunctionRegistry, VmError, VmValue};
6
7use super::{CallFrame, LocalSlot, Vm};
8
9const CANCEL_GRACE_ASYNC_OP: Duration = Duration::from_millis(250);
10
11#[derive(Clone, Copy)]
12enum DeadlineKind {
13 Scope,
14 InterruptHandler,
15}
16
17impl Vm {
18 pub async fn execute(&mut self, chunk: &Chunk) -> Result<VmValue, VmError> {
20 let span_id = crate::tracing::span_start(crate::tracing::SpanKind::Pipeline, "main".into());
21 let result = self.run_chunk(chunk).await;
22 crate::tracing::span_end(span_id);
23 result
24 }
25
26 pub(crate) fn handle_error(&mut self, error: VmError) -> Result<Option<VmValue>, VmError> {
28 let thrown_value = match &error {
29 VmError::Thrown(v) => v.clone(),
30 other => VmValue::String(Rc::from(other.to_string())),
31 };
32
33 if let Some(handler) = self.exception_handlers.pop() {
34 if !handler.error_type.is_empty() {
35 let matches = match &thrown_value {
37 VmValue::EnumVariant { enum_name, .. } => {
38 enum_name.as_ref() == handler.error_type
39 }
40 _ => false,
41 };
42 if !matches {
43 return self.handle_error(error);
44 }
45 }
46
47 self.release_sync_guards_after_unwind(handler.frame_depth, handler.env_scope_depth);
48
49 while self.frames.len() > handler.frame_depth {
50 if let Some(frame) = self.frames.pop() {
51 if let Some(ref dir) = frame.saved_source_dir {
52 crate::stdlib::set_thread_source_dir(dir);
53 }
54 self.iterators.truncate(frame.saved_iterator_depth);
55 self.env = frame.saved_env;
56 }
57 }
58 crate::step_runtime::prune_below_frame(self.frames.len());
59
60 while self
62 .deadlines
63 .last()
64 .is_some_and(|d| d.1 > handler.frame_depth)
65 {
66 self.deadlines.pop();
67 }
68
69 self.env.truncate_scopes(handler.env_scope_depth);
70
71 self.stack.truncate(handler.stack_depth);
72 self.stack.push(thrown_value);
73
74 if let Some(frame) = self.frames.last_mut() {
75 frame.ip = handler.catch_ip;
76 }
77
78 Ok(None)
79 } else {
80 Err(error)
81 }
82 }
83
84 pub(crate) async fn run_chunk(&mut self, chunk: &Chunk) -> Result<VmValue, VmError> {
85 self.run_chunk_entry(chunk, 0, None, None, None, None).await
86 }
87
88 pub(crate) async fn run_chunk_entry(
89 &mut self,
90 chunk: &Chunk,
91 argc: usize,
92 saved_source_dir: Option<std::path::PathBuf>,
93 module_functions: Option<ModuleFunctionRegistry>,
94 module_state: Option<crate::value::ModuleState>,
95 local_slots: Option<Vec<LocalSlot>>,
96 ) -> Result<VmValue, VmError> {
97 self.run_chunk_ref(
98 Rc::new(chunk.clone()),
99 argc,
100 saved_source_dir,
101 module_functions,
102 module_state,
103 local_slots,
104 )
105 .await
106 }
107
108 pub(crate) async fn run_chunk_ref(
109 &mut self,
110 chunk: ChunkRef,
111 argc: usize,
112 saved_source_dir: Option<std::path::PathBuf>,
113 module_functions: Option<ModuleFunctionRegistry>,
114 module_state: Option<crate::value::ModuleState>,
115 local_slots: Option<Vec<LocalSlot>>,
116 ) -> Result<VmValue, VmError> {
117 let debugger = self.debugger_attached();
118 let local_slots = local_slots.unwrap_or_else(|| Self::fresh_local_slots(&chunk));
119 let initial_env = if debugger {
120 Some(self.env.clone())
121 } else {
122 None
123 };
124 let initial_local_slots = if debugger {
125 Some(local_slots.clone())
126 } else {
127 None
128 };
129 self.frames.push(CallFrame {
130 chunk,
131 ip: 0,
132 stack_base: self.stack.len(),
133 saved_env: self.env.clone(),
134 initial_env,
135 initial_local_slots,
136 saved_iterator_depth: self.iterators.len(),
137 fn_name: String::new(),
138 argc,
139 saved_source_dir,
140 module_functions,
141 module_state,
142 local_slots,
143 local_scope_base: self.env.scope_depth().saturating_sub(1),
144 local_scope_depth: 0,
145 });
146
147 loop {
148 if let Some(err) = self.pending_scope_interrupt().await {
149 match self.handle_error(err) {
150 Ok(None) => continue,
151 Ok(Some(val)) => return Ok(val),
152 Err(e) => return Err(e),
153 }
154 }
155
156 let frame = match self.frames.last_mut() {
157 Some(f) => f,
158 None => return Ok(self.stack.pop().unwrap_or(VmValue::Nil)),
159 };
160
161 if frame.ip >= frame.chunk.code.len() {
162 let val = self.stack.pop().unwrap_or(VmValue::Nil);
163 let val = self.run_step_post_hooks_for_current_frame(val).await?;
164 self.release_sync_guards_for_frame(self.frames.len());
165 let popped_frame = self.frames.pop().unwrap();
166 if let Some(ref dir) = popped_frame.saved_source_dir {
167 crate::stdlib::set_thread_source_dir(dir);
168 }
169 crate::step_runtime::prune_below_frame(self.frames.len());
170
171 if self.frames.is_empty() {
172 return Ok(val);
173 } else {
174 self.iterators.truncate(popped_frame.saved_iterator_depth);
175 self.env = popped_frame.saved_env;
176 self.stack.truncate(popped_frame.stack_base);
177 self.stack.push(val);
178 continue;
179 }
180 }
181
182 let op = frame.chunk.code[frame.ip];
183 frame.ip += 1;
184
185 match self.execute_op_with_scope_interrupts(op).await {
186 Ok(Some(val)) => return Ok(val),
187 Ok(None) => continue,
188 Err(VmError::Return(val)) => {
189 let val = self.run_step_post_hooks_for_current_frame(val).await?;
190 if let Some(popped_frame) = self.frames.pop() {
191 self.release_sync_guards_for_frame(self.frames.len() + 1);
192 if let Some(ref dir) = popped_frame.saved_source_dir {
193 crate::stdlib::set_thread_source_dir(dir);
194 }
195 let current_depth = self.frames.len();
196 self.exception_handlers
197 .retain(|h| h.frame_depth <= current_depth);
198 crate::step_runtime::prune_below_frame(current_depth);
199
200 if self.frames.is_empty() {
201 return Ok(val);
202 }
203 self.iterators.truncate(popped_frame.saved_iterator_depth);
204 self.env = popped_frame.saved_env;
205 self.stack.truncate(popped_frame.stack_base);
206 self.stack.push(val);
207 } else {
208 return Ok(val);
209 }
210 }
211 Err(e) => {
212 if self.error_stack_trace.is_empty() {
214 self.error_stack_trace = self.capture_stack_trace();
215 }
216 let e = match self.apply_step_error_boundary(e) {
223 StepBoundaryOutcome::Returned(val) => {
224 self.error_stack_trace.clear();
225 self.stack.push(val);
226 continue;
227 }
228 StepBoundaryOutcome::Throw(err) => err,
229 };
230 match self.handle_error(e) {
231 Ok(None) => {
232 self.error_stack_trace.clear();
233 continue;
234 }
235 Ok(Some(val)) => return Ok(val),
236 Err(e) => return Err(self.enrich_error_with_line(e)),
237 }
238 }
239 }
240 }
241 }
242
243 pub(crate) fn apply_step_error_boundary(&mut self, error: VmError) -> StepBoundaryOutcome {
249 use crate::step_runtime;
250 if !step_runtime::is_step_budget_exhausted(&error) {
251 return StepBoundaryOutcome::Throw(error);
252 }
253 let Some(step_depth) = step_runtime::active_step_frame_depth() else {
254 return StepBoundaryOutcome::Throw(error);
255 };
256 if step_depth != self.frames.len() {
261 return StepBoundaryOutcome::Throw(error);
262 }
263 let boundary = step_runtime::with_active_step(|step| step.definition.boundary())
264 .unwrap_or(step_runtime::StepErrorBoundary::Fail);
265 match boundary {
266 step_runtime::StepErrorBoundary::Continue => {
267 if let Some(popped) = self.frames.pop() {
271 self.release_sync_guards_for_frame(self.frames.len() + 1);
272 if let Some(ref dir) = popped.saved_source_dir {
273 crate::stdlib::set_thread_source_dir(dir);
274 }
275 let current_depth = self.frames.len();
276 self.exception_handlers
277 .retain(|h| h.frame_depth <= current_depth);
278 step_runtime::pop_and_record(
279 current_depth + 1,
280 "skipped",
281 Some(step_runtime_error_message(&error)),
282 );
283 if self.frames.is_empty() {
284 return StepBoundaryOutcome::Returned(VmValue::Nil);
285 }
286 self.iterators.truncate(popped.saved_iterator_depth);
287 self.env = popped.saved_env;
288 self.stack.truncate(popped.stack_base);
289 }
290 StepBoundaryOutcome::Returned(VmValue::Nil)
291 }
292 step_runtime::StepErrorBoundary::Escalate => {
293 let identity = step_runtime::with_active_step(|step| {
294 (
295 step.definition.name.clone(),
296 step.definition.function.clone(),
297 )
298 });
299 step_runtime::pop_and_record(
300 step_depth,
301 "escalated",
302 Some(step_runtime_error_message(&error)),
303 );
304 let (step_name, function) = identity.unzip();
305 StepBoundaryOutcome::Throw(step_runtime::mark_escalated(
306 error,
307 step_name.as_deref(),
308 function.as_deref(),
309 ))
310 }
311 step_runtime::StepErrorBoundary::Fail => {
312 step_runtime::pop_and_record(
313 step_depth,
314 "failed",
315 Some(step_runtime_error_message(&error)),
316 );
317 StepBoundaryOutcome::Throw(error)
318 }
319 }
320 }
321}
322
323fn next_deadline(
324 scope_deadline: Option<Instant>,
325 interrupt_handler_deadline: Option<Instant>,
326) -> (Option<Instant>, Option<DeadlineKind>) {
327 match (scope_deadline, interrupt_handler_deadline) {
328 (Some(scope), Some(interrupt)) if interrupt < scope => {
329 (Some(interrupt), Some(DeadlineKind::InterruptHandler))
330 }
331 (Some(scope), _) => (Some(scope), Some(DeadlineKind::Scope)),
332 (None, Some(interrupt)) => (Some(interrupt), Some(DeadlineKind::InterruptHandler)),
333 (None, None) => (None, None),
334 }
335}
336
337fn step_runtime_error_message(error: &VmError) -> String {
338 match error {
339 VmError::Thrown(VmValue::Dict(dict)) => dict
340 .get("message")
341 .map(|v| v.display())
342 .unwrap_or_else(|| error.to_string()),
343 _ => error.to_string(),
344 }
345}
346
347pub(crate) enum StepBoundaryOutcome {
348 Returned(VmValue),
349 Throw(VmError),
350}
351
352impl crate::vm::Vm {
353 pub(crate) async fn execute_one_cycle(&mut self) -> Result<Option<(VmValue, bool)>, VmError> {
354 if let Some(err) = self.pending_scope_interrupt().await {
355 match self.handle_error(err) {
356 Ok(None) => return Ok(None),
357 Ok(Some(val)) => return Ok(Some((val, false))),
358 Err(e) => return Err(e),
359 }
360 }
361
362 let frame = match self.frames.last_mut() {
363 Some(f) => f,
364 None => {
365 let val = self.stack.pop().unwrap_or(VmValue::Nil);
366 return Ok(Some((val, false)));
367 }
368 };
369
370 if frame.ip >= frame.chunk.code.len() {
371 let val = self.stack.pop().unwrap_or(VmValue::Nil);
372 self.release_sync_guards_for_frame(self.frames.len());
373 let popped_frame = self.frames.pop().unwrap();
374 if self.frames.is_empty() {
375 return Ok(Some((val, false)));
376 } else {
377 self.iterators.truncate(popped_frame.saved_iterator_depth);
378 self.env = popped_frame.saved_env;
379 self.stack.truncate(popped_frame.stack_base);
380 self.stack.push(val);
381 return Ok(None);
382 }
383 }
384
385 let op = frame.chunk.code[frame.ip];
386 frame.ip += 1;
387
388 match self.execute_op_with_scope_interrupts(op).await {
389 Ok(Some(val)) => Ok(Some((val, false))),
390 Ok(None) => Ok(None),
391 Err(VmError::Return(val)) => {
392 if let Some(popped_frame) = self.frames.pop() {
393 self.release_sync_guards_for_frame(self.frames.len() + 1);
394 if let Some(ref dir) = popped_frame.saved_source_dir {
395 crate::stdlib::set_thread_source_dir(dir);
396 }
397 let current_depth = self.frames.len();
398 self.exception_handlers
399 .retain(|h| h.frame_depth <= current_depth);
400 if self.frames.is_empty() {
401 return Ok(Some((val, false)));
402 }
403 self.iterators.truncate(popped_frame.saved_iterator_depth);
404 self.env = popped_frame.saved_env;
405 self.stack.truncate(popped_frame.stack_base);
406 self.stack.push(val);
407 Ok(None)
408 } else {
409 Ok(Some((val, false)))
410 }
411 }
412 Err(e) => {
413 if self.error_stack_trace.is_empty() {
414 self.error_stack_trace = self.capture_stack_trace();
415 }
416 match self.handle_error(e) {
417 Ok(None) => {
418 self.error_stack_trace.clear();
419 Ok(None)
420 }
421 Ok(Some(val)) => Ok(Some((val, false))),
422 Err(e) => Err(self.enrich_error_with_line(e)),
423 }
424 }
425 }
426 }
427
428 async fn execute_op_with_scope_interrupts(
429 &mut self,
430 op: u8,
431 ) -> Result<Option<VmValue>, VmError> {
432 enum ScopeInterruptResult {
433 Op(Result<Option<VmValue>, VmError>),
434 Deadline(DeadlineKind),
435 CancelTimedOut,
436 }
437
438 let (deadline, deadline_kind) = next_deadline(
439 self.deadlines.last().map(|(deadline, _)| *deadline),
440 self.interrupt_handler_deadline,
441 );
442 let cancel_token = self.cancel_token.clone();
443
444 if deadline.is_none() && cancel_token.is_none() {
445 return self.execute_op(op).await;
446 }
447
448 let has_deadline = deadline.is_some();
449 let cancel_requested_at_start = cancel_token
450 .as_ref()
451 .is_some_and(|token| token.load(std::sync::atomic::Ordering::SeqCst));
452 let has_cancel = cancel_token.is_some() && !cancel_requested_at_start;
453 let deadline_sleep = async move {
454 if let Some(deadline) = deadline {
455 tokio::time::sleep_until(tokio::time::Instant::from_std(deadline)).await;
456 } else {
457 std::future::pending::<()>().await;
458 }
459 };
460 let cancel_sleep = async move {
461 if let Some(token) = cancel_token {
462 while !token.load(std::sync::atomic::Ordering::SeqCst) {
463 tokio::time::sleep(Duration::from_millis(10)).await;
464 }
465 } else {
466 std::future::pending::<()>().await;
467 }
468 };
469
470 let result = {
471 let op_future = self.execute_op(op);
472 tokio::pin!(op_future);
473 tokio::select! {
474 result = &mut op_future => ScopeInterruptResult::Op(result),
475 _ = deadline_sleep, if has_deadline => {
476 ScopeInterruptResult::Deadline(deadline_kind.unwrap_or(DeadlineKind::Scope))
477 },
478 _ = cancel_sleep, if has_cancel => {
479 let grace = tokio::time::sleep(CANCEL_GRACE_ASYNC_OP);
480 tokio::pin!(grace);
481 tokio::select! {
482 result = &mut op_future => ScopeInterruptResult::Op(result),
483 _ = &mut grace => ScopeInterruptResult::CancelTimedOut,
484 }
485 }
486 }
487 };
488
489 match result {
490 ScopeInterruptResult::Op(result) => result,
491 ScopeInterruptResult::Deadline(DeadlineKind::Scope) => {
492 self.deadlines.pop();
493 self.cancel_spawned_tasks();
494 Err(Self::deadline_exceeded_error())
495 }
496 ScopeInterruptResult::Deadline(DeadlineKind::InterruptHandler) => {
497 Err(Self::interrupt_handler_timeout_error())
498 }
499 ScopeInterruptResult::CancelTimedOut => {
500 self.cancel_spawned_tasks();
501 let signal = self
502 .take_host_interrupt_signal()
503 .unwrap_or_else(|| "SIGINT".to_string());
504 if self.has_interrupt_handler_for(&signal) {
505 self.dispatch_interrupt_handlers(&signal).await?;
506 }
507 Err(Self::cancelled_error())
508 }
509 }
510 }
511
512 pub(crate) fn deadline_exceeded_error() -> VmError {
513 VmError::Thrown(VmValue::String(Rc::from("Deadline exceeded")))
514 }
515
516 pub(crate) fn cancelled_error() -> VmError {
517 VmError::Thrown(VmValue::String(Rc::from(
518 "kind:cancelled:VM cancelled by host",
519 )))
520 }
521
522 pub(crate) fn capture_stack_trace(&self) -> Vec<(String, usize, usize, Option<String>)> {
524 self.frames
525 .iter()
526 .map(|f| {
527 let idx = if f.ip > 0 { f.ip - 1 } else { 0 };
528 let line = f.chunk.lines.get(idx).copied().unwrap_or(0) as usize;
529 let col = f.chunk.columns.get(idx).copied().unwrap_or(0) as usize;
530 (f.fn_name.clone(), line, col, f.chunk.source_file.clone())
531 })
532 .collect()
533 }
534
535 pub(crate) fn enrich_error_with_line(&self, error: VmError) -> VmError {
539 let line = self
541 .error_stack_trace
542 .last()
543 .map(|(_, l, _, _)| *l)
544 .unwrap_or_else(|| self.current_line());
545 if line == 0 {
546 return error;
547 }
548 let suffix = format!(" (line {line})");
549 match error {
550 VmError::Runtime(msg) => VmError::Runtime(format!("{msg}{suffix}")),
551 VmError::TypeError(msg) => VmError::TypeError(format!("{msg}{suffix}")),
552 VmError::DivisionByZero => VmError::Runtime(format!("Division by zero{suffix}")),
553 VmError::UndefinedVariable(name) => {
554 VmError::Runtime(format!("Undefined variable: {name}{suffix}"))
555 }
556 VmError::UndefinedBuiltin(name) => {
557 VmError::Runtime(format!("Undefined builtin: {name}{suffix}"))
558 }
559 VmError::ImmutableAssignment(name) => VmError::Runtime(format!(
560 "Cannot assign to immutable binding: {name}{suffix}"
561 )),
562 VmError::StackOverflow => {
563 VmError::Runtime(format!("Stack overflow: too many nested calls{suffix}"))
564 }
565 other => other,
571 }
572 }
573}