Skip to main content

shape_runtime/
event_queue.rs

1//! Platform-agnostic event queue for async operations
2//!
3//! This module provides a generic event queue abstraction that works across:
4//! - Native Tokio runtime
5//! - Bare metal / no_std environments
6//!
7//! The design avoids Tokio-specific async primitives in the core trait,
8//! allowing Shape to run on any platform.
9
10use serde::{Deserialize, Serialize};
11// ADR-006 §2.7: GENERIC_CARRIER vector storage uses `KindedSlot`. Event
12// payloads (`QueuedEvent::DataPoint::data`) and suspension-state buffers
13// (`SuspensionState::saved_locals`/`saved_stack`) hold heterogeneous
14// runtime values whose `NativeKind` isn't statically determined here.
15// Serialization of the slot bytes is deferred (see snapshot.rs comment
16// at line 650) — the data fields stay `#[serde(skip)]` for now.
17use shape_value::KindedSlot;
18use std::sync::Arc;
19
20/// Events that can be queued for processing
21#[derive(Debug, Clone)]
22pub enum QueuedEvent {
23    /// New data point arrived from a data source
24    DataPoint {
25        /// Name of the data source (e.g., "data", "iot_sensors")
26        source: String,
27        /// The data payload
28        data: KindedSlot,
29    },
30
31    /// Timer fired
32    Timer {
33        /// Timer ID for matching with awaiting code
34        id: u64,
35    },
36
37    /// External signal from a plugin or external system
38    External {
39        /// Raw payload bytes (typically MessagePack encoded)
40        payload: Vec<u8>,
41    },
42
43    /// Subscription update (streaming data)
44    Subscription {
45        /// Subscription ID
46        subscription_id: u64,
47        /// Source name
48        source: String,
49        /// Data payload
50        data: KindedSlot,
51    },
52
53    /// Error from a data source or plugin
54    Error {
55        /// Source of the error
56        source: String,
57        /// Error message
58        message: String,
59    },
60
61    /// Shutdown request
62    Shutdown,
63}
64
65/// Platform-agnostic event queue trait
66///
67/// Implementations provide different backing stores:
68/// - `MemoryEventQueue`: Lock-free queue for general use
69/// - `TokioEventQueue`: Integrates with Tokio channels (native only)
70pub trait EventQueue: Send + Sync {
71    /// Poll for the next event (non-blocking)
72    ///
73    /// Returns `None` if the queue is empty.
74    fn poll(&self) -> Option<QueuedEvent>;
75
76    /// Push an event onto the queue
77    fn push(&self, event: QueuedEvent);
78
79    /// Check if the queue is empty
80    fn is_empty(&self) -> bool;
81
82    /// Get the number of pending events
83    fn len(&self) -> usize;
84
85    /// Try to receive multiple events at once (batch poll)
86    ///
87    /// Returns up to `max` events. Default implementation polls repeatedly.
88    fn poll_batch(&self, max: usize) -> Vec<QueuedEvent> {
89        let mut events = Vec::with_capacity(max);
90        while events.len() < max {
91            if let Some(event) = self.poll() {
92                events.push(event);
93            } else {
94                break;
95            }
96        }
97        events
98    }
99}
100
101/// In-memory event queue using crossbeam's lock-free queue
102///
103/// This implementation works everywhere (native, no_std with alloc).
104pub struct MemoryEventQueue {
105    queue: crossbeam_queue::SegQueue<QueuedEvent>,
106}
107
108impl MemoryEventQueue {
109    /// Create a new empty queue
110    pub fn new() -> Self {
111        Self {
112            queue: crossbeam_queue::SegQueue::new(),
113        }
114    }
115}
116
117impl Default for MemoryEventQueue {
118    fn default() -> Self {
119        Self::new()
120    }
121}
122
123impl EventQueue for MemoryEventQueue {
124    fn poll(&self) -> Option<QueuedEvent> {
125        self.queue.pop()
126    }
127
128    fn push(&self, event: QueuedEvent) {
129        self.queue.push(event);
130    }
131
132    fn is_empty(&self) -> bool {
133        self.queue.is_empty()
134    }
135
136    fn len(&self) -> usize {
137        self.queue.len()
138    }
139}
140
/// Event queue built on Tokio channels, for native async integration
///
/// Backed by an unbounded MPSC channel so it plugs into Tokio's async
/// runtime. Only compiled when the `tokio-runtime` feature is enabled.
#[cfg(feature = "tokio-runtime")]
pub struct TokioEventQueue {
    /// Sending half of the channel; cloned out to producers
    sender: tokio::sync::mpsc::UnboundedSender<QueuedEvent>,
    /// Receiving half, wrapped in a mutex so it can be used through `&self`
    receiver: std::sync::Mutex<tokio::sync::mpsc::UnboundedReceiver<QueuedEvent>>,
}
149
#[cfg(feature = "tokio-runtime")]
impl TokioEventQueue {
    /// Create a new Tokio-backed event queue
    ///
    /// The queue owns both halves of a fresh unbounded channel.
    pub fn new() -> Self {
        let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();
        Self {
            sender,
            receiver: std::sync::Mutex::new(receiver),
        }
    }

