1use std::rc::Rc;
2use std::time::{Duration, Instant};
3
4use crate::chunk::{Chunk, ChunkRef};
5use crate::value::{ModuleFunctionRegistry, VmError, VmValue};
6
7use super::{CallFrame, LocalSlot, Vm};
8
9const CANCEL_GRACE_INSTRUCTIONS: usize = 1024;
10const CANCEL_GRACE_ASYNC_OP: Duration = Duration::from_millis(250);
11
12impl Vm {
13 pub async fn execute(&mut self, chunk: &Chunk) -> Result<VmValue, VmError> {
15 let span_id = crate::tracing::span_start(crate::tracing::SpanKind::Pipeline, "main".into());
16 let result = self.run_chunk(chunk).await;
17 crate::tracing::span_end(span_id);
18 result
19 }
20
21 pub(crate) fn handle_error(&mut self, error: VmError) -> Result<Option<VmValue>, VmError> {
23 let thrown_value = match &error {
24 VmError::Thrown(v) => v.clone(),
25 other => VmValue::String(Rc::from(other.to_string())),
26 };
27
28 if let Some(handler) = self.exception_handlers.pop() {
29 if !handler.error_type.is_empty() {
30 let matches = match &thrown_value {
32 VmValue::EnumVariant { enum_name, .. } => {
33 enum_name.as_ref() == handler.error_type
34 }
35 _ => false,
36 };
37 if !matches {
38 return self.handle_error(error);
39 }
40 }
41
42 self.release_sync_guards_after_unwind(handler.frame_depth, handler.env_scope_depth);
43
44 while self.frames.len() > handler.frame_depth {
45 if let Some(frame) = self.frames.pop() {
46 if let Some(ref dir) = frame.saved_source_dir {
47 crate::stdlib::set_thread_source_dir(dir);
48 }
49 self.iterators.truncate(frame.saved_iterator_depth);
50 self.env = frame.saved_env;
51 }
52 }
53 crate::step_runtime::prune_below_frame(self.frames.len());
54
55 while self
57 .deadlines
58 .last()
59 .is_some_and(|d| d.1 > handler.frame_depth)
60 {
61 self.deadlines.pop();
62 }
63
64 self.env.truncate_scopes(handler.env_scope_depth);
65
66 self.stack.truncate(handler.stack_depth);
67 self.stack.push(thrown_value);
68
69 if let Some(frame) = self.frames.last_mut() {
70 frame.ip = handler.catch_ip;
71 }
72
73 Ok(None)
74 } else {
75 Err(error)
76 }
77 }
78
79 pub(crate) async fn run_chunk(&mut self, chunk: &Chunk) -> Result<VmValue, VmError> {
80 self.run_chunk_entry(chunk, 0, None, None, None, None).await
81 }
82
83 pub(crate) async fn run_chunk_entry(
84 &mut self,
85 chunk: &Chunk,
86 argc: usize,
87 saved_source_dir: Option<std::path::PathBuf>,
88 module_functions: Option<ModuleFunctionRegistry>,
89 module_state: Option<crate::value::ModuleState>,
90 local_slots: Option<Vec<LocalSlot>>,
91 ) -> Result<VmValue, VmError> {
92 self.run_chunk_ref(
93 Rc::new(chunk.clone()),
94 argc,
95 saved_source_dir,
96 module_functions,
97 module_state,
98 local_slots,
99 )
100 .await
101 }
102
103 pub(crate) async fn run_chunk_ref(
104 &mut self,
105 chunk: ChunkRef,
106 argc: usize,
107 saved_source_dir: Option<std::path::PathBuf>,
108 module_functions: Option<ModuleFunctionRegistry>,
109 module_state: Option<crate::value::ModuleState>,
110 local_slots: Option<Vec<LocalSlot>>,
111 ) -> Result<VmValue, VmError> {
112 let initial_env = self.env.clone();
113 let local_slots = local_slots.unwrap_or_else(|| Self::fresh_local_slots(&chunk));
114 let initial_local_slots = local_slots.clone();
115 self.frames.push(CallFrame {
116 chunk,
117 ip: 0,
118 stack_base: self.stack.len(),
119 saved_env: self.env.clone(),
120 initial_env: Some(initial_env),
121 initial_local_slots: Some(initial_local_slots),
122 saved_iterator_depth: self.iterators.len(),
123 fn_name: String::new(),
124 argc,
125 saved_source_dir,
126 module_functions,
127 module_state,
128 local_slots,
129 local_scope_base: self.env.scope_depth().saturating_sub(1),
130 local_scope_depth: 0,
131 });
132
133 loop {
134 if let Some(err) = self.pending_scope_interrupt() {
135 match self.handle_error(err) {
136 Ok(None) => continue,
137 Ok(Some(val)) => return Ok(val),
138 Err(e) => return Err(e),
139 }
140 }
141
142 let frame = match self.frames.last_mut() {
143 Some(f) => f,
144 None => return Ok(self.stack.pop().unwrap_or(VmValue::Nil)),
145 };
146
147 if frame.ip >= frame.chunk.code.len() {
148 let val = self.stack.pop().unwrap_or(VmValue::Nil);
149 let val = self.run_step_post_hooks_for_current_frame(val).await?;
150 self.release_sync_guards_for_frame(self.frames.len());
151 let popped_frame = self.frames.pop().unwrap();
152 if let Some(ref dir) = popped_frame.saved_source_dir {
153 crate::stdlib::set_thread_source_dir(dir);
154 }
155 crate::step_runtime::prune_below_frame(self.frames.len());
156
157 if self.frames.is_empty() {
158 return Ok(val);
159 } else {
160 self.iterators.truncate(popped_frame.saved_iterator_depth);
161 self.env = popped_frame.saved_env;
162 self.stack.truncate(popped_frame.stack_base);
163 self.stack.push(val);
164 continue;
165 }
166 }
167
168 let op = frame.chunk.code[frame.ip];
169 frame.ip += 1;
170
171 match self.execute_op_with_scope_interrupts(op).await {
172 Ok(Some(val)) => return Ok(val),
173 Ok(None) => continue,
174 Err(VmError::Return(val)) => {
175 let val = self.run_step_post_hooks_for_current_frame(val).await?;
176 if let Some(popped_frame) = self.frames.pop() {
177 self.release_sync_guards_for_frame(self.frames.len() + 1);
178 if let Some(ref dir) = popped_frame.saved_source_dir {
179 crate::stdlib::set_thread_source_dir(dir);
180 }
181 let current_depth = self.frames.len();
182 self.exception_handlers
183 .retain(|h| h.frame_depth <= current_depth);
184 crate::step_runtime::prune_below_frame(current_depth);
185
186 if self.frames.is_empty() {
187 return Ok(val);
188 }
189 self.iterators.truncate(popped_frame.saved_iterator_depth);
190 self.env = popped_frame.saved_env;
191 self.stack.truncate(popped_frame.stack_base);
192 self.stack.push(val);
193 } else {
194 return Ok(val);
195 }
196 }
197 Err(e) => {
198 if self.error_stack_trace.is_empty() {
200 self.error_stack_trace = self.capture_stack_trace();
201 }
202 let e = match self.apply_step_error_boundary(e) {
209 StepBoundaryOutcome::Returned(val) => {
210 self.error_stack_trace.clear();
211 self.stack.push(val);
212 continue;
213 }
214 StepBoundaryOutcome::Throw(err) => err,
215 };
216 match self.handle_error(e) {
217 Ok(None) => {
218 self.error_stack_trace.clear();
219 continue;
220 }
221 Ok(Some(val)) => return Ok(val),
222 Err(e) => return Err(self.enrich_error_with_line(e)),
223 }
224 }
225 }
226 }
227 }
228
229 pub(crate) fn apply_step_error_boundary(&mut self, error: VmError) -> StepBoundaryOutcome {
235 use crate::step_runtime;
236 if !step_runtime::is_step_budget_exhausted(&error) {
237 return StepBoundaryOutcome::Throw(error);
238 }
239 let Some(step_depth) = step_runtime::active_step_frame_depth() else {
240 return StepBoundaryOutcome::Throw(error);
241 };
242 if step_depth != self.frames.len() {
247 return StepBoundaryOutcome::Throw(error);
248 }
249 let boundary = step_runtime::with_active_step(|step| step.definition.boundary())
250 .unwrap_or(step_runtime::StepErrorBoundary::Fail);
251 match boundary {
252 step_runtime::StepErrorBoundary::Continue => {
253 if let Some(popped) = self.frames.pop() {
257 self.release_sync_guards_for_frame(self.frames.len() + 1);
258 if let Some(ref dir) = popped.saved_source_dir {
259 crate::stdlib::set_thread_source_dir(dir);
260 }
261 let current_depth = self.frames.len();
262 self.exception_handlers
263 .retain(|h| h.frame_depth <= current_depth);
264 step_runtime::pop_and_record(
265 current_depth + 1,
266 "skipped",
267 Some(step_runtime_error_message(&error)),
268 );
269 if self.frames.is_empty() {
270 return StepBoundaryOutcome::Returned(VmValue::Nil);
271 }
272 self.iterators.truncate(popped.saved_iterator_depth);
273 self.env = popped.saved_env;
274 self.stack.truncate(popped.stack_base);
275 }
276 StepBoundaryOutcome::Returned(VmValue::Nil)
277 }
278 step_runtime::StepErrorBoundary::Escalate => {
279 let identity = step_runtime::with_active_step(|step| {
280 (
281 step.definition.name.clone(),
282 step.definition.function.clone(),
283 )
284 });
285 step_runtime::pop_and_record(
286 step_depth,
287 "escalated",
288 Some(step_runtime_error_message(&error)),
289 );
290 let (step_name, function) = identity.unzip();
291 StepBoundaryOutcome::Throw(step_runtime::mark_escalated(
292 error,
293 step_name.as_deref(),
294 function.as_deref(),
295 ))
296 }
297 step_runtime::StepErrorBoundary::Fail => {
298 step_runtime::pop_and_record(
299 step_depth,
300 "failed",
301 Some(step_runtime_error_message(&error)),
302 );
303 StepBoundaryOutcome::Throw(error)
304 }
305 }
306 }
307}
308
309fn step_runtime_error_message(error: &VmError) -> String {
310 match error {
311 VmError::Thrown(VmValue::Dict(dict)) => dict
312 .get("message")
313 .map(|v| v.display())
314 .unwrap_or_else(|| error.to_string()),
315 _ => error.to_string(),
316 }
317}
318
319pub(crate) enum StepBoundaryOutcome {
320 Returned(VmValue),
321 Throw(VmError),
322}
323
324impl crate::vm::Vm {
325 pub(crate) async fn execute_one_cycle(&mut self) -> Result<Option<(VmValue, bool)>, VmError> {
326 if let Some(err) = self.pending_scope_interrupt() {
327 match self.handle_error(err) {
328 Ok(None) => return Ok(None),
329 Ok(Some(val)) => return Ok(Some((val, false))),
330 Err(e) => return Err(e),
331 }
332 }
333
334 let frame = match self.frames.last_mut() {
335 Some(f) => f,
336 None => {
337 let val = self.stack.pop().unwrap_or(VmValue::Nil);
338 return Ok(Some((val, false)));
339 }
340 };
341
342 if frame.ip >= frame.chunk.code.len() {
343 let val = self.stack.pop().unwrap_or(VmValue::Nil);
344 self.release_sync_guards_for_frame(self.frames.len());
345 let popped_frame = self.frames.pop().unwrap();
346 if self.frames.is_empty() {
347 return Ok(Some((val, false)));
348 } else {
349 self.iterators.truncate(popped_frame.saved_iterator_depth);
350 self.env = popped_frame.saved_env;
351 self.stack.truncate(popped_frame.stack_base);
352 self.stack.push(val);
353 return Ok(None);
354 }
355 }
356
357 let op = frame.chunk.code[frame.ip];
358 frame.ip += 1;
359
360 match self.execute_op_with_scope_interrupts(op).await {
361 Ok(Some(val)) => Ok(Some((val, false))),
362 Ok(None) => Ok(None),
363 Err(VmError::Return(val)) => {
364 if let Some(popped_frame) = self.frames.pop() {
365 self.release_sync_guards_for_frame(self.frames.len() + 1);
366 if let Some(ref dir) = popped_frame.saved_source_dir {
367 crate::stdlib::set_thread_source_dir(dir);
368 }
369 let current_depth = self.frames.len();
370 self.exception_handlers
371 .retain(|h| h.frame_depth <= current_depth);
372 if self.frames.is_empty() {
373 return Ok(Some((val, false)));
374 }
375 self.iterators.truncate(popped_frame.saved_iterator_depth);
376 self.env = popped_frame.saved_env;
377 self.stack.truncate(popped_frame.stack_base);
378 self.stack.push(val);
379 Ok(None)
380 } else {
381 Ok(Some((val, false)))
382 }
383 }
384 Err(e) => {
385 if self.error_stack_trace.is_empty() {
386 self.error_stack_trace = self.capture_stack_trace();
387 }
388 match self.handle_error(e) {
389 Ok(None) => {
390 self.error_stack_trace.clear();
391 Ok(None)
392 }
393 Ok(Some(val)) => Ok(Some((val, false))),
394 Err(e) => Err(self.enrich_error_with_line(e)),
395 }
396 }
397 }
398 }
399
400 fn pending_scope_interrupt(&mut self) -> Option<VmError> {
401 if self.is_cancel_requested() {
402 match self.cancel_grace_instructions_remaining.as_mut() {
403 Some(0) => {
404 self.cancel_spawned_tasks();
405 return Some(Self::cancelled_error());
406 }
407 Some(remaining) => *remaining -= 1,
408 None => self.cancel_grace_instructions_remaining = Some(CANCEL_GRACE_INSTRUCTIONS),
409 }
410 } else {
411 self.cancel_grace_instructions_remaining = None;
412 }
413 if let Some(&(deadline, _)) = self.deadlines.last() {
414 if Instant::now() >= deadline {
415 self.deadlines.pop();
416 return Some(Self::deadline_exceeded_error());
417 }
418 }
419 None
420 }
421
422 async fn execute_op_with_scope_interrupts(
423 &mut self,
424 op: u8,
425 ) -> Result<Option<VmValue>, VmError> {
426 enum ScopeInterruptResult {
427 Op(Result<Option<VmValue>, VmError>),
428 Deadline,
429 CancelTimedOut,
430 }
431
432 let deadline = self.deadlines.last().map(|(deadline, _)| *deadline);
433 let cancel_token = self.cancel_token.clone();
434
435 if deadline.is_none() && cancel_token.is_none() {
436 return self.execute_op(op).await;
437 }
438
439 let has_deadline = deadline.is_some();
440 let cancel_requested_at_start = cancel_token
441 .as_ref()
442 .is_some_and(|token| token.load(std::sync::atomic::Ordering::SeqCst));
443 let has_cancel = cancel_token.is_some() && !cancel_requested_at_start;
444 let deadline_sleep = async move {
445 if let Some(deadline) = deadline {
446 tokio::time::sleep_until(tokio::time::Instant::from_std(deadline)).await;
447 } else {
448 std::future::pending::<()>().await;
449 }
450 };
451 let cancel_sleep = async move {
452 if let Some(token) = cancel_token {
453 while !token.load(std::sync::atomic::Ordering::SeqCst) {
454 tokio::time::sleep(Duration::from_millis(10)).await;
455 }
456 } else {
457 std::future::pending::<()>().await;
458 }
459 };
460
461 let result = {
462 let op_future = self.execute_op(op);
463 tokio::pin!(op_future);
464 tokio::select! {
465 result = &mut op_future => ScopeInterruptResult::Op(result),
466 _ = deadline_sleep, if has_deadline => ScopeInterruptResult::Deadline,
467 _ = cancel_sleep, if has_cancel => {
468 let grace = tokio::time::sleep(CANCEL_GRACE_ASYNC_OP);
469 tokio::pin!(grace);
470 tokio::select! {
471 result = &mut op_future => ScopeInterruptResult::Op(result),
472 _ = &mut grace => ScopeInterruptResult::CancelTimedOut,
473 }
474 }
475 }
476 };
477
478 match result {
479 ScopeInterruptResult::Op(result) => result,
480 ScopeInterruptResult::Deadline => {
481 self.deadlines.pop();
482 self.cancel_spawned_tasks();
483 Err(Self::deadline_exceeded_error())
484 }
485 ScopeInterruptResult::CancelTimedOut => {
486 self.cancel_spawned_tasks();
487 Err(Self::cancelled_error())
488 }
489 }
490 }
491
492 pub(crate) fn deadline_exceeded_error() -> VmError {
493 VmError::Thrown(VmValue::String(Rc::from("Deadline exceeded")))
494 }
495
496 pub(crate) fn cancelled_error() -> VmError {
497 VmError::Thrown(VmValue::String(Rc::from(
498 "kind:cancelled:VM cancelled by host",
499 )))
500 }
501
502 pub(crate) fn capture_stack_trace(&self) -> Vec<(String, usize, usize, Option<String>)> {
504 self.frames
505 .iter()
506 .map(|f| {
507 let idx = if f.ip > 0 { f.ip - 1 } else { 0 };
508 let line = f.chunk.lines.get(idx).copied().unwrap_or(0) as usize;
509 let col = f.chunk.columns.get(idx).copied().unwrap_or(0) as usize;
510 (f.fn_name.clone(), line, col, f.chunk.source_file.clone())
511 })
512 .collect()
513 }
514
515 pub(crate) fn enrich_error_with_line(&self, error: VmError) -> VmError {
519 let line = self
521 .error_stack_trace
522 .last()
523 .map(|(_, l, _, _)| *l)
524 .unwrap_or_else(|| self.current_line());
525 if line == 0 {
526 return error;
527 }
528 let suffix = format!(" (line {line})");
529 match error {
530 VmError::Runtime(msg) => VmError::Runtime(format!("{msg}{suffix}")),
531 VmError::TypeError(msg) => VmError::TypeError(format!("{msg}{suffix}")),
532 VmError::DivisionByZero => VmError::Runtime(format!("Division by zero{suffix}")),
533 VmError::UndefinedVariable(name) => {
534 VmError::Runtime(format!("Undefined variable: {name}{suffix}"))
535 }
536 VmError::UndefinedBuiltin(name) => {
537 VmError::Runtime(format!("Undefined builtin: {name}{suffix}"))
538 }
539 VmError::ImmutableAssignment(name) => VmError::Runtime(format!(
540 "Cannot assign to immutable binding: {name}{suffix}"
541 )),
542 VmError::StackOverflow => {
543 VmError::Runtime(format!("Stack overflow: too many nested calls{suffix}"))
544 }
545 other => other,
551 }
552 }
553}