1use std::rc::Rc;
2use std::time::{Duration, Instant};
3
4use crate::chunk::{Chunk, ChunkRef};
5use crate::value::{ModuleFunctionRegistry, VmError, VmValue};
6
7use super::{CallFrame, LocalSlot, Vm};
8
/// Grace period an in-flight op is given to finish on its own after a host
/// cancellation has been observed; once it elapses the op is abandoned and the
/// cancellation is reported (see `execute_op_with_scope_interrupts`).
const CANCEL_GRACE_ASYNC_OP: Duration = Duration::from_millis(250);
10
/// Identifies which deadline source produced the instant chosen by
/// `next_deadline`.
#[derive(Clone, Copy)]
enum DeadlineKind {
    /// The deadline came from the innermost entry of the VM's `deadlines` stack.
    Scope,
    /// The deadline came from the VM's `interrupt_handler_deadline`.
    InterruptHandler,
}
16
17impl Vm {
18 pub async fn execute(&mut self, chunk: &Chunk) -> Result<VmValue, VmError> {
20 let span_id = crate::tracing::span_start(crate::tracing::SpanKind::Pipeline, "main".into());
21 let result = self.run_chunk(chunk).await;
22 let result = match result {
23 Ok(value) => self.run_pipeline_finish_lifecycle(value).await,
24 Err(error) => {
25 crate::orchestration::clear_pipeline_on_finish();
26 Err(error)
27 }
28 };
29 crate::tracing::span_end(span_id);
30 result
31 }
32
33 async fn run_pipeline_finish_lifecycle(&mut self, value: VmValue) -> Result<VmValue, VmError> {
40 use crate::orchestration::{take_pipeline_on_finish, unsettled_state_snapshot, HookEvent};
41
42 let on_finish = take_pipeline_on_finish();
43 let unsettled = unsettled_state_snapshot();
44
45 let pre_payload = serde_json::json!({
46 "event": HookEvent::PreFinish.as_str(),
47 "return_value": crate::llm::vm_value_to_json(&value),
48 "unsettled": unsettled.to_json(),
49 "has_on_finish": on_finish.is_some(),
50 });
51 self.fire_finish_lifecycle_event(HookEvent::PreFinish, &pre_payload)
52 .await?;
53
54 if !unsettled.is_empty() {
55 let payload = serde_json::json!({
56 "event": HookEvent::OnUnsettledDetected.as_str(),
57 "unsettled": unsettled.to_json(),
58 });
59 self.fire_finish_lifecycle_event(HookEvent::OnUnsettledDetected, &payload)
60 .await?;
61 }
62
63 let final_value = if let Some(closure) = on_finish {
64 let harness_value = crate::harness::Harness::real().into_vm_value();
65 self.call_closure_pub(&closure, &[harness_value, value])
66 .await?
67 } else {
68 value
69 };
70
71 let post_payload = serde_json::json!({
72 "event": HookEvent::PostFinish.as_str(),
73 "return_value": crate::llm::vm_value_to_json(&final_value),
74 "unsettled": unsettled.to_json(),
75 });
76 self.fire_finish_lifecycle_event(HookEvent::PostFinish, &post_payload)
77 .await?;
78
79 Ok(final_value)
80 }
81
82 async fn fire_finish_lifecycle_event(
90 &mut self,
91 event: crate::orchestration::HookEvent,
92 payload: &serde_json::Value,
93 ) -> Result<(), VmError> {
94 let invocations = crate::orchestration::matching_vm_lifecycle_hooks(event, payload);
95 if invocations.is_empty() {
96 return Ok(());
97 }
98 let arg = crate::stdlib::json_to_vm_value(payload);
99 for invocation in invocations {
100 let raw = self
101 .call_closure_pub(&invocation.closure, &[arg.clone()])
102 .await?;
103 let (_action, effects) = crate::orchestration::collect_hook_effects_and_action(
104 event,
105 raw,
106 crate::value::VmValue::Nil,
107 )?;
108 crate::orchestration::inject_hook_effects_into_current_session(effects)?;
109 }
110 Ok(())
111 }
112
113 pub(crate) fn handle_error(&mut self, error: VmError) -> Result<Option<VmValue>, VmError> {
115 let thrown_value = match &error {
116 VmError::Thrown(v) => v.clone(),
117 other => VmValue::String(Rc::from(other.to_string())),
118 };
119
120 if let Some(handler) = self.exception_handlers.pop() {
121 if !handler.error_type.is_empty() {
122 let matches = match &thrown_value {
124 VmValue::EnumVariant { enum_name, .. } => {
125 enum_name.as_ref() == handler.error_type
126 }
127 _ => false,
128 };
129 if !matches {
130 return self.handle_error(error);
131 }
132 }
133
134 self.release_sync_guards_after_unwind(handler.frame_depth, handler.env_scope_depth);
135
136 while self.frames.len() > handler.frame_depth {
137 if let Some(frame) = self.frames.pop() {
138 if let Some(ref dir) = frame.saved_source_dir {
139 crate::stdlib::set_thread_source_dir(dir);
140 }
141 self.iterators.truncate(frame.saved_iterator_depth);
142 self.env = frame.saved_env;
143 }
144 }
145 crate::step_runtime::prune_below_frame(self.frames.len());
146
147 while self
149 .deadlines
150 .last()
151 .is_some_and(|d| d.1 > handler.frame_depth)
152 {
153 self.deadlines.pop();
154 }
155
156 self.env.truncate_scopes(handler.env_scope_depth);
157
158 self.stack.truncate(handler.stack_depth);
159 self.stack.push(thrown_value);
160
161 if let Some(frame) = self.frames.last_mut() {
162 frame.ip = handler.catch_ip;
163 }
164
165 Ok(None)
166 } else {
167 Err(error)
168 }
169 }
170
171 pub(crate) async fn run_chunk(&mut self, chunk: &Chunk) -> Result<VmValue, VmError> {
172 self.run_chunk_entry(chunk, 0, None, None, None, None).await
173 }
174
175 pub(crate) async fn run_chunk_entry(
176 &mut self,
177 chunk: &Chunk,
178 argc: usize,
179 saved_source_dir: Option<std::path::PathBuf>,
180 module_functions: Option<ModuleFunctionRegistry>,
181 module_state: Option<crate::value::ModuleState>,
182 local_slots: Option<Vec<LocalSlot>>,
183 ) -> Result<VmValue, VmError> {
184 self.run_chunk_ref(
185 Rc::new(chunk.clone()),
186 argc,
187 saved_source_dir,
188 module_functions,
189 module_state,
190 local_slots,
191 )
192 .await
193 }
194
    /// Pushes a call frame for `chunk` and drives the interpreter loop until
    /// the frame stack empties, an op yields a final value, or an error escapes
    /// every exception handler.
    ///
    /// * `argc`, `saved_source_dir`, `module_functions` and `module_state` are
    ///   stored on the new frame as given.
    /// * `local_slots` — slots for the new frame; when `None`, fresh slots are
    ///   created from the chunk via `fresh_local_slots`.
    ///
    /// Returns the value produced by the outermost frame. Errors that no
    /// handler accepts are returned after `enrich_error_with_line`; errors from
    /// a pending scope interrupt or from step post-hooks are returned as-is.
    pub(crate) async fn run_chunk_ref(
        &mut self,
        chunk: ChunkRef,
        argc: usize,
        saved_source_dir: Option<std::path::PathBuf>,
        module_functions: Option<ModuleFunctionRegistry>,
        module_state: Option<crate::value::ModuleState>,
        local_slots: Option<Vec<LocalSlot>>,
    ) -> Result<VmValue, VmError> {
        let debugger = self.debugger_attached();
        let local_slots = local_slots.unwrap_or_else(|| Self::fresh_local_slots(&chunk));
        // Snapshots of the starting env and local slots are only recorded when
        // a debugger is attached.
        let initial_env = if debugger {
            Some(self.env.clone())
        } else {
            None
        };
        let initial_local_slots = if debugger {
            Some(local_slots.clone())
        } else {
            None
        };
        self.frames.push(CallFrame {
            chunk,
            ip: 0,
            stack_base: self.stack.len(),
            saved_env: self.env.clone(),
            initial_env,
            initial_local_slots,
            saved_iterator_depth: self.iterators.len(),
            fn_name: String::new(),
            argc,
            saved_source_dir,
            module_functions,
            module_state,
            local_slots,
            local_scope_base: self.env.scope_depth().saturating_sub(1),
            local_scope_depth: 0,
        });

        loop {
            // A pending scope interrupt is raised like any other error before
            // the next op is fetched.
            if let Some(err) = self.pending_scope_interrupt().await {
                match self.handle_error(err) {
                    Ok(None) => continue,
                    Ok(Some(val)) => return Ok(val),
                    Err(e) => return Err(e),
                }
            }

            let frame = match self.frames.last_mut() {
                Some(f) => f,
                // No frames left: the result is whatever sits on top of the stack.
                None => return Ok(self.stack.pop().unwrap_or(VmValue::Nil)),
            };

            // The frame ran past its last instruction: treat it as an implicit
            // return of the top-of-stack value (or Nil).
            if frame.ip >= frame.chunk.code.len() {
                let val = self.stack.pop().unwrap_or(VmValue::Nil);
                let val = self.run_step_post_hooks_for_current_frame(val).await?;
                self.release_sync_guards_for_frame(self.frames.len());
                let popped_frame = self.frames.pop().unwrap();
                if let Some(ref dir) = popped_frame.saved_source_dir {
                    crate::stdlib::set_thread_source_dir(dir);
                }
                crate::step_runtime::prune_below_frame(self.frames.len());

                if self.frames.is_empty() {
                    return Ok(val);
                } else {
                    // Restore the caller's state and hand it the return value.
                    self.iterators.truncate(popped_frame.saved_iterator_depth);
                    self.env = popped_frame.saved_env;
                    self.stack.truncate(popped_frame.stack_base);
                    self.stack.push(val);
                    continue;
                }
            }

            // Fetch the opcode and advance `ip` before executing it.
            let op = frame.chunk.code[frame.ip];
            frame.ip += 1;

            match self.execute_op_with_scope_interrupts(op).await {
                Ok(Some(val)) => return Ok(val),
                Ok(None) => continue,
                // `VmError::Return` carries an explicit return value and is
                // handled as a normal frame exit rather than as a failure.
                Err(VmError::Return(val)) => {
                    let val = self.run_step_post_hooks_for_current_frame(val).await?;
                    if let Some(popped_frame) = self.frames.pop() {
                        self.release_sync_guards_for_frame(self.frames.len() + 1);
                        if let Some(ref dir) = popped_frame.saved_source_dir {
                            crate::stdlib::set_thread_source_dir(dir);
                        }
                        let current_depth = self.frames.len();
                        // Discard handlers registered at frame depths deeper
                        // than the frames that remain.
                        self.exception_handlers
                            .retain(|h| h.frame_depth <= current_depth);
                        crate::step_runtime::prune_below_frame(current_depth);

                        if self.frames.is_empty() {
                            return Ok(val);
                        }
                        self.iterators.truncate(popped_frame.saved_iterator_depth);
                        self.env = popped_frame.saved_env;
                        self.stack.truncate(popped_frame.stack_base);
                        self.stack.push(val);
                    } else {
                        return Ok(val);
                    }
                }
                Err(e) => {
                    // Capture the stack trace only once, before any unwinding
                    // changes the frame stack.
                    if self.error_stack_trace.is_empty() {
                        self.error_stack_trace = self.capture_stack_trace();
                    }
                    // A step error boundary may convert the error into a
                    // return value, in which case execution simply continues.
                    let e = match self.apply_step_error_boundary(e) {
                        StepBoundaryOutcome::Returned(val) => {
                            self.error_stack_trace.clear();
                            self.stack.push(val);
                            continue;
                        }
                        StepBoundaryOutcome::Throw(err) => err,
                    };
                    match self.handle_error(e) {
                        Ok(None) => {
                            // A handler took over; the recorded trace is stale.
                            self.error_stack_trace.clear();
                            continue;
                        }
                        Ok(Some(val)) => return Ok(val),
                        Err(e) => return Err(self.enrich_error_with_line(e)),
                    }
                }
            }
        }
    }
329
330 pub(crate) fn apply_step_error_boundary(&mut self, error: VmError) -> StepBoundaryOutcome {
336 use crate::step_runtime;
337 if !step_runtime::is_step_budget_exhausted(&error) {
338 return StepBoundaryOutcome::Throw(error);
339 }
340 let Some(step_depth) = step_runtime::active_step_frame_depth() else {
341 return StepBoundaryOutcome::Throw(error);
342 };
343 if step_depth != self.frames.len() {
348 return StepBoundaryOutcome::Throw(error);
349 }
350 let boundary = step_runtime::with_active_step(|step| step.definition.boundary())
351 .unwrap_or(step_runtime::StepErrorBoundary::Fail);
352 match boundary {
353 step_runtime::StepErrorBoundary::Continue => {
354 if let Some(popped) = self.frames.pop() {
358 self.release_sync_guards_for_frame(self.frames.len() + 1);
359 if let Some(ref dir) = popped.saved_source_dir {
360 crate::stdlib::set_thread_source_dir(dir);
361 }
362 let current_depth = self.frames.len();
363 self.exception_handlers
364 .retain(|h| h.frame_depth <= current_depth);
365 step_runtime::pop_and_record(
366 current_depth + 1,
367 "skipped",
368 Some(step_runtime_error_message(&error)),
369 );
370 if self.frames.is_empty() {
371 return StepBoundaryOutcome::Returned(VmValue::Nil);
372 }
373 self.iterators.truncate(popped.saved_iterator_depth);
374 self.env = popped.saved_env;
375 self.stack.truncate(popped.stack_base);
376 }
377 StepBoundaryOutcome::Returned(VmValue::Nil)
378 }
379 step_runtime::StepErrorBoundary::Escalate => {
380 let identity = step_runtime::with_active_step(|step| {
381 (
382 step.definition.name.clone(),
383 step.definition.function.clone(),
384 )
385 });
386 step_runtime::pop_and_record(
387 step_depth,
388 "escalated",
389 Some(step_runtime_error_message(&error)),
390 );
391 let (step_name, function) = identity.unzip();
392 StepBoundaryOutcome::Throw(step_runtime::mark_escalated(
393 error,
394 step_name.as_deref(),
395 function.as_deref(),
396 ))
397 }
398 step_runtime::StepErrorBoundary::Fail => {
399 step_runtime::pop_and_record(
400 step_depth,
401 "failed",
402 Some(step_runtime_error_message(&error)),
403 );
404 StepBoundaryOutcome::Throw(error)
405 }
406 }
407 }
408}
409
410fn next_deadline(
411 scope_deadline: Option<Instant>,
412 interrupt_handler_deadline: Option<Instant>,
413) -> (Option<Instant>, Option<DeadlineKind>) {
414 match (scope_deadline, interrupt_handler_deadline) {
415 (Some(scope), Some(interrupt)) if interrupt < scope => {
416 (Some(interrupt), Some(DeadlineKind::InterruptHandler))
417 }
418 (Some(scope), _) => (Some(scope), Some(DeadlineKind::Scope)),
419 (None, Some(interrupt)) => (Some(interrupt), Some(DeadlineKind::InterruptHandler)),
420 (None, None) => (None, None),
421 }
422}
423
424fn step_runtime_error_message(error: &VmError) -> String {
425 match error {
426 VmError::Thrown(VmValue::Dict(dict)) => dict
427 .get("message")
428 .map(|v| v.display())
429 .unwrap_or_else(|| error.to_string()),
430 _ => error.to_string(),
431 }
432}
433
/// Result of `Vm::apply_step_error_boundary`.
pub(crate) enum StepBoundaryOutcome {
    /// The boundary absorbed the error; the contained value is pushed onto the
    /// stack by the interpreter loop and execution continues.
    Returned(VmValue),
    /// The error (possibly re-marked by the boundary) must still be raised
    /// through the regular exception-handling path.
    Throw(VmError),
}
438
439impl crate::vm::Vm {
440 pub(crate) async fn execute_one_cycle(&mut self) -> Result<Option<(VmValue, bool)>, VmError> {
441 if let Some(err) = self.pending_scope_interrupt().await {
442 match self.handle_error(err) {
443 Ok(None) => return Ok(None),
444 Ok(Some(val)) => return Ok(Some((val, false))),
445 Err(e) => return Err(e),
446 }
447 }
448
449 let frame = match self.frames.last_mut() {
450 Some(f) => f,
451 None => {
452 let val = self.stack.pop().unwrap_or(VmValue::Nil);
453 return Ok(Some((val, false)));
454 }
455 };
456
457 if frame.ip >= frame.chunk.code.len() {
458 let val = self.stack.pop().unwrap_or(VmValue::Nil);
459 self.release_sync_guards_for_frame(self.frames.len());
460 let popped_frame = self.frames.pop().unwrap();
461 if self.frames.is_empty() {
462 return Ok(Some((val, false)));
463 } else {
464 self.iterators.truncate(popped_frame.saved_iterator_depth);
465 self.env = popped_frame.saved_env;
466 self.stack.truncate(popped_frame.stack_base);
467 self.stack.push(val);
468 return Ok(None);
469 }
470 }
471
472 let op = frame.chunk.code[frame.ip];
473 frame.ip += 1;
474
475 match self.execute_op_with_scope_interrupts(op).await {
476 Ok(Some(val)) => Ok(Some((val, false))),
477 Ok(None) => Ok(None),
478 Err(VmError::Return(val)) => {
479 if let Some(popped_frame) = self.frames.pop() {
480 self.release_sync_guards_for_frame(self.frames.len() + 1);
481 if let Some(ref dir) = popped_frame.saved_source_dir {
482 crate::stdlib::set_thread_source_dir(dir);
483 }
484 let current_depth = self.frames.len();
485 self.exception_handlers
486 .retain(|h| h.frame_depth <= current_depth);
487 if self.frames.is_empty() {
488 return Ok(Some((val, false)));
489 }
490 self.iterators.truncate(popped_frame.saved_iterator_depth);
491 self.env = popped_frame.saved_env;
492 self.stack.truncate(popped_frame.stack_base);
493 self.stack.push(val);
494 Ok(None)
495 } else {
496 Ok(Some((val, false)))
497 }
498 }
499 Err(e) => {
500 if self.error_stack_trace.is_empty() {
501 self.error_stack_trace = self.capture_stack_trace();
502 }
503 match self.handle_error(e) {
504 Ok(None) => {
505 self.error_stack_trace.clear();
506 Ok(None)
507 }
508 Ok(Some(val)) => Ok(Some((val, false))),
509 Err(e) => Err(self.enrich_error_with_line(e)),
510 }
511 }
512 }
513 }
514
    /// Executes `op`, racing it against the nearest deadline and the host
    /// cancellation token.
    ///
    /// * With neither a deadline nor a cancel token, the op runs directly.
    /// * If the deadline fires first, the op is dropped and a deadline error
    ///   matching the deadline's kind is returned.
    /// * If cancellation is observed first, the op gets
    ///   `CANCEL_GRACE_ASYNC_OP` to finish; if it does not, spawned tasks are
    ///   cancelled, interrupt handlers (if any) are dispatched, and the
    ///   cancelled error is returned.
    ///
    /// Otherwise the op's own result is returned unchanged.
    async fn execute_op_with_scope_interrupts(
        &mut self,
        op: u8,
    ) -> Result<Option<VmValue>, VmError> {
        /// Which branch of the race completed first.
        enum ScopeInterruptResult {
            Op(Result<Option<VmValue>, VmError>),
            Deadline(DeadlineKind),
            CancelTimedOut,
        }

        let (deadline, deadline_kind) = next_deadline(
            self.deadlines.last().map(|(deadline, _)| *deadline),
            self.interrupt_handler_deadline,
        );
        let cancel_token = self.cancel_token.clone();

        // Fast path: nothing can interrupt the op.
        if deadline.is_none() && cancel_token.is_none() {
            return self.execute_op(op).await;
        }

        let has_deadline = deadline.is_some();
        let cancel_requested_at_start = cancel_token
            .as_ref()
            .is_some_and(|token| token.load(std::sync::atomic::Ordering::SeqCst));
        // When cancellation was already requested before the op started, the
        // cancel branch below is disabled and only the op and the deadline race.
        let has_cancel = cancel_token.is_some() && !cancel_requested_at_start;
        let deadline_sleep = async move {
            if let Some(deadline) = deadline {
                tokio::time::sleep_until(tokio::time::Instant::from_std(deadline)).await;
            } else {
                std::future::pending::<()>().await;
            }
        };
        // Completes once the token is set; the token is polled every 10 ms.
        let cancel_sleep = async move {
            if let Some(token) = cancel_token {
                while !token.load(std::sync::atomic::Ordering::SeqCst) {
                    tokio::time::sleep(Duration::from_millis(10)).await;
                }
            } else {
                std::future::pending::<()>().await;
            }
        };

        // The block scope drops `op_future` (which borrows `self`) before the
        // result is handled below.
        let result = {
            let op_future = self.execute_op(op);
            tokio::pin!(op_future);
            tokio::select! {
                result = &mut op_future => ScopeInterruptResult::Op(result),
                _ = deadline_sleep, if has_deadline => {
                    ScopeInterruptResult::Deadline(deadline_kind.unwrap_or(DeadlineKind::Scope))
                },
                _ = cancel_sleep, if has_cancel => {
                    // Cancellation observed: keep polling the same op for the
                    // grace period before giving up on it.
                    let grace = tokio::time::sleep(CANCEL_GRACE_ASYNC_OP);
                    tokio::pin!(grace);
                    tokio::select! {
                        result = &mut op_future => ScopeInterruptResult::Op(result),
                        _ = &mut grace => ScopeInterruptResult::CancelTimedOut,
                    }
                }
            }
        };

        match result {
            ScopeInterruptResult::Op(result) => result,
            ScopeInterruptResult::Deadline(DeadlineKind::Scope) => {
                // The expired scope deadline is removed before the error is raised.
                self.deadlines.pop();
                self.cancel_spawned_tasks();
                Err(Self::deadline_exceeded_error())
            }
            ScopeInterruptResult::Deadline(DeadlineKind::InterruptHandler) => {
                Err(Self::interrupt_handler_timeout_error())
            }
            ScopeInterruptResult::CancelTimedOut => {
                self.cancel_spawned_tasks();
                // Falls back to "SIGINT" when no host interrupt signal was recorded.
                let signal = self
                    .take_host_interrupt_signal()
                    .unwrap_or_else(|| "SIGINT".to_string());
                if self.has_interrupt_handler_for(&signal) {
                    self.dispatch_interrupt_handlers(&signal).await?;
                }
                Err(Self::cancelled_error())
            }
        }
    }
598
599 pub(crate) fn deadline_exceeded_error() -> VmError {
600 VmError::Thrown(VmValue::String(Rc::from("Deadline exceeded")))
601 }
602
603 pub(crate) fn cancelled_error() -> VmError {
604 VmError::Thrown(VmValue::String(Rc::from(
605 "kind:cancelled:VM cancelled by host",
606 )))
607 }
608
609 pub(crate) fn capture_stack_trace(&self) -> Vec<(String, usize, usize, Option<String>)> {
611 self.frames
612 .iter()
613 .map(|f| {
614 let idx = if f.ip > 0 { f.ip - 1 } else { 0 };
615 let line = f.chunk.lines.get(idx).copied().unwrap_or(0) as usize;
616 let col = f.chunk.columns.get(idx).copied().unwrap_or(0) as usize;
617 (f.fn_name.clone(), line, col, f.chunk.source_file.clone())
618 })
619 .collect()
620 }
621
622 pub(crate) fn enrich_error_with_line(&self, error: VmError) -> VmError {
626 let line = self
628 .error_stack_trace
629 .last()
630 .map(|(_, l, _, _)| *l)
631 .unwrap_or_else(|| self.current_line());
632 if line == 0 {
633 return error;
634 }
635 let suffix = format!(" (line {line})");
636 match error {
637 VmError::Runtime(msg) => VmError::Runtime(format!("{msg}{suffix}")),
638 VmError::TypeError(msg) => VmError::TypeError(format!("{msg}{suffix}")),
639 VmError::DivisionByZero => VmError::Runtime(format!("Division by zero{suffix}")),
640 VmError::UndefinedVariable(name) => {
641 VmError::Runtime(format!("Undefined variable: {name}{suffix}"))
642 }
643 VmError::UndefinedBuiltin(name) => {
644 VmError::Runtime(format!("Undefined builtin: {name}{suffix}"))
645 }
646 VmError::ImmutableAssignment(name) => VmError::Runtime(format!(
647 "Cannot assign to immutable binding: {name}{suffix}"
648 )),
649 VmError::StackOverflow => {
650 VmError::Runtime(format!("Stack overflow: too many nested calls{suffix}"))
651 }
652 other => other,
658 }
659 }
660}