    /// Get a sender handle for pushing events from async contexts
    ///
    /// The returned handle is a clone of the queue's own sender, so events
    /// sent through it are delivered to this queue.
    pub fn sender(&self) -> tokio::sync::mpsc::UnboundedSender<QueuedEvent> {
        self.sender.clone()
    }

    /// Async receive - waits for next event
    ///
    /// Resolves with the next event as soon as one is available. `None` is
    /// returned only once the channel is closed and fully drained; because
    /// the queue keeps its own sender alive, that does not happen merely
    /// because producers went away.
    ///
    /// The channel is single-consumer: it remembers one waiting task at a
    /// time, so this should be awaited from one task at a time. Events may
    /// also be taken by a concurrent `poll`, in which case this future keeps
    /// waiting for the next one.
    pub async fn recv_async(&self) -> Option<QueuedEvent> {
        // Drive the receiver's poll API by hand so the std mutex is only
        // held for the duration of a single poll and never across an await
        // point (a std `MutexGuard` must not live across `.await`).
        std::future::poll_fn(|cx| {
            let mut receiver = self
                .receiver
                .lock()
                // A poisoned lock only means another thread panicked while
                // holding it; the receiver itself is still usable.
                .unwrap_or_else(std::sync::PoisonError::into_inner);
            receiver.poll_recv(cx)
        })
        .await
    }
}
174
#[cfg(feature = "tokio-runtime")]
impl Default for TokioEventQueue {
    /// Same as [`TokioEventQueue::new`]: a queue over a fresh channel
    fn default() -> Self {
        TokioEventQueue::new()
    }
}
181
#[cfg(feature = "tokio-runtime")]
impl EventQueue for TokioEventQueue {
    /// Take the next buffered event without waiting.
    ///
    /// Returns `None` when no event is buffered.
    fn poll(&self) -> Option<QueuedEvent> {
        // Use a (very short) blocking lock instead of `try_lock`: with
        // `try_lock`, a concurrent caller or a poisoned mutex made this
        // report `None` even though events were pending.
        let mut receiver = self
            .receiver
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        receiver.try_recv().ok()
    }

    /// Send `event` into the channel.
    fn push(&self, event: QueuedEvent) {
        // `send` only fails once the receiving half is closed or dropped.
        // The queue owns the receiver, and the trait gives `push` no way to
        // report failure, so the result is deliberately ignored.
        let _ = self.sender.send(event);
    }

    /// Whether no events are currently buffered.
    fn is_empty(&self) -> bool {
        // Derived from the buffered count alone, so it always agrees with
        // what `poll` would return next.
        self.len() == 0
    }

