1use std::rc::Rc;
2use std::time::{Duration, Instant};
3
4use crate::chunk::{Chunk, ChunkRef};
5use crate::value::{ModuleFunctionRegistry, VmError, VmValue};
6
7use super::{CallFrame, LocalSlot, Vm};
8
/// How long an in-flight op is allowed to keep running after the host cancel
/// token is observed as set, before the op is abandoned as timed out.
const CANCEL_GRACE_ASYNC_OP: Duration = Duration::from_millis(250);
10
/// Identifies which of the VM's two deadline sources an op is racing against.
#[derive(Clone, Copy)]
enum DeadlineKind {
    /// The deadline taken from the top of the VM's `deadlines` stack.
    Scope,
    /// The deadline stored in the VM's `interrupt_handler_deadline`.
    InterruptHandler,
}
16
17impl Vm {
18 pub async fn execute(&mut self, chunk: &Chunk) -> Result<VmValue, VmError> {
20 let span_id = crate::tracing::span_start(crate::tracing::SpanKind::Pipeline, "main".into());
21 let result = self.run_chunk(chunk).await;
22 crate::tracing::span_end(span_id);
23 result
24 }
25
26 pub(crate) fn handle_error(&mut self, error: VmError) -> Result<Option<VmValue>, VmError> {
28 let thrown_value = match &error {
29 VmError::Thrown(v) => v.clone(),
30 other => VmValue::String(Rc::from(other.to_string())),
31 };
32
33 if let Some(handler) = self.exception_handlers.pop() {
34 if !handler.error_type.is_empty() {
35 let matches = match &thrown_value {
37 VmValue::EnumVariant { enum_name, .. } => {
38 enum_name.as_ref() == handler.error_type
39 }
40 _ => false,
41 };
42 if !matches {
43 return self.handle_error(error);
44 }
45 }
46
47 self.release_sync_guards_after_unwind(handler.frame_depth, handler.env_scope_depth);
48
49 while self.frames.len() > handler.frame_depth {
50 if let Some(frame) = self.frames.pop() {
51 if let Some(ref dir) = frame.saved_source_dir {
52 crate::stdlib::set_thread_source_dir(dir);
53 }
54 self.iterators.truncate(frame.saved_iterator_depth);
55 self.env = frame.saved_env;
56 }
57 }
58 crate::step_runtime::prune_below_frame(self.frames.len());
59
60 while self
62 .deadlines
63 .last()
64 .is_some_and(|d| d.1 > handler.frame_depth)
65 {
66 self.deadlines.pop();
67 }
68
69 self.env.truncate_scopes(handler.env_scope_depth);
70
71 self.stack.truncate(handler.stack_depth);
72 self.stack.push(thrown_value);
73
74 if let Some(frame) = self.frames.last_mut() {
75 frame.ip = handler.catch_ip;
76 }
77
78 Ok(None)
79 } else {
80 Err(error)
81 }
82 }
83
    /// Runs `chunk` through `run_chunk_entry` with `argc` of 0 and every
    /// optional argument (saved source dir, module functions, module state,
    /// local slots) left as `None`.
    pub(crate) async fn run_chunk(&mut self, chunk: &Chunk) -> Result<VmValue, VmError> {
        self.run_chunk_entry(chunk, 0, None, None, None, None).await
    }
87
88 pub(crate) async fn run_chunk_entry(
89 &mut self,
90 chunk: &Chunk,
91 argc: usize,
92 saved_source_dir: Option<std::path::PathBuf>,
93 module_functions: Option<ModuleFunctionRegistry>,
94 module_state: Option<crate::value::ModuleState>,
95 local_slots: Option<Vec<LocalSlot>>,
96 ) -> Result<VmValue, VmError> {
97 self.run_chunk_ref(
98 Rc::new(chunk.clone()),
99 argc,
100 saved_source_dir,
101 module_functions,
102 module_state,
103 local_slots,
104 )
105 .await
106 }
107
    /// Pushes a new call frame for `chunk` and drives the interpreter loop.
    ///
    /// The frame starts at `ip` 0 with an unnamed function, records the current
    /// stack height, env and iterator depth so they can be restored when the
    /// frame is popped, and uses `local_slots` if given or
    /// `Self::fresh_local_slots(&chunk)` otherwise.
    ///
    /// The loop returns when the frame stack becomes empty, when an op yields a
    /// final value, or when an error is not caught by any exception handler.
    /// Uncaught op errors are passed through `enrich_error_with_line` before
    /// being returned.
    pub(crate) async fn run_chunk_ref(
        &mut self,
        chunk: ChunkRef,
        argc: usize,
        saved_source_dir: Option<std::path::PathBuf>,
        module_functions: Option<ModuleFunctionRegistry>,
        module_state: Option<crate::value::ModuleState>,
        local_slots: Option<Vec<LocalSlot>>,
    ) -> Result<VmValue, VmError> {
        let initial_env = self.env.clone();
        let local_slots = local_slots.unwrap_or_else(|| Self::fresh_local_slots(&chunk));
        let initial_local_slots = local_slots.clone();
        self.frames.push(CallFrame {
            chunk,
            ip: 0,
            stack_base: self.stack.len(),
            saved_env: self.env.clone(),
            initial_env: Some(initial_env),
            initial_local_slots: Some(initial_local_slots),
            saved_iterator_depth: self.iterators.len(),
            fn_name: String::new(),
            argc,
            saved_source_dir,
            module_functions,
            module_state,
            local_slots,
            local_scope_base: self.env.scope_depth().saturating_sub(1),
            local_scope_depth: 0,
        });

        loop {
            // A pending scope interrupt is treated like any other error: give
            // the exception handlers a chance before executing the next op.
            if let Some(err) = self.pending_scope_interrupt().await {
                match self.handle_error(err) {
                    Ok(None) => continue,
                    Ok(Some(val)) => return Ok(val),
                    Err(e) => return Err(e),
                }
            }

            // No frames left: the result is whatever is on top of the stack.
            let frame = match self.frames.last_mut() {
                Some(f) => f,
                None => return Ok(self.stack.pop().unwrap_or(VmValue::Nil)),
            };

            // Running off the end of a chunk is an implicit return of the
            // stack top (or Nil when the stack is empty).
            if frame.ip >= frame.chunk.code.len() {
                let val = self.stack.pop().unwrap_or(VmValue::Nil);
                let val = self.run_step_post_hooks_for_current_frame(val).await?;
                self.release_sync_guards_for_frame(self.frames.len());
                let popped_frame = self.frames.pop().unwrap();
                if let Some(ref dir) = popped_frame.saved_source_dir {
                    crate::stdlib::set_thread_source_dir(dir);
                }
                crate::step_runtime::prune_below_frame(self.frames.len());

                if self.frames.is_empty() {
                    return Ok(val);
                } else {
                    // Restore the caller's state and hand it the return value.
                    self.iterators.truncate(popped_frame.saved_iterator_depth);
                    self.env = popped_frame.saved_env;
                    self.stack.truncate(popped_frame.stack_base);
                    self.stack.push(val);
                    continue;
                }
            }

            // Fetch the next op and advance past it before executing.
            let op = frame.chunk.code[frame.ip];
            frame.ip += 1;

            match self.execute_op_with_scope_interrupts(op).await {
                Ok(Some(val)) => return Ok(val),
                Ok(None) => continue,
                // `VmError::Return` carries an explicit return value rather
                // than a failure; unwind exactly one frame.
                Err(VmError::Return(val)) => {
                    let val = self.run_step_post_hooks_for_current_frame(val).await?;
                    if let Some(popped_frame) = self.frames.pop() {
                        self.release_sync_guards_for_frame(self.frames.len() + 1);
                        if let Some(ref dir) = popped_frame.saved_source_dir {
                            crate::stdlib::set_thread_source_dir(dir);
                        }
                        // Drop handlers that were registered by the frame
                        // that just returned.
                        let current_depth = self.frames.len();
                        self.exception_handlers
                            .retain(|h| h.frame_depth <= current_depth);
                        crate::step_runtime::prune_below_frame(current_depth);

                        if self.frames.is_empty() {
                            return Ok(val);
                        }
                        self.iterators.truncate(popped_frame.saved_iterator_depth);
                        self.env = popped_frame.saved_env;
                        self.stack.truncate(popped_frame.stack_base);
                        self.stack.push(val);
                    } else {
                        return Ok(val);
                    }
                }
                Err(e) => {
                    // Capture the trace only once per error so the innermost
                    // location is kept while the error propagates.
                    if self.error_stack_trace.is_empty() {
                        self.error_stack_trace = self.capture_stack_trace();
                    }
                    // The step error boundary may turn the error into a value
                    // to continue with, or hand back an error to throw.
                    let e = match self.apply_step_error_boundary(e) {
                        StepBoundaryOutcome::Returned(val) => {
                            self.error_stack_trace.clear();
                            self.stack.push(val);
                            continue;
                        }
                        StepBoundaryOutcome::Throw(err) => err,
                    };
                    match self.handle_error(e) {
                        Ok(None) => {
                            self.error_stack_trace.clear();
                            continue;
                        }
                        Ok(Some(val)) => return Ok(val),
                        Err(e) => return Err(self.enrich_error_with_line(e)),
                    }
                }
            }
        }
    }
233
234 pub(crate) fn apply_step_error_boundary(&mut self, error: VmError) -> StepBoundaryOutcome {
240 use crate::step_runtime;
241 if !step_runtime::is_step_budget_exhausted(&error) {
242 return StepBoundaryOutcome::Throw(error);
243 }
244 let Some(step_depth) = step_runtime::active_step_frame_depth() else {
245 return StepBoundaryOutcome::Throw(error);
246 };
247 if step_depth != self.frames.len() {
252 return StepBoundaryOutcome::Throw(error);
253 }
254 let boundary = step_runtime::with_active_step(|step| step.definition.boundary())
255 .unwrap_or(step_runtime::StepErrorBoundary::Fail);
256 match boundary {
257 step_runtime::StepErrorBoundary::Continue => {
258 if let Some(popped) = self.frames.pop() {
262 self.release_sync_guards_for_frame(self.frames.len() + 1);
263 if let Some(ref dir) = popped.saved_source_dir {
264 crate::stdlib::set_thread_source_dir(dir);
265 }
266 let current_depth = self.frames.len();
267 self.exception_handlers
268 .retain(|h| h.frame_depth <= current_depth);
269 step_runtime::pop_and_record(
270 current_depth + 1,
271 "skipped",
272 Some(step_runtime_error_message(&error)),
273 );
274 if self.frames.is_empty() {
275 return StepBoundaryOutcome::Returned(VmValue::Nil);
276 }
277 self.iterators.truncate(popped.saved_iterator_depth);
278 self.env = popped.saved_env;
279 self.stack.truncate(popped.stack_base);
280 }
281 StepBoundaryOutcome::Returned(VmValue::Nil)
282 }
283 step_runtime::StepErrorBoundary::Escalate => {
284 let identity = step_runtime::with_active_step(|step| {
285 (
286 step.definition.name.clone(),
287 step.definition.function.clone(),
288 )
289 });
290 step_runtime::pop_and_record(
291 step_depth,
292 "escalated",
293 Some(step_runtime_error_message(&error)),
294 );
295 let (step_name, function) = identity.unzip();
296 StepBoundaryOutcome::Throw(step_runtime::mark_escalated(
297 error,
298 step_name.as_deref(),
299 function.as_deref(),
300 ))
301 }
302 step_runtime::StepErrorBoundary::Fail => {
303 step_runtime::pop_and_record(
304 step_depth,
305 "failed",
306 Some(step_runtime_error_message(&error)),
307 );
308 StepBoundaryOutcome::Throw(error)
309 }
310 }
311 }
312}
313
314fn next_deadline(
315 scope_deadline: Option<Instant>,
316 interrupt_handler_deadline: Option<Instant>,
317) -> (Option<Instant>, Option<DeadlineKind>) {
318 match (scope_deadline, interrupt_handler_deadline) {
319 (Some(scope), Some(interrupt)) if interrupt < scope => {
320 (Some(interrupt), Some(DeadlineKind::InterruptHandler))
321 }
322 (Some(scope), _) => (Some(scope), Some(DeadlineKind::Scope)),
323 (None, Some(interrupt)) => (Some(interrupt), Some(DeadlineKind::InterruptHandler)),
324 (None, None) => (None, None),
325 }
326}
327
328fn step_runtime_error_message(error: &VmError) -> String {
329 match error {
330 VmError::Thrown(VmValue::Dict(dict)) => dict
331 .get("message")
332 .map(|v| v.display())
333 .unwrap_or_else(|| error.to_string()),
334 _ => error.to_string(),
335 }
336}
337
/// Result of `Vm::apply_step_error_boundary`.
pub(crate) enum StepBoundaryOutcome {
    /// The error was absorbed; execution continues with this value.
    Returned(VmValue),
    /// The error (possibly rewritten) must still be thrown.
    Throw(VmError),
}
342
343impl crate::vm::Vm {
344 pub(crate) async fn execute_one_cycle(&mut self) -> Result<Option<(VmValue, bool)>, VmError> {
345 if let Some(err) = self.pending_scope_interrupt().await {
346 match self.handle_error(err) {
347 Ok(None) => return Ok(None),
348 Ok(Some(val)) => return Ok(Some((val, false))),
349 Err(e) => return Err(e),
350 }
351 }
352
353 let frame = match self.frames.last_mut() {
354 Some(f) => f,
355 None => {
356 let val = self.stack.pop().unwrap_or(VmValue::Nil);
357 return Ok(Some((val, false)));
358 }
359 };
360
361 if frame.ip >= frame.chunk.code.len() {
362 let val = self.stack.pop().unwrap_or(VmValue::Nil);
363 self.release_sync_guards_for_frame(self.frames.len());
364 let popped_frame = self.frames.pop().unwrap();
365 if self.frames.is_empty() {
366 return Ok(Some((val, false)));
367 } else {
368 self.iterators.truncate(popped_frame.saved_iterator_depth);
369 self.env = popped_frame.saved_env;
370 self.stack.truncate(popped_frame.stack_base);
371 self.stack.push(val);
372 return Ok(None);
373 }
374 }
375
376 let op = frame.chunk.code[frame.ip];
377 frame.ip += 1;
378
379 match self.execute_op_with_scope_interrupts(op).await {
380 Ok(Some(val)) => Ok(Some((val, false))),
381 Ok(None) => Ok(None),
382 Err(VmError::Return(val)) => {
383 if let Some(popped_frame) = self.frames.pop() {
384 self.release_sync_guards_for_frame(self.frames.len() + 1);
385 if let Some(ref dir) = popped_frame.saved_source_dir {
386 crate::stdlib::set_thread_source_dir(dir);
387 }
388 let current_depth = self.frames.len();
389 self.exception_handlers
390 .retain(|h| h.frame_depth <= current_depth);
391 if self.frames.is_empty() {
392 return Ok(Some((val, false)));
393 }
394 self.iterators.truncate(popped_frame.saved_iterator_depth);
395 self.env = popped_frame.saved_env;
396 self.stack.truncate(popped_frame.stack_base);
397 self.stack.push(val);
398 Ok(None)
399 } else {
400 Ok(Some((val, false)))
401 }
402 }
403 Err(e) => {
404 if self.error_stack_trace.is_empty() {
405 self.error_stack_trace = self.capture_stack_trace();
406 }
407 match self.handle_error(e) {
408 Ok(None) => {
409 self.error_stack_trace.clear();
410 Ok(None)
411 }
412 Ok(Some(val)) => Ok(Some((val, false))),
413 Err(e) => Err(self.enrich_error_with_line(e)),
414 }
415 }
416 }
417 }
418
    /// Executes `op` while racing it against the nearest deadline and the host
    /// cancel token.
    ///
    /// When neither a deadline nor a cancel token is present the op is executed
    /// directly. Otherwise the op future competes with:
    ///
    /// * a sleep until the earlier of the top scope deadline and the
    ///   interrupt-handler deadline (see `next_deadline`), and
    /// * a poll of the cancel token every 10 ms. This branch is disabled when
    ///   the token was already set at entry.
    ///
    /// Once cancellation is observed the op gets `CANCEL_GRACE_ASYNC_OP` to
    /// finish; if it does, its own result is returned.
    ///
    /// # Errors
    ///
    /// Besides whatever the op itself returns:
    /// * scope deadline hit — the top deadline is popped, spawned tasks are
    ///   cancelled, and `deadline_exceeded_error()` is returned;
    /// * interrupt-handler deadline hit — `interrupt_handler_timeout_error()`;
    /// * cancel grace expired — spawned tasks are cancelled, interrupt handlers
    ///   for the host signal (default `"SIGINT"`) are dispatched if any exist,
    ///   and `cancelled_error()` is returned. An error from the handler
    ///   dispatch is propagated instead.
    async fn execute_op_with_scope_interrupts(
        &mut self,
        op: u8,
    ) -> Result<Option<VmValue>, VmError> {
        // Which of the racing futures finished first.
        enum ScopeInterruptResult {
            Op(Result<Option<VmValue>, VmError>),
            Deadline(DeadlineKind),
            CancelTimedOut,
        }

        let (deadline, deadline_kind) = next_deadline(
            self.deadlines.last().map(|(deadline, _)| *deadline),
            self.interrupt_handler_deadline,
        );
        let cancel_token = self.cancel_token.clone();

        // Fast path: nothing to race against.
        if deadline.is_none() && cancel_token.is_none() {
            return self.execute_op(op).await;
        }

        let has_deadline = deadline.is_some();
        let cancel_requested_at_start = cancel_token
            .as_ref()
            .is_some_and(|token| token.load(std::sync::atomic::Ordering::SeqCst));
        // A token that is already set does not arm the cancel branch below.
        let has_cancel = cancel_token.is_some() && !cancel_requested_at_start;
        // Resolves at the deadline, or never when there is none.
        let deadline_sleep = async move {
            if let Some(deadline) = deadline {
                tokio::time::sleep_until(tokio::time::Instant::from_std(deadline)).await;
            } else {
                std::future::pending::<()>().await;
            }
        };
        // Resolves once the token is observed as set, or never without a token.
        let cancel_sleep = async move {
            if let Some(token) = cancel_token {
                while !token.load(std::sync::atomic::Ordering::SeqCst) {
                    tokio::time::sleep(Duration::from_millis(10)).await;
                }
            } else {
                std::future::pending::<()>().await;
            }
        };

        // The op future mutably borrows `self`; this block ends that borrow
        // before the result is handled.
        let result = {
            let op_future = self.execute_op(op);
            tokio::pin!(op_future);
            tokio::select! {
                result = &mut op_future => ScopeInterruptResult::Op(result),
                _ = deadline_sleep, if has_deadline => {
                    ScopeInterruptResult::Deadline(deadline_kind.unwrap_or(DeadlineKind::Scope))
                },
                _ = cancel_sleep, if has_cancel => {
                    // Cancellation seen: keep polling the same op for the
                    // grace period before giving up on it.
                    let grace = tokio::time::sleep(CANCEL_GRACE_ASYNC_OP);
                    tokio::pin!(grace);
                    tokio::select! {
                        result = &mut op_future => ScopeInterruptResult::Op(result),
                        _ = &mut grace => ScopeInterruptResult::CancelTimedOut,
                    }
                }
            }
        };

        match result {
            ScopeInterruptResult::Op(result) => result,
            ScopeInterruptResult::Deadline(DeadlineKind::Scope) => {
                // Remove the deadline that fired before reporting it.
                self.deadlines.pop();
                self.cancel_spawned_tasks();
                Err(Self::deadline_exceeded_error())
            }
            ScopeInterruptResult::Deadline(DeadlineKind::InterruptHandler) => {
                Err(Self::interrupt_handler_timeout_error())
            }
            ScopeInterruptResult::CancelTimedOut => {
                self.cancel_spawned_tasks();
                let signal = self
                    .take_host_interrupt_signal()
                    .unwrap_or_else(|| "SIGINT".to_string());
                if self.has_interrupt_handler_for(&signal) {
                    self.dispatch_interrupt_handlers(&signal).await?;
                }
                Err(Self::cancelled_error())
            }
        }
    }
502
503 pub(crate) fn deadline_exceeded_error() -> VmError {
504 VmError::Thrown(VmValue::String(Rc::from("Deadline exceeded")))
505 }
506
507 pub(crate) fn cancelled_error() -> VmError {
508 VmError::Thrown(VmValue::String(Rc::from(
509 "kind:cancelled:VM cancelled by host",
510 )))
511 }
512
513 pub(crate) fn capture_stack_trace(&self) -> Vec<(String, usize, usize, Option<String>)> {
515 self.frames
516 .iter()
517 .map(|f| {
518 let idx = if f.ip > 0 { f.ip - 1 } else { 0 };
519 let line = f.chunk.lines.get(idx).copied().unwrap_or(0) as usize;
520 let col = f.chunk.columns.get(idx).copied().unwrap_or(0) as usize;
521 (f.fn_name.clone(), line, col, f.chunk.source_file.clone())
522 })
523 .collect()
524 }
525
526 pub(crate) fn enrich_error_with_line(&self, error: VmError) -> VmError {
530 let line = self
532 .error_stack_trace
533 .last()
534 .map(|(_, l, _, _)| *l)
535 .unwrap_or_else(|| self.current_line());
536 if line == 0 {
537 return error;
538 }
539 let suffix = format!(" (line {line})");
540 match error {
541 VmError::Runtime(msg) => VmError::Runtime(format!("{msg}{suffix}")),
542 VmError::TypeError(msg) => VmError::TypeError(format!("{msg}{suffix}")),
543 VmError::DivisionByZero => VmError::Runtime(format!("Division by zero{suffix}")),
544 VmError::UndefinedVariable(name) => {
545 VmError::Runtime(format!("Undefined variable: {name}{suffix}"))
546 }
547 VmError::UndefinedBuiltin(name) => {
548 VmError::Runtime(format!("Undefined builtin: {name}{suffix}"))
549 }
550 VmError::ImmutableAssignment(name) => VmError::Runtime(format!(
551 "Cannot assign to immutable binding: {name}{suffix}"
552 )),
553 VmError::StackOverflow => {
554 VmError::Runtime(format!("Stack overflow: too many nested calls{suffix}"))
555 }
556 other => other,
562 }
563 }
564}