// shape_vm/executor/async_ops/mod.rs
//! Async operations for the VM executor.
//!
//! # Concurrency Model
//!
//! The Shape VM uses **cooperative, single-threaded concurrency**. All async
//! operations execute on the thread that owns the `VirtualMachine` instance --
//! there is no work-stealing or multi-threaded task execution within the VM
//! itself. The VM is `!Sync` by design.
//!
//! ## Task Lifecycle
//!
//! 1. **Spawn** (`SpawnTask`): Pops a callable from the stack, assigns a
//!    monotonic future ID, registers it with the `TaskScheduler`, and pushes
//!    a `Future(id)` value onto the stack.
//! 2. **Await** (`Await`): Pops a `Future(id)`, attempts synchronous inline
//!    resolution via the `TaskScheduler`. If the task cannot be resolved
//!    (e.g., it depends on an external I/O operation), execution suspends
//!    with `VMError::Suspended` so the host runtime can schedule it.
//! 3. **Join** (`JoinInit` + `JoinAwait`): Collects multiple futures into a
//!    `TaskGroup` value, then resolves them according to a join strategy
//!    (all, race, any, all-settled).
//! 4. **Cancel** (`CancelTask`): Marks a task as cancelled in the scheduler.
//!
//! ## Structured Concurrency
//!
//! `AsyncScopeEnter` / `AsyncScopeExit` bracket a structured concurrency
//! region. All tasks spawned within a scope are tracked; on scope exit, any
//! still-pending tasks are cancelled in LIFO order. This guarantees that no
//! task outlives its enclosing scope.
//!
//! ## Suspension Protocol
//!
//! When an operation cannot complete synchronously, it returns
//! `AsyncExecutionResult::Suspended(SuspensionInfo)`. The dispatch layer in
//! `dispatch.rs` converts this into `VMError::Suspended { future_id, resume_ip }`
//! which propagates up to the host runtime. The host resolves the future and
//! calls back into the VM to resume execution at `resume_ip`.
//!
//! ## Opcodes Handled
//!
//! `Yield`, `Suspend`, `Resume`, `Poll`, `AwaitBar`, `AwaitTick`,
//! `EmitAlert`, `EmitEvent`, `Await`, `SpawnTask`, `JoinInit`, `JoinAwait`,
//! `CancelTask`, `AsyncScopeEnter`, `AsyncScopeExit`.
use crate::{
    bytecode::{Instruction, OpCode, Operand},
    executor::VirtualMachine,
};
use shape_value::heap_value::HeapValue;
use shape_value::{VMError, ValueWord};
51
52/// Result of executing an async operation
53#[derive(Debug, Clone)]
54pub enum AsyncExecutionResult {
55    /// Continue normal execution
56    Continue,
57    /// Yield to event loop (cooperative scheduling)
58    Yielded,
59    /// Suspended waiting for external event
60    Suspended(SuspensionInfo),
61}
62
63/// Information about why execution was suspended
64#[derive(Debug, Clone)]
65pub struct SuspensionInfo {
66    /// What we're waiting for
67    pub wait_type: WaitType,
68    /// Instruction pointer to resume at
69    pub resume_ip: usize,
70}
71
/// Type of wait condition
///
/// All payloads are plain standard-library types, so `PartialEq`/`Eq` are
/// derived to allow direct comparison of wait conditions (e.g. in tests or
/// host-side dispatch) instead of manual `match`ing.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WaitType {
    /// Waiting for next data bar from source
    NextBar { source: String },
    /// Waiting for timer
    Timer { id: u64 },
    /// Waiting for any event
    AnyEvent,
    /// Waiting for a future to resolve (general-purpose await)
    Future { id: u64 },
    /// Waiting for a task group to resolve (join await)
    TaskGroup { kind: u8, task_ids: Vec<u64> },
}
86
87impl VirtualMachine {
88    /// Execute an async opcode
89    ///
90    /// Returns `AsyncExecutionResult` to indicate whether execution should
91    /// continue, yield, or suspend.
92    #[inline(always)]
93    pub(in crate::executor) fn exec_async_op(
94        &mut self,
95        instruction: &Instruction,
96    ) -> Result<AsyncExecutionResult, VMError> {
97        use OpCode::*;
98        match instruction.opcode {
99            Yield => self.op_yield(),
100            Suspend => self.op_suspend(instruction),
101            Resume => self.op_resume(instruction),
102            Poll => self.op_poll(),
103            AwaitBar => self.op_await_bar(instruction),
104            AwaitTick => self.op_await_tick(instruction),
105            EmitAlert => self.op_emit_alert(),
106            EmitEvent => self.op_emit_event(),
107            Await => self.op_await(),
108            SpawnTask => self.op_spawn_task(),
109            JoinInit => self.op_join_init(instruction),
110            JoinAwait => self.op_join_await(),
111            CancelTask => self.op_cancel_task(),
112            AsyncScopeEnter => self.op_async_scope_enter(),
113            AsyncScopeExit => self.op_async_scope_exit(),
114            _ => unreachable!(
115                "exec_async_op called with non-async opcode: {:?}",
116                instruction.opcode
117            ),
118        }
119    }
120
121    /// Yield to the event loop for cooperative scheduling
122    ///
123    /// This allows other tasks to run and prevents long-running
124    /// computations from blocking the event loop.
125    fn op_yield(&mut self) -> Result<AsyncExecutionResult, VMError> {
126        // Save current state - the IP is already pointing to next instruction
127        Ok(AsyncExecutionResult::Yielded)
128    }
129
130    /// Suspend execution until a condition is met
131    ///
132    /// The operand specifies the wait condition type.
133    fn op_suspend(&mut self, instruction: &Instruction) -> Result<AsyncExecutionResult, VMError> {
134        let wait_type = match &instruction.operand {
135            Some(Operand::Const(idx)) => {
136                // Get wait type from constant pool
137                // For now, default to waiting for any event
138                let _ = idx;
139                WaitType::AnyEvent
140            }
141            _ => WaitType::AnyEvent,
142        };
143
144        Ok(AsyncExecutionResult::Suspended(SuspensionInfo {
145            wait_type,
146            resume_ip: self.ip,
147        }))
148    }
149
150    /// Resume from suspension
151    ///
152    /// Called by the runtime when resuming suspended execution.
153    /// The resume value (if any) should be on the stack.
154    fn op_resume(&mut self, _instruction: &Instruction) -> Result<AsyncExecutionResult, VMError> {
155        // Resume is handled by the outer execution loop
156        // This opcode is a marker for where to resume
157        Ok(AsyncExecutionResult::Continue)
158    }
159
160    /// Poll the event queue
161    ///
162    /// Pushes the next event from the queue onto the stack,
163    /// or null if the queue is empty.
164    fn op_poll(&mut self) -> Result<AsyncExecutionResult, VMError> {
165        // In the VM, we don't have direct access to the event queue
166        // This is handled via the VMContext passed from the runtime
167        // For now, push None to indicate no event
168        self.push_vw(ValueWord::none()).map_err(|e| e)?;
169        Ok(AsyncExecutionResult::Continue)
170    }
171
172    /// Await next data bar from a source
173    ///
174    /// Suspends execution until the next data point arrives
175    /// from the specified source.
176    fn op_await_bar(&mut self, instruction: &Instruction) -> Result<AsyncExecutionResult, VMError> {
177        let source = match &instruction.operand {
178            Some(Operand::Const(idx)) => {
179                // Get source name from constant pool
180                match self.program.constants.get(*idx as usize) {
181                    Some(crate::bytecode::Constant::String(s)) => s.clone(),
182                    _ => "default".to_string(),
183                }
184            }
185            _ => "default".to_string(),
186        };
187
188        Ok(AsyncExecutionResult::Suspended(SuspensionInfo {
189            wait_type: WaitType::NextBar { source },
190            resume_ip: self.ip,
191        }))
192    }
193
194    /// Await next timer tick
195    ///
196    /// Suspends execution until the specified timer fires.
197    fn op_await_tick(
198        &mut self,
199        instruction: &Instruction,
200    ) -> Result<AsyncExecutionResult, VMError> {
201        let timer_id = match &instruction.operand {
202            Some(Operand::Const(idx)) => {
203                // Get timer ID from constant pool
204                match self.program.constants.get(*idx as usize) {
205                    Some(crate::bytecode::Constant::Number(n)) => *n as u64,
206                    _ => 0,
207                }
208            }
209            _ => 0,
210        };
211
212        Ok(AsyncExecutionResult::Suspended(SuspensionInfo {
213            wait_type: WaitType::Timer { id: timer_id },
214            resume_ip: self.ip,
215        }))
216    }
217
218    /// Emit an alert to the alert pipeline
219    ///
220    /// Pops an alert object from the stack and sends it to
221    /// the alert router for processing.
222    fn op_emit_alert(&mut self) -> Result<AsyncExecutionResult, VMError> {
223        let _alert_nb = self.pop_vw()?;
224        // Alert pipeline integration pending — consume and continue
225        Ok(AsyncExecutionResult::Continue)
226    }
227
228    /// General-purpose await
229    ///
230    /// Pops a value from the stack. If it's a Future(id), attempts to resolve
231    /// the task inline from the task scheduler. If the task's callable is a
232    /// plain value (not a closure/function), it is used directly as the result.
233    /// Otherwise, suspends execution so the host runtime can schedule the task.
234    /// If the value is not a Future, pushes it back (sync shortcut).
235    fn op_await(&mut self) -> Result<AsyncExecutionResult, VMError> {
236        let sp_before = self.sp;
237        let nb = self.pop_vw()?;
238        match nb.as_heap_ref() {
239            Some(HeapValue::Future(id)) => {
240                let id = *id;
241
242                // Try to resolve the task inline from the task scheduler.
243                // For `async let x = expr`, the callable stored by SpawnTask
244                // is the already-evaluated value of `expr`. We resolve it
245                // directly without suspending.
246                let resolved = self.task_scheduler.resolve_task(id, |callable| {
247                    // The callable is the value that was on the stack when
248                    // SpawnTask executed. For simple expressions it's already
249                    // the result value.
250                    Ok(callable)
251                });
252
253                match resolved {
254                    Ok(value) => {
255                        self.push_vw(value)?;
256                        // Await consumes a Future and pushes a result: net stack effect is 0.
257                        debug_assert_eq!(
258                            self.sp, sp_before,
259                            "op_await: stack depth changed (before={}, after={})",
260                            sp_before, self.sp
261                        );
262                        Ok(AsyncExecutionResult::Continue)
263                    }
264                    Err(_) => {
265                        // Could not resolve inline — suspend for host runtime
266                        Ok(AsyncExecutionResult::Suspended(SuspensionInfo {
267                            wait_type: WaitType::Future { id },
268                            resume_ip: self.ip,
269                        }))
270                    }
271                }
272            }
273            _ => {
274                // Sync shortcut: value is already resolved, push it back
275                self.push_vw(nb)?;
276                debug_assert_eq!(
277                    self.sp, sp_before,
278                    "op_await (sync shortcut): stack depth changed (before={}, after={})",
279                    sp_before, self.sp
280                );
281                Ok(AsyncExecutionResult::Continue)
282            }
283        }
284    }
285
286    /// Await with a timeout.
287    ///
288    /// Spawn a task from a closure/function on the stack
289    ///
290    /// Pops a closure or function reference from the stack and creates a new async task.
291    /// Pushes a Future(task_id) onto the stack representing the spawned task.
292    /// The host runtime is responsible for actually scheduling the task.
293    ///
294    /// If inside an async scope, the spawned future ID is tracked for cancellation.
295    fn op_spawn_task(&mut self) -> Result<AsyncExecutionResult, VMError> {
296        let sp_before = self.sp;
297        let callable_nb = self.pop_vw()?;
298
299        let task_id = self.next_future_id();
300        self.task_scheduler.register(task_id, callable_nb);
301
302        if let Some(scope) = self.async_scope_stack.last_mut() {
303            scope.push(task_id);
304        }
305
306        self.push_vw(ValueWord::from_future(task_id))?;
307        // SpawnTask replaces a callable with a Future: net stack effect is 0.
308        debug_assert_eq!(
309            self.sp, sp_before,
310            "op_spawn_task: stack depth changed (before={}, after={})",
311            sp_before, self.sp
312        );
313        Ok(AsyncExecutionResult::Continue)
314    }
315
316    /// Initialize a join group from futures on the stack
317    ///
318    /// Operand: Count(packed_u16) where high 2 bits = join kind, low 14 bits = arity.
319    /// Pops `arity` Future values from the stack (in reverse order).
320    /// Pushes a ValueWord::TaskGroup with the collected future IDs.
321    fn op_join_init(&mut self, instruction: &Instruction) -> Result<AsyncExecutionResult, VMError> {
322        let packed = match &instruction.operand {
323            Some(Operand::Count(n)) => *n,
324            _ => {
325                return Err(VMError::RuntimeError(
326                    "JoinInit requires Count operand".to_string(),
327                ));
328            }
329        };
330
331        let kind = ((packed >> 14) & 0x03) as u8;
332        let arity = (packed & 0x3FFF) as usize;
333
334        if self.sp < arity {
335            return Err(VMError::StackUnderflow);
336        }
337
338        let mut task_ids = Vec::with_capacity(arity);
339        for _ in 0..arity {
340            let nb = self.pop_vw()?;
341            match nb.as_heap_ref() {
342                Some(HeapValue::Future(id)) => task_ids.push(*id),
343                _ => {
344                    return Err(VMError::RuntimeError(format!(
345                        "JoinInit expected Future, got {}",
346                        nb.type_name()
347                    )));
348                }
349            }
350        }
351        // Reverse so task_ids[0] corresponds to first branch
352        task_ids.reverse();
353
354        self.push_vw(ValueWord::from_heap_value(
355            shape_value::heap_value::HeapValue::TaskGroup { kind, task_ids },
356        ))?;
357        Ok(AsyncExecutionResult::Continue)
358    }
359
360    /// Await a task group, resolving tasks inline
361    ///
362    /// Pops a ValueWord::TaskGroup from the stack.
363    /// Resolves all tasks inline using the task scheduler's `resolve_task_group`,
364    /// which executes each task's callable synchronously (same strategy as `op_await`).
365    /// Pushes the result value onto the stack according to the join strategy.
366    fn op_join_await(&mut self) -> Result<AsyncExecutionResult, VMError> {
367        let sp_before = self.sp;
368        let nb = self.pop_vw()?;
369        match nb.as_heap_ref() {
370            Some(HeapValue::TaskGroup { kind, task_ids }) => {
371                let kind = *kind;
372                let task_ids = task_ids.clone();
373
374                let result = self
375                    .task_scheduler
376                    .resolve_task_group(kind, &task_ids, |callable| Ok(callable));
377
378                match result {
379                    Ok(value) => {
380                        self.push_vw(value)?;
381                        // JoinAwait consumes a TaskGroup and pushes a result: net effect is 0.
382                        debug_assert_eq!(
383                            self.sp, sp_before,
384                            "op_join_await: stack depth changed (before={}, after={})",
385                            sp_before, self.sp
386                        );
387                        Ok(AsyncExecutionResult::Continue)
388                    }
389                    Err(_) => {
390                        // Could not resolve inline — suspend for host runtime
391                        Ok(AsyncExecutionResult::Suspended(SuspensionInfo {
392                            wait_type: WaitType::TaskGroup { kind, task_ids },
393                            resume_ip: self.ip,
394                        }))
395                    }
396                }
397            }
398            _ => Err(VMError::RuntimeError(format!(
399                "JoinAwait expected TaskGroup, got {}",
400                nb.type_name()
401            ))),
402        }
403    }
404
405    /// Cancel a task by its future ID
406    ///
407    /// Pops a Future(task_id) from the stack and signals cancellation.
408    /// The host runtime is responsible for actually cancelling the task.
409    fn op_cancel_task(&mut self) -> Result<AsyncExecutionResult, VMError> {
410        let nb = self.pop_vw()?;
411        match nb.as_heap_ref() {
412            Some(HeapValue::Future(id)) => {
413                self.task_scheduler.cancel(*id);
414                Ok(AsyncExecutionResult::Continue)
415            }
416            _ => Err(VMError::RuntimeError(format!(
417                "CancelTask expected Future, got {}",
418                nb.type_name()
419            ))),
420        }
421    }
422
423    /// Enter a structured concurrency scope
424    ///
425    /// Pushes a new empty Vec onto the async_scope_stack.
426    /// All tasks spawned while this scope is active are tracked in that Vec.
427    fn op_async_scope_enter(&mut self) -> Result<AsyncExecutionResult, VMError> {
428        let depth_before = self.async_scope_stack.len();
429        self.async_scope_stack.push(Vec::new());
430        debug_assert_eq!(
431            self.async_scope_stack.len(),
432            depth_before + 1,
433            "op_async_scope_enter: scope stack depth not incremented"
434        );
435        Ok(AsyncExecutionResult::Continue)
436    }
437
438    /// Exit a structured concurrency scope
439    ///
440    /// Pops the current scope from the async_scope_stack and cancels
441    /// all tasks spawned within it that are still pending, in LIFO order.
442    /// The body's result value remains on top of the stack.
443    fn op_async_scope_exit(&mut self) -> Result<AsyncExecutionResult, VMError> {
444        debug_assert!(
445            !self.async_scope_stack.is_empty(),
446            "op_async_scope_exit: scope stack is empty (mismatched Enter/Exit)"
447        );
448        if let Some(mut scope_tasks) = self.async_scope_stack.pop() {
449            // Cancel in LIFO order (last spawned first)
450            scope_tasks.reverse();
451            for task_id in scope_tasks {
452                self.task_scheduler.cancel(task_id);
453            }
454        }
455        // Result value from the body is already on top of the stack
456        Ok(AsyncExecutionResult::Continue)
457    }
458
459    /// Emit a generic event to the event queue
460    ///
461    /// Pops an event object from the stack and pushes it to
462    /// the event queue for external consumers.
463    fn op_emit_event(&mut self) -> Result<AsyncExecutionResult, VMError> {
464        let _event_nb = self.pop_vw()?;
465        // Event queue integration pending — consume and continue
466        Ok(AsyncExecutionResult::Continue)
467    }
468}
469
/// Check if an opcode is an async operation
///
/// Mirrors the dispatch table in `exec_async_op`; used by tests to keep the
/// two lists in agreement.
#[cfg(test)]
pub fn is_async_opcode(opcode: OpCode) -> bool {
    use OpCode::*;
    matches!(
        opcode,
        Yield
            | Suspend
            | Resume
            | Poll
            | AwaitBar
            | AwaitTick
            | EmitAlert
            | EmitEvent
            | Await
            | SpawnTask
            | JoinInit
            | JoinAwait
            | CancelTask
            | AsyncScopeEnter
            | AsyncScopeExit
    )
}
492
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_is_async_opcode() {
        // Representative async opcodes are recognized.
        for op in [
            OpCode::Yield,
            OpCode::Suspend,
            OpCode::EmitAlert,
            OpCode::AsyncScopeEnter,
            OpCode::AsyncScopeExit,
        ] {
            assert!(is_async_opcode(op));
        }
        // Plain opcodes are not.
        assert!(!is_async_opcode(OpCode::Add));
        assert!(!is_async_opcode(OpCode::Jump));
    }

    #[test]
    fn test_is_async_opcode_all_variants() {
        // Every async opcode in the first batch must be recognized.
        for op in [
            OpCode::Yield,
            OpCode::Suspend,
            OpCode::Resume,
            OpCode::Poll,
            OpCode::AwaitBar,
            OpCode::AwaitTick,
            OpCode::EmitAlert,
            OpCode::EmitEvent,
        ] {
            assert!(is_async_opcode(op));
        }

        // Non-async opcodes must be rejected.
        for op in [OpCode::PushConst, OpCode::Return, OpCode::Call, OpCode::Nop] {
            assert!(!is_async_opcode(op));
        }
    }

    #[test]
    fn test_async_execution_result_variants() {
        assert!(matches!(
            AsyncExecutionResult::Continue,
            AsyncExecutionResult::Continue
        ));
        assert!(matches!(
            AsyncExecutionResult::Yielded,
            AsyncExecutionResult::Yielded
        ));

        let suspended = AsyncExecutionResult::Suspended(SuspensionInfo {
            wait_type: WaitType::AnyEvent,
            resume_ip: 42,
        });
        if let AsyncExecutionResult::Suspended(info) = suspended {
            assert_eq!(info.resume_ip, 42);
            assert!(matches!(info.wait_type, WaitType::AnyEvent));
        } else {
            panic!("Expected Suspended");
        }
    }

    #[test]
    fn test_wait_type_variants() {
        // NextBar carries the source name.
        let bar = WaitType::NextBar {
            source: String::from("market_data"),
        };
        if let WaitType::NextBar { source } = bar {
            assert_eq!(source, "market_data");
        } else {
            panic!("Expected NextBar");
        }

        // Timer carries its ID.
        let timer = WaitType::Timer { id: 123 };
        if let WaitType::Timer { id } = timer {
            assert_eq!(id, 123);
        } else {
            panic!("Expected Timer");
        }

        // AnyEvent has no payload.
        assert!(matches!(WaitType::AnyEvent, WaitType::AnyEvent));
    }

    #[test]
    fn test_suspension_info_creation() {
        let info = SuspensionInfo {
            wait_type: WaitType::Timer { id: 999 },
            resume_ip: 100,
        };

        assert_eq!(info.resume_ip, 100);
        assert!(matches!(info.wait_type, WaitType::Timer { id: 999 }));
    }

    #[test]
    fn test_is_async_opcode_await() {
        assert!(is_async_opcode(OpCode::Await));
    }

    #[test]
    fn test_wait_type_future() {
        if let WaitType::Future { id } = (WaitType::Future { id: 42 }) {
            assert_eq!(id, 42);
        } else {
            panic!("Expected Future");
        }
    }

    #[test]
    fn test_is_async_opcode_join_opcodes() {
        for op in [
            OpCode::SpawnTask,
            OpCode::JoinInit,
            OpCode::JoinAwait,
            OpCode::CancelTask,
        ] {
            assert!(is_async_opcode(op));
        }
    }

    #[test]
    fn test_wait_type_task_group() {
        let tg = WaitType::TaskGroup {
            kind: 0,
            task_ids: vec![1, 2, 3],
        };
        if let WaitType::TaskGroup { kind, task_ids } = tg {
            assert_eq!(kind, 0); // All
            assert_eq!(task_ids.len(), 3);
            assert_eq!(task_ids, vec![1, 2, 3]);
        } else {
            panic!("Expected TaskGroup");
        }
    }

    #[test]
    fn test_wait_type_task_group_race() {
        let tg = WaitType::TaskGroup {
            kind: 1,
            task_ids: vec![10, 20],
        };
        if let WaitType::TaskGroup { kind, task_ids } = tg {
            assert_eq!(kind, 1); // Race
            assert_eq!(task_ids, vec![10, 20]);
        } else {
            panic!("Expected TaskGroup");
        }
    }
}