    /// Number of events currently buffered in the channel.
    ///
    /// Under concurrent pushes/polls the value is a snapshot and may be
    /// outdated by the time the caller looks at it.
    fn len(&self) -> usize {
        // NOTE(review): `UnboundedReceiver::len` is believed to exist from
        // tokio 1.37 onwards — confirm the crate's minimum tokio version.
        self.receiver
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .len()
    }
}
206
207/// Suspension state for resumable execution
208///
209/// When a Shape program yields or awaits, this state captures
210/// everything needed to resume execution later.
211///
212/// **WB2.5 retain-on-read.** `saved_locals` / `saved_stack` hold
213/// owning shares of heap-tagged values. ADR-006 §2.7 GENERIC_CARRIER
214/// vector storage: `Vec<KindedSlot>` carries the explicit `Drop`/`Clone`
215/// discipline so push/pop/clone preserve refcount semantics. The serde
216/// skip-on-payload pattern matches the snapshot.rs:650 comment — slot
217/// (de)serialization is deferred to the kind-threaded follow-up API.
218#[derive(Debug, Clone, Serialize, Deserialize)]
219pub struct SuspensionState {
220    /// What condition we're waiting for
221    pub waiting_for: WaitCondition,
222    /// Program counter to resume at (for VM/JIT)
223    pub resume_pc: usize,
224    /// Saved local variables
225    #[serde(skip)]
226    #[serde(default)]
227    pub saved_locals: Vec<KindedSlot>,
228    /// Saved stack (for VM)
229    #[serde(skip)]
230    #[serde(default)]
231    pub saved_stack: Vec<KindedSlot>,
232}
233
234impl SuspensionState {
235    /// Create a new suspension state
236    pub fn new(waiting_for: WaitCondition, resume_pc: usize) -> Self {
237        Self {
238            waiting_for,
239            resume_pc,
240            saved_locals: Vec::new(),
241            saved_stack: Vec::new(),
242        }
243    }
244
245    /// Create with saved locals. `KindedSlot::Drop` retires refcounts on
246    /// drop; `KindedSlot::Clone` retains them on copy.
247    pub fn with_locals(mut self, locals: Vec<KindedSlot>) -> Self {
248        self.saved_locals = locals;
249        self
250    }
251
252    /// Create with saved stack
253    pub fn with_stack(mut self, stack: Vec<KindedSlot>) -> Self {
254        self.saved_stack = stack;
255        self
256    }
257}
258
259/// Condition that caused suspension
260#[derive(Debug, Clone, Serialize, Deserialize)]
261pub enum WaitCondition {
262    /// Waiting for next data bar from a source
263    NextBar {
264        /// Data source name
265        source: String,
266    },
267
268    /// Waiting for a timer to fire
269    Timer {
270        /// Timer ID
271        id: u64,
272        /// Deadline (milliseconds since epoch)
273        deadline_ms: u64,
274    },
275
276    /// Waiting for an external event
277    External {
278        /// Event type filter
279        event_type: String,
280    },
281
282    /// Waiting for any event from the queue
283    AnyEvent,
284
285    /// Yielded for cooperative scheduling (no specific wait)
286    Yield,
287    /// Explicit snapshot suspension
288    Snapshot,
289    /// Waiting for a future to resolve
290    Future { id: u64 },
291}
292
293/// Shared event queue type alias
294pub type SharedEventQueue = Arc<dyn EventQueue>;
295
296/// Create a default memory-based event queue
297pub fn create_event_queue() -> SharedEventQueue {
298    Arc::new(MemoryEventQueue::new())
299}
300
/// Build a shared queue backed by [`TokioEventQueue`] (only with the
/// `tokio-runtime` feature enabled)
#[cfg(feature = "tokio-runtime")]
pub fn create_tokio_event_queue() -> SharedEventQueue {
    let queue = TokioEventQueue::new();
    Arc::new(queue)
}
306
#[cfg(test)]
mod tests {
    //! Unit tests for the in-memory queue, the event variants and the
    //! suspension-state builders.

    use super::*;
    // The parent module already imports `Arc`, so the glob above may bring
    // it in; the allow keeps this explicit import warning-free either way.
    #[allow(unused_imports)]
    use std::sync::Arc;

