1use std::rc::Rc;
2use std::time::{Duration, Instant};
3
4use crate::chunk::{Chunk, ChunkRef};
5use crate::value::{ModuleFunctionRegistry, VmError, VmValue};
6
7use super::{CallFrame, LocalSlot, Vm};
8
/// Grace period granted to an in-flight operation after the host cancel token is
/// observed set; once it elapses the operation is abandoned and the VM reports
/// cancellation.
const CANCEL_GRACE_ASYNC_OP: Duration = Duration::from_millis(250);

/// Identifies which of the two tracked deadlines is the one that will fire first.
///
/// `Debug`/`PartialEq`/`Eq` are derived so the kind can be logged and compared
/// directly instead of only being pattern-matched.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum DeadlineKind {
    /// The deadline taken from the top of the VM's scope-deadline stack.
    Scope,
    /// The deadline stored in the VM's `interrupt_handler_deadline`.
    InterruptHandler,
}
16
17impl Vm {
18 pub async fn execute(&mut self, chunk: &Chunk) -> Result<VmValue, VmError> {
20 let span_id = crate::tracing::span_start(crate::tracing::SpanKind::Pipeline, "main".into());
21 let result = self.run_chunk(chunk).await;
22 let result = match result {
23 Ok(value) => self.run_pipeline_finish_lifecycle(value).await,
24 Err(error) => {
25 crate::orchestration::clear_pipeline_on_finish();
26 Err(error)
27 }
28 };
29 crate::tracing::span_end(span_id);
30 result
31 }
32
33 async fn run_pipeline_finish_lifecycle(&mut self, value: VmValue) -> Result<VmValue, VmError> {
40 use crate::orchestration::{
41 take_pipeline_on_finish, unsettled_state_snapshot_async, HookEvent,
42 };
43 let _tape_phase =
44 crate::testbench::tape::enter_phase(crate::testbench::tape::TapePhase::RuntimeFinalize);
45
46 let on_finish = take_pipeline_on_finish();
47 let unsettled = unsettled_state_snapshot_async().await;
48
49 let pre_payload = serde_json::json!({
50 "event": HookEvent::PreFinish.as_str(),
51 "return_value": crate::llm::vm_value_to_json(&value),
52 "unsettled": unsettled.to_json(),
53 "has_on_finish": on_finish.is_some(),
54 });
55 self.fire_finish_lifecycle_event(HookEvent::PreFinish, &pre_payload)
56 .await?;
57
58 if !unsettled.is_empty() {
59 let payload = serde_json::json!({
60 "event": HookEvent::OnUnsettledDetected.as_str(),
61 "unsettled": unsettled.to_json(),
62 });
63 self.fire_finish_lifecycle_event(HookEvent::OnUnsettledDetected, &payload)
64 .await?;
65 }
66
67 let final_value = if let Some(closure) = on_finish {
68 let harness_value = crate::harness::Harness::real().into_vm_value();
69 self.call_closure_pub(&closure, &[harness_value, value])
70 .await?
71 } else {
72 value
73 };
74
75 let post_payload = serde_json::json!({
76 "event": HookEvent::PostFinish.as_str(),
77 "return_value": crate::llm::vm_value_to_json(&final_value),
78 "unsettled": unsettled.to_json(),
79 });
80 self.fire_finish_lifecycle_event(HookEvent::PostFinish, &post_payload)
81 .await?;
82
83 Ok(final_value)
84 }
85
86 async fn fire_finish_lifecycle_event(
104 &mut self,
105 event: crate::orchestration::HookEvent,
106 payload: &serde_json::Value,
107 ) -> Result<(), VmError> {
108 use crate::orchestration::{HookControl, HookEvent};
109 let invocations = crate::orchestration::matching_vm_lifecycle_hooks(event, payload);
110 if invocations.is_empty() {
111 return Ok(());
112 }
113 let mut current_payload = payload.clone();
114 for invocation in invocations {
115 let arg = crate::stdlib::json_to_vm_value(¤t_payload);
116 let raw = self.call_closure_pub(&invocation.closure, &[arg]).await?;
117 let (action, effects) = crate::orchestration::collect_hook_effects_and_action(
118 event,
119 raw,
120 crate::value::VmValue::Nil,
121 )?;
122 crate::orchestration::inject_hook_effects_into_current_session(effects)?;
123 let control = crate::orchestration::parse_hook_control_for_finish(event, &action)?;
124 match control {
125 HookControl::Allow => {}
126 HookControl::Block { reason } => {
127 if matches!(event, HookEvent::PreFinish) {
128 return Err(VmError::Runtime(format!(
129 "PreFinish hook returned block, which is not a valid control: {reason}. \
130 To delay pipeline finish until unsettled work clears, use \
131 OnFinish.block_until_settled (std/lifecycle) or return Modify/Allow \
132 from PreFinish."
133 )));
134 }
135 if matches!(event, HookEvent::PostFinish) {
136 continue;
138 }
139 return Err(VmError::Runtime(format!(
141 "{} hook blocked pipeline finish: {reason}",
142 event.as_str()
143 )));
144 }
145 HookControl::Modify { payload: modified } => {
146 current_payload = modified;
147 }
148 HookControl::Decision { .. } => {}
149 }
150 }
151 Ok(())
152 }
153
154 pub(crate) fn handle_error(&mut self, error: VmError) -> Result<Option<VmValue>, VmError> {
156 let thrown_value = match &error {
157 VmError::Thrown(v) => v.clone(),
158 other => VmValue::String(Rc::from(other.to_string())),
159 };
160
161 if let Some(handler) = self.exception_handlers.pop() {
162 if !handler.error_type.is_empty() {
163 let matches = match &thrown_value {
165 VmValue::EnumVariant { enum_name, .. } => {
166 enum_name.as_ref() == handler.error_type
167 }
168 _ => false,
169 };
170 if !matches {
171 return self.handle_error(error);
172 }
173 }
174
175 self.release_sync_guards_after_unwind(handler.frame_depth, handler.env_scope_depth);
176
177 while self.frames.len() > handler.frame_depth {
178 if let Some(frame) = self.frames.pop() {
179 if let Some(ref dir) = frame.saved_source_dir {
180 crate::stdlib::set_thread_source_dir(dir);
181 }
182 self.iterators.truncate(frame.saved_iterator_depth);
183 self.env = frame.saved_env;
184 }
185 }
186 crate::step_runtime::prune_below_frame(self.frames.len());
187
188 while self
190 .deadlines
191 .last()
192 .is_some_and(|d| d.1 > handler.frame_depth)
193 {
194 self.deadlines.pop();
195 }
196
197 self.env.truncate_scopes(handler.env_scope_depth);
198
199 self.stack.truncate(handler.stack_depth);
200 self.stack.push(thrown_value);
201
202 if let Some(frame) = self.frames.last_mut() {
203 frame.ip = handler.catch_ip;
204 }
205
206 Ok(None)
207 } else {
208 Err(error)
209 }
210 }
211
212 pub(crate) async fn run_chunk(&mut self, chunk: &Chunk) -> Result<VmValue, VmError> {
213 self.run_chunk_entry(chunk, 0, None, None, None, None).await
214 }
215
216 pub(crate) async fn run_chunk_entry(
217 &mut self,
218 chunk: &Chunk,
219 argc: usize,
220 saved_source_dir: Option<std::path::PathBuf>,
221 module_functions: Option<ModuleFunctionRegistry>,
222 module_state: Option<crate::value::ModuleState>,
223 local_slots: Option<Vec<LocalSlot>>,
224 ) -> Result<VmValue, VmError> {
225 self.run_chunk_ref(
226 Rc::new(chunk.clone()),
227 argc,
228 saved_source_dir,
229 module_functions,
230 module_state,
231 local_slots,
232 )
233 .await
234 }
235
236 pub(crate) async fn run_chunk_ref(
237 &mut self,
238 chunk: ChunkRef,
239 argc: usize,
240 saved_source_dir: Option<std::path::PathBuf>,
241 module_functions: Option<ModuleFunctionRegistry>,
242 module_state: Option<crate::value::ModuleState>,
243 local_slots: Option<Vec<LocalSlot>>,
244 ) -> Result<VmValue, VmError> {
245 let debugger = self.debugger_attached();
246 let local_slots = local_slots.unwrap_or_else(|| Self::fresh_local_slots(&chunk));
247 let initial_env = if debugger {
248 Some(self.env.clone())
249 } else {
250 None
251 };
252 let initial_local_slots = if debugger {
253 Some(local_slots.clone())
254 } else {
255 None
256 };
257 self.frames.push(CallFrame {
258 chunk,
259 ip: 0,
260 stack_base: self.stack.len(),
261 saved_env: self.env.clone(),
262 initial_env,
263 initial_local_slots,
264 saved_iterator_depth: self.iterators.len(),
265 fn_name: String::new(),
266 argc,
267 saved_source_dir,
268 module_functions,
269 module_state,
270 local_slots,
271 local_scope_base: self.env.scope_depth().saturating_sub(1),
272 local_scope_depth: 0,
273 });
274
275 loop {
276 if let Some(err) = self.pending_scope_interrupt().await {
277 match self.handle_error(err) {
278 Ok(None) => continue,
279 Ok(Some(val)) => return Ok(val),
280 Err(e) => return Err(e),
281 }
282 }
283
284 let frame = match self.frames.last_mut() {
285 Some(f) => f,
286 None => return Ok(self.stack.pop().unwrap_or(VmValue::Nil)),
287 };
288
289 if frame.ip >= frame.chunk.code.len() {
290 let val = self.stack.pop().unwrap_or(VmValue::Nil);
291 let val = self.run_step_post_hooks_for_current_frame(val).await?;
292 self.release_sync_guards_for_frame(self.frames.len());
293 let popped_frame = self.frames.pop().unwrap();
294 if let Some(ref dir) = popped_frame.saved_source_dir {
295 crate::stdlib::set_thread_source_dir(dir);
296 }
297 crate::step_runtime::prune_below_frame(self.frames.len());
298
299 if self.frames.is_empty() {
300 return Ok(val);
301 } else {
302 self.iterators.truncate(popped_frame.saved_iterator_depth);
303 self.env = popped_frame.saved_env;
304 self.stack.truncate(popped_frame.stack_base);
305 self.stack.push(val);
306 continue;
307 }
308 }
309
310 let op = frame.chunk.code[frame.ip];
311 frame.ip += 1;
312
313 match self.execute_op_with_scope_interrupts(op).await {
314 Ok(Some(val)) => return Ok(val),
315 Ok(None) => continue,
316 Err(VmError::Return(val)) => {
317 let val = self.run_step_post_hooks_for_current_frame(val).await?;
318 if let Some(popped_frame) = self.frames.pop() {
319 self.release_sync_guards_for_frame(self.frames.len() + 1);
320 if let Some(ref dir) = popped_frame.saved_source_dir {
321 crate::stdlib::set_thread_source_dir(dir);
322 }
323 let current_depth = self.frames.len();
324 self.exception_handlers
325 .retain(|h| h.frame_depth <= current_depth);
326 crate::step_runtime::prune_below_frame(current_depth);
327
328 if self.frames.is_empty() {
329 return Ok(val);
330 }
331 self.iterators.truncate(popped_frame.saved_iterator_depth);
332 self.env = popped_frame.saved_env;
333 self.stack.truncate(popped_frame.stack_base);
334 self.stack.push(val);
335 } else {
336 return Ok(val);
337 }
338 }
339 Err(e) => {
340 if self.error_stack_trace.is_empty() {
342 self.error_stack_trace = self.capture_stack_trace();
343 }
344 let e = match self.apply_step_error_boundary(e) {
351 StepBoundaryOutcome::Returned(val) => {
352 self.error_stack_trace.clear();
353 self.stack.push(val);
354 continue;
355 }
356 StepBoundaryOutcome::Throw(err) => err,
357 };
358 match self.handle_error(e) {
359 Ok(None) => {
360 self.error_stack_trace.clear();
361 continue;
362 }
363 Ok(Some(val)) => return Ok(val),
364 Err(e) => return Err(self.enrich_error_with_line(e)),
365 }
366 }
367 }
368 }
369 }
370
371 pub(crate) fn apply_step_error_boundary(&mut self, error: VmError) -> StepBoundaryOutcome {
377 use crate::step_runtime;
378 if !step_runtime::is_step_budget_exhausted(&error) {
379 return StepBoundaryOutcome::Throw(error);
380 }
381 let Some(step_depth) = step_runtime::active_step_frame_depth() else {
382 return StepBoundaryOutcome::Throw(error);
383 };
384 if step_depth != self.frames.len() {
389 return StepBoundaryOutcome::Throw(error);
390 }
391 let boundary = step_runtime::with_active_step(|step| step.definition.boundary())
392 .unwrap_or(step_runtime::StepErrorBoundary::Fail);
393 match boundary {
394 step_runtime::StepErrorBoundary::Continue => {
395 if let Some(popped) = self.frames.pop() {
399 self.release_sync_guards_for_frame(self.frames.len() + 1);
400 if let Some(ref dir) = popped.saved_source_dir {
401 crate::stdlib::set_thread_source_dir(dir);
402 }
403 let current_depth = self.frames.len();
404 self.exception_handlers
405 .retain(|h| h.frame_depth <= current_depth);
406 step_runtime::pop_and_record(
407 current_depth + 1,
408 "skipped",
409 Some(step_runtime_error_message(&error)),
410 );
411 if self.frames.is_empty() {
412 return StepBoundaryOutcome::Returned(VmValue::Nil);
413 }
414 self.iterators.truncate(popped.saved_iterator_depth);
415 self.env = popped.saved_env;
416 self.stack.truncate(popped.stack_base);
417 }
418 StepBoundaryOutcome::Returned(VmValue::Nil)
419 }
420 step_runtime::StepErrorBoundary::Escalate => {
421 let identity = step_runtime::with_active_step(|step| {
422 (
423 step.definition.name.clone(),
424 step.definition.function.clone(),
425 )
426 });
427 step_runtime::pop_and_record(
428 step_depth,
429 "escalated",
430 Some(step_runtime_error_message(&error)),
431 );
432 let (step_name, function) = identity.unzip();
433 StepBoundaryOutcome::Throw(step_runtime::mark_escalated(
434 error,
435 step_name.as_deref(),
436 function.as_deref(),
437 ))
438 }
439 step_runtime::StepErrorBoundary::Fail => {
440 step_runtime::pop_and_record(
441 step_depth,
442 "failed",
443 Some(step_runtime_error_message(&error)),
444 );
445 StepBoundaryOutcome::Throw(error)
446 }
447 }
448 }
449}
450
451fn next_deadline(
452 scope_deadline: Option<Instant>,
453 interrupt_handler_deadline: Option<Instant>,
454) -> (Option<Instant>, Option<DeadlineKind>) {
455 match (scope_deadline, interrupt_handler_deadline) {
456 (Some(scope), Some(interrupt)) if interrupt < scope => {
457 (Some(interrupt), Some(DeadlineKind::InterruptHandler))
458 }
459 (Some(scope), _) => (Some(scope), Some(DeadlineKind::Scope)),
460 (None, Some(interrupt)) => (Some(interrupt), Some(DeadlineKind::InterruptHandler)),
461 (None, None) => (None, None),
462 }
463}
464
465fn step_runtime_error_message(error: &VmError) -> String {
466 match error {
467 VmError::Thrown(VmValue::Dict(dict)) => dict
468 .get("message")
469 .map(|v| v.display())
470 .unwrap_or_else(|| error.to_string()),
471 _ => error.to_string(),
472 }
473}
474
475pub(crate) enum StepBoundaryOutcome {
476 Returned(VmValue),
477 Throw(VmError),
478}
479
480impl crate::vm::Vm {
481 pub(crate) async fn execute_one_cycle(&mut self) -> Result<Option<(VmValue, bool)>, VmError> {
482 if let Some(err) = self.pending_scope_interrupt().await {
483 match self.handle_error(err) {
484 Ok(None) => return Ok(None),
485 Ok(Some(val)) => return Ok(Some((val, false))),
486 Err(e) => return Err(e),
487 }
488 }
489
490 let frame = match self.frames.last_mut() {
491 Some(f) => f,
492 None => {
493 let val = self.stack.pop().unwrap_or(VmValue::Nil);
494 return Ok(Some((val, false)));
495 }
496 };
497
498 if frame.ip >= frame.chunk.code.len() {
499 let val = self.stack.pop().unwrap_or(VmValue::Nil);
500 self.release_sync_guards_for_frame(self.frames.len());
501 let popped_frame = self.frames.pop().unwrap();
502 if self.frames.is_empty() {
503 return Ok(Some((val, false)));
504 } else {
505 self.iterators.truncate(popped_frame.saved_iterator_depth);
506 self.env = popped_frame.saved_env;
507 self.stack.truncate(popped_frame.stack_base);
508 self.stack.push(val);
509 return Ok(None);
510 }
511 }
512
513 let op = frame.chunk.code[frame.ip];
514 frame.ip += 1;
515
516 match self.execute_op_with_scope_interrupts(op).await {
517 Ok(Some(val)) => Ok(Some((val, false))),
518 Ok(None) => Ok(None),
519 Err(VmError::Return(val)) => {
520 if let Some(popped_frame) = self.frames.pop() {
521 self.release_sync_guards_for_frame(self.frames.len() + 1);
522 if let Some(ref dir) = popped_frame.saved_source_dir {
523 crate::stdlib::set_thread_source_dir(dir);
524 }
525 let current_depth = self.frames.len();
526 self.exception_handlers
527 .retain(|h| h.frame_depth <= current_depth);
528 if self.frames.is_empty() {
529 return Ok(Some((val, false)));
530 }
531 self.iterators.truncate(popped_frame.saved_iterator_depth);
532 self.env = popped_frame.saved_env;
533 self.stack.truncate(popped_frame.stack_base);
534 self.stack.push(val);
535 Ok(None)
536 } else {
537 Ok(Some((val, false)))
538 }
539 }
540 Err(e) => {
541 if self.error_stack_trace.is_empty() {
542 self.error_stack_trace = self.capture_stack_trace();
543 }
544 match self.handle_error(e) {
545 Ok(None) => {
546 self.error_stack_trace.clear();
547 Ok(None)
548 }
549 Ok(Some(val)) => Ok(Some((val, false))),
550 Err(e) => Err(self.enrich_error_with_line(e)),
551 }
552 }
553 }
554 }
555
556 async fn execute_op_with_scope_interrupts(
557 &mut self,
558 op: u8,
559 ) -> Result<Option<VmValue>, VmError> {
560 enum ScopeInterruptResult {
561 Op(Result<Option<VmValue>, VmError>),
562 Deadline(DeadlineKind),
563 CancelTimedOut,
564 }
565
566 let (deadline, deadline_kind) = next_deadline(
567 self.deadlines.last().map(|(deadline, _)| *deadline),
568 self.interrupt_handler_deadline,
569 );
570 let cancel_token = self.cancel_token.clone();
571
572 if deadline.is_none() && cancel_token.is_none() {
573 return self.execute_op(op).await;
574 }
575
576 let has_deadline = deadline.is_some();
577 let cancel_requested_at_start = cancel_token
578 .as_ref()
579 .is_some_and(|token| token.load(std::sync::atomic::Ordering::SeqCst));
580 let has_cancel = cancel_token.is_some() && !cancel_requested_at_start;
581 let deadline_sleep = async move {
582 if let Some(deadline) = deadline {
583 tokio::time::sleep_until(tokio::time::Instant::from_std(deadline)).await;
584 } else {
585 std::future::pending::<()>().await;
586 }
587 };
588 let cancel_sleep = async move {
589 if let Some(token) = cancel_token {
590 while !token.load(std::sync::atomic::Ordering::SeqCst) {
591 tokio::time::sleep(Duration::from_millis(10)).await;
592 }
593 } else {
594 std::future::pending::<()>().await;
595 }
596 };
597
598 let result = {
599 let op_future = self.execute_op(op);
600 tokio::pin!(op_future);
601 tokio::select! {
602 result = &mut op_future => ScopeInterruptResult::Op(result),
603 _ = deadline_sleep, if has_deadline => {
604 ScopeInterruptResult::Deadline(deadline_kind.unwrap_or(DeadlineKind::Scope))
605 },
606 _ = cancel_sleep, if has_cancel => {
607 let grace = tokio::time::sleep(CANCEL_GRACE_ASYNC_OP);
608 tokio::pin!(grace);
609 tokio::select! {
610 result = &mut op_future => ScopeInterruptResult::Op(result),
611 _ = &mut grace => ScopeInterruptResult::CancelTimedOut,
612 }
613 }
614 }
615 };
616
617 match result {
618 ScopeInterruptResult::Op(result) => result,
619 ScopeInterruptResult::Deadline(DeadlineKind::Scope) => {
620 self.deadlines.pop();
621 self.cancel_spawned_tasks();
622 Err(Self::deadline_exceeded_error())
623 }
624 ScopeInterruptResult::Deadline(DeadlineKind::InterruptHandler) => {
625 Err(Self::interrupt_handler_timeout_error())
626 }
627 ScopeInterruptResult::CancelTimedOut => {
628 self.cancel_spawned_tasks();
629 let signal = self
630 .take_host_interrupt_signal()
631 .unwrap_or_else(|| "SIGINT".to_string());
632 if self.has_interrupt_handler_for(&signal) {
633 self.dispatch_interrupt_handlers(&signal).await?;
634 }
635 Err(Self::cancelled_error())
636 }
637 }
638 }
639
640 pub(crate) fn deadline_exceeded_error() -> VmError {
641 VmError::Thrown(VmValue::String(Rc::from("Deadline exceeded")))
642 }
643
644 pub(crate) fn cancelled_error() -> VmError {
645 VmError::Thrown(VmValue::String(Rc::from(
646 "kind:cancelled:VM cancelled by host",
647 )))
648 }
649
650 pub(crate) fn capture_stack_trace(&self) -> Vec<(String, usize, usize, Option<String>)> {
652 self.frames
653 .iter()
654 .map(|f| {
655 let idx = if f.ip > 0 { f.ip - 1 } else { 0 };
656 let line = f.chunk.lines.get(idx).copied().unwrap_or(0) as usize;
657 let col = f.chunk.columns.get(idx).copied().unwrap_or(0) as usize;
658 (f.fn_name.clone(), line, col, f.chunk.source_file.clone())
659 })
660 .collect()
661 }
662
663 pub(crate) fn enrich_error_with_line(&self, error: VmError) -> VmError {
667 let line = self
669 .error_stack_trace
670 .last()
671 .map(|(_, l, _, _)| *l)
672 .unwrap_or_else(|| self.current_line());
673 if line == 0 {
674 return error;
675 }
676 let suffix = format!(" (line {line})");
677 match error {
678 VmError::Runtime(msg) => VmError::Runtime(format!("{msg}{suffix}")),
679 VmError::TypeError(msg) => VmError::TypeError(format!("{msg}{suffix}")),
680 VmError::DivisionByZero => VmError::Runtime(format!("Division by zero{suffix}")),
681 VmError::UndefinedVariable(name) => {
682 VmError::Runtime(format!("Undefined variable: {name}{suffix}"))
683 }
684 VmError::UndefinedBuiltin(name) => {
685 VmError::Runtime(format!("Undefined builtin: {name}{suffix}"))
686 }
687 VmError::ImmutableAssignment(name) => VmError::Runtime(format!(
688 "Cannot assign to immutable binding: {name}{suffix}"
689 )),
690 VmError::StackOverflow => {
691 VmError::Runtime(format!("Stack overflow: too many nested calls{suffix}"))
692 }
693 other => other,
699 }
700 }
701}