    /// Push/poll on a fresh queue keeps `is_empty` and `len` in sync.
    #[test]
    fn test_memory_event_queue_basic() {
        let queue = MemoryEventQueue::new();

        assert!(queue.is_empty());
        assert_eq!(queue.len(), 0);

        queue.push(QueuedEvent::Timer { id: 1 });
        assert!(!queue.is_empty());
        assert_eq!(queue.len(), 1);

        let event = queue.poll();
        assert!(matches!(event, Some(QueuedEvent::Timer { id: 1 })));
        assert!(queue.is_empty());
    }

    /// Events come back in the order they were pushed.
    #[test]
    fn test_memory_event_queue_fifo() {
        let queue = MemoryEventQueue::new();

        queue.push(QueuedEvent::Timer { id: 1 });
        queue.push(QueuedEvent::Timer { id: 2 });
        queue.push(QueuedEvent::Timer { id: 3 });

        assert!(matches!(queue.poll(), Some(QueuedEvent::Timer { id: 1 })));
        assert!(matches!(queue.poll(), Some(QueuedEvent::Timer { id: 2 })));
        assert!(matches!(queue.poll(), Some(QueuedEvent::Timer { id: 3 })));
        assert!(queue.poll().is_none());
    }

    /// `poll_batch` respects its bound and stops when the queue runs dry.
    #[test]
    fn test_poll_batch() {
        let queue = MemoryEventQueue::new();

        for i in 0..5 {
            queue.push(QueuedEvent::Timer { id: i });
        }

        let batch = queue.poll_batch(3);
        assert_eq!(batch.len(), 3);
        assert_eq!(queue.len(), 2);

        let remaining = queue.poll_batch(10);
        assert_eq!(remaining.len(), 2);
        assert!(queue.is_empty());
    }

    /// `new` + `with_locals` stores the condition, pc and locals.
    #[test]
    fn test_suspension_state() {
        let state = SuspensionState::new(
            WaitCondition::NextBar {
                source: "data".to_string(),
            },
            42,
        )
        .with_locals(vec![KindedSlot::from_number(1.0), KindedSlot::from_number(2.0)]);

        assert_eq!(state.resume_pc, 42);
        assert_eq!(state.saved_locals.len(), 2);
        assert!(matches!(
            &state.waiting_for,
            WaitCondition::NextBar { source } if source == "data"
        ));
    }

    /// A `DataPoint` event survives the queue with source and payload intact.
    #[test]
    fn test_event_types_data_point() {
        let queue = MemoryEventQueue::new();

        queue.push(QueuedEvent::DataPoint {
            source: "iot_sensors".to_string(),
            data: KindedSlot::from_number(42.5),
        });

        let event = queue.poll().unwrap();
        match event {
            QueuedEvent::DataPoint { source, data } => {
                assert_eq!(source, "iot_sensors");
                assert!((data.slot().as_f64() - 42.5).abs() < 0.001);
            }
            _ => panic!("Expected DataPoint event"),
        }
    }

    /// An `External` event keeps its raw payload bytes.
    #[test]
    fn test_event_types_external() {
        let queue = MemoryEventQueue::new();

        queue.push(QueuedEvent::External {
            payload: vec![1, 2, 3, 4],
        });

        let event = queue.poll().unwrap();
        match event {
            QueuedEvent::External { payload } => {
                assert_eq!(payload, vec![1, 2, 3, 4]);
            }
            _ => panic!("Expected External event"),
        }
    }

    /// A `Subscription` event keeps its id, source and string payload.
    #[test]
    fn test_event_types_subscription() {
        let queue = MemoryEventQueue::new();

        queue.push(QueuedEvent::Subscription {
            subscription_id: 123,
            source: "live_feed".to_string(),
            data: KindedSlot::from_string_arc(Arc::new("update".to_string())),
        });

        let event = queue.poll().unwrap();
        match event {
            QueuedEvent::Subscription {
                subscription_id,
                source,
                data,
            } => {
                assert_eq!(subscription_id, 123);
                assert_eq!(source, "live_feed");
                // Read the Arc<String> back via the slot's raw bits.
                // SAFETY: assumes `from_string_arc` stores a pointer to the
                // `String` inside the `Arc` as the slot's raw bits, and that
                // `data` (still alive here) keeps that allocation alive —
                // TODO confirm against `shape_value::KindedSlot`.
                let s_ref: &str = unsafe {
                    let ptr = data.slot().raw() as *const String;
                    (*ptr).as_str()
                };
                assert_eq!(s_ref, "update");
            }
            _ => panic!("Expected Subscription event"),
        }
    }

    /// An `Error` event keeps its source and message.
    #[test]
    fn test_event_types_error() {
        let queue = MemoryEventQueue::new();

        queue.push(QueuedEvent::Error {
            source: "database".to_string(),
            message: "Connection failed".to_string(),
        });

        let event = queue.poll().unwrap();
        match event {
            QueuedEvent::Error { source, message } => {
                assert_eq!(source, "database");
                assert_eq!(message, "Connection failed");
            }
            _ => panic!("Expected Error event"),
        }
    }

    /// A `Shutdown` event round-trips through the queue.
    #[test]
    fn test_event_types_shutdown() {
        let queue = MemoryEventQueue::new();

        queue.push(QueuedEvent::Shutdown);

        let event = queue.poll().unwrap();
        assert!(matches!(event, QueuedEvent::Shutdown));
    }

    /// Every `WaitCondition` variant can be built and matched.
    #[test]
    fn test_wait_condition_variants() {
        // Test all WaitCondition variants
        let next_bar = WaitCondition::NextBar {
            source: "src".to_string(),
        };
        assert!(matches!(next_bar, WaitCondition::NextBar { .. }));

        let timer = WaitCondition::Timer {
            id: 1,
            deadline_ms: 1000,
        };
        assert!(matches!(
            timer,
            WaitCondition::Timer {
                id: 1,
                deadline_ms: 1000
            }
        ));

        let external = WaitCondition::External {
            event_type: "alert".to_string(),
        };
        assert!(matches!(external, WaitCondition::External { .. }));

        let any = WaitCondition::AnyEvent;
        assert!(matches!(any, WaitCondition::AnyEvent));

        let yield_cond = WaitCondition::Yield;
        assert!(matches!(yield_cond, WaitCondition::Yield));

        let snapshot = WaitCondition::Snapshot;
        assert!(matches!(snapshot, WaitCondition::Snapshot));

        let future = WaitCondition::Future { id: 7 };
        assert!(matches!(future, WaitCondition::Future { id: 7 }));
    }

    /// Clones of the shared handle refer to the same underlying queue.
    #[test]
    fn test_create_event_queue_returns_shared() {
        let queue1 = create_event_queue();
        let queue2 = queue1.clone();

        // Push via one reference
        queue1.push(QueuedEvent::Timer { id: 42 });

        // Poll via other reference
        let event = queue2.poll().unwrap();
        assert!(matches!(event, QueuedEvent::Timer { id: 42 }));
    }

    /// `with_stack` fills the saved stack and leaves the locals untouched.
    #[test]
    fn test_suspension_state_with_stack() {
        let state = SuspensionState::new(WaitCondition::Yield, 100).with_stack(vec![
            KindedSlot::from_number(1.0),
            KindedSlot::from_number(2.0),
            KindedSlot::from_number(3.0),
        ]);

        assert_eq!(state.resume_pc, 100);
        assert_eq!(state.saved_stack.len(), 3);
        assert!(state.saved_locals.is_empty());
    }

    /// FIFO order holds even when different event variants are interleaved.
    #[test]
    fn test_mixed_event_ordering() {
        let queue = MemoryEventQueue::new();

        // Push different event types
        queue.push(QueuedEvent::Timer { id: 1 });
        queue.push(QueuedEvent::Shutdown);
        queue.push(QueuedEvent::DataPoint {
            source: "test".to_string(),
            data: KindedSlot::none(),
        });

        // Verify FIFO ordering preserved across types
        assert!(matches!(queue.poll(), Some(QueuedEvent::Timer { id: 1 })));
        assert!(matches!(queue.poll(), Some(QueuedEvent::Shutdown)));
        assert!(matches!(queue.poll(), Some(QueuedEvent::DataPoint { .. })));
        assert!(queue.poll().is_none());
    }
}
549}