Skip to main content

shape_runtime/
event_queue.rs

1//! Platform-agnostic event queue for async operations
2//!
3//! This module provides a generic event queue abstraction that works across:
4//! - Native Tokio runtime
5//! - Bare metal / no_std environments
6//!
7//! The design avoids Tokio-specific async primitives in the core trait,
8//! allowing Shape to run on any platform.
9
10use serde::{Deserialize, Serialize};
11use shape_value::ValueWord;
12use std::sync::Arc;
13
14/// Events that can be queued for processing
15#[derive(Debug, Clone)]
16pub enum QueuedEvent {
17    /// New data point arrived from a data source
18    DataPoint {
19        /// Name of the data source (e.g., "data", "iot_sensors")
20        source: String,
21        /// The data payload
22        data: ValueWord,
23    },
24
25    /// Timer fired
26    Timer {
27        /// Timer ID for matching with awaiting code
28        id: u64,
29    },
30
31    /// External signal from a plugin or external system
32    External {
33        /// Raw payload bytes (typically MessagePack encoded)
34        payload: Vec<u8>,
35    },
36
37    /// Subscription update (streaming data)
38    Subscription {
39        /// Subscription ID
40        subscription_id: u64,
41        /// Source name
42        source: String,
43        /// Data payload
44        data: ValueWord,
45    },
46
47    /// Error from a data source or plugin
48    Error {
49        /// Source of the error
50        source: String,
51        /// Error message
52        message: String,
53    },
54
55    /// Shutdown request
56    Shutdown,
57}
58
59/// Platform-agnostic event queue trait
60///
61/// Implementations provide different backing stores:
62/// - `MemoryEventQueue`: Lock-free queue for general use
63/// - `TokioEventQueue`: Integrates with Tokio channels (native only)
64pub trait EventQueue: Send + Sync {
65    /// Poll for the next event (non-blocking)
66    ///
67    /// Returns `None` if the queue is empty.
68    fn poll(&self) -> Option<QueuedEvent>;
69
70    /// Push an event onto the queue
71    fn push(&self, event: QueuedEvent);
72
73    /// Check if the queue is empty
74    fn is_empty(&self) -> bool;
75
76    /// Get the number of pending events
77    fn len(&self) -> usize;
78
79    /// Try to receive multiple events at once (batch poll)
80    ///
81    /// Returns up to `max` events. Default implementation polls repeatedly.
82    fn poll_batch(&self, max: usize) -> Vec<QueuedEvent> {
83        let mut events = Vec::with_capacity(max);
84        while events.len() < max {
85            if let Some(event) = self.poll() {
86                events.push(event);
87            } else {
88                break;
89            }
90        }
91        events
92    }
93}
94
95/// In-memory event queue using crossbeam's lock-free queue
96///
97/// This implementation works everywhere (native, no_std with alloc).
98pub struct MemoryEventQueue {
99    queue: crossbeam_queue::SegQueue<QueuedEvent>,
100}
101
102impl MemoryEventQueue {
103    /// Create a new empty queue
104    pub fn new() -> Self {
105        Self {
106            queue: crossbeam_queue::SegQueue::new(),
107        }
108    }
109}
110
111impl Default for MemoryEventQueue {
112    fn default() -> Self {
113        Self::new()
114    }
115}
116
117impl EventQueue for MemoryEventQueue {
118    fn poll(&self) -> Option<QueuedEvent> {
119        self.queue.pop()
120    }
121
122    fn push(&self, event: QueuedEvent) {
123        self.queue.push(event);
124    }
125
126    fn is_empty(&self) -> bool {
127        self.queue.is_empty()
128    }
129
130    fn len(&self) -> usize {
131        self.queue.len()
132    }
133}
134
/// [`EventQueue`] implementation backed by a Tokio channel.
///
/// Both halves of one unbounded MPSC channel are stored here, so events pushed
/// through the sender come back out of the receiver. Only compiled when the
/// `tokio-runtime` feature is enabled.
#[cfg(feature = "tokio-runtime")]
pub struct TokioEventQueue {
    /// Sending half of the channel; cloned to hand out producer handles.
    sender: tokio::sync::mpsc::UnboundedSender<QueuedEvent>,
    /// Receiving half of the channel, wrapped in a mutex because receiving
    /// needs `&mut` access while the queue API only has `&self`.
    receiver: std::sync::Mutex<tokio::sync::mpsc::UnboundedReceiver<QueuedEvent>>,
}
143
#[cfg(feature = "tokio-runtime")]
impl TokioEventQueue {
    /// Create a new Tokio-backed event queue
    ///
    /// Opens a fresh unbounded channel and stores both halves in the queue.
    pub fn new() -> Self {
        let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();
        Self {
            sender,
            receiver: std::sync::Mutex::new(receiver),
        }
    }

    /// Get a sender handle for pushing events from async contexts
    ///
    /// The returned handle is a clone of the queue's own sender, so events sent
    /// through it are delivered to this queue's receiver.
    pub fn sender(&self) -> tokio::sync::mpsc::UnboundedSender<QueuedEvent> {
        self.sender.clone()
    }

    /// Async receive - waits for next event
    ///
    /// Resolves with `Some(event)` once an event is available. `None` is only
    /// produced if the underlying channel reports that it is closed and
    /// drained.
    ///
    /// The receiver mutex is taken only for the duration of each individual
    /// poll, never across an `.await`, so the returned future does not hold a
    /// `std::sync::Mutex` guard while suspended.
    ///
    /// NOTE(review): the underlying channel is single-consumer and tracks one
    /// waiting task at a time; awaiting this from several tasks concurrently
    /// is presumably not supported — confirm against callers.
    pub async fn recv_async(&self) -> Option<QueuedEvent> {
        std::future::poll_fn(|cx| {
            // A poisoned lock only means another thread panicked while holding
            // the guard; the receiver itself is still usable.
            let mut receiver = self
                .receiver
                .lock()
                .unwrap_or_else(std::sync::PoisonError::into_inner);
            // Registers `cx`'s waker with the channel when nothing is ready.
            receiver.poll_recv(cx)
        })
        .await
    }
}
168
#[cfg(feature = "tokio-runtime")]
impl Default for TokioEventQueue {
    /// Same as [`TokioEventQueue::new`]: a queue over a fresh channel.
    fn default() -> Self {
        TokioEventQueue::new()
    }
}
175
#[cfg(feature = "tokio-runtime")]
impl EventQueue for TokioEventQueue {
    /// Take the next buffered event without waiting for one to arrive.
    ///
    /// Returns `None` when no event is currently buffered. The receiver mutex
    /// is acquired with a regular `lock()`: the guard is only held for a single
    /// `try_recv`, and using `try_lock` here would report a false "empty" to a
    /// caller that merely raced with another poller (or permanently, after the
    /// mutex was poisoned).
    fn poll(&self) -> Option<QueuedEvent> {
        let mut receiver = self
            .receiver
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        receiver.try_recv().ok()
    }

    /// Send `event` into the channel.
    fn push(&self, event: QueuedEvent) {
        // `send` only fails when the receiving half is closed or dropped. The
        // receiver is owned by `self`, and `push` has no way to report an
        // error, so the result is deliberately ignored.
        let _ = self.sender.send(event);
    }

    /// Report whether no events are currently buffered.
    ///
    /// Defined in terms of [`len`](Self::len) so the two can never disagree.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Number of events currently buffered in the channel.
    ///
    /// Reads the count from the receiving half (`UnboundedReceiver::len`,
    /// available since tokio 1.37) instead of returning a constant `0`, which
    /// made `is_empty()` claim the queue was empty even with events pending.
    fn len(&self) -> usize {
        self.receiver
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .len()
    }
}
200
201/// Suspension state for resumable execution
202///
203/// When a Shape program yields or awaits, this state captures
204/// everything needed to resume execution later.
205#[derive(Debug, Clone, Serialize, Deserialize)]
206pub struct SuspensionState {
207    /// What condition we're waiting for
208    pub waiting_for: WaitCondition,
209    /// Program counter to resume at (for VM/JIT)
210    pub resume_pc: usize,
211    /// Saved local variables
212    #[serde(skip)]
213    #[serde(default)]
214    pub saved_locals: Vec<ValueWord>,
215    /// Saved stack (for VM)
216    #[serde(skip)]
217    #[serde(default)]
218    pub saved_stack: Vec<ValueWord>,
219}
220
221impl SuspensionState {
222    /// Create a new suspension state
223    pub fn new(waiting_for: WaitCondition, resume_pc: usize) -> Self {
224        Self {
225            waiting_for,
226            resume_pc,
227            saved_locals: Vec::new(),
228            saved_stack: Vec::new(),
229        }
230    }
231
232    /// Create with saved locals
233    pub fn with_locals(mut self, locals: Vec<ValueWord>) -> Self {
234        self.saved_locals = locals;
235        self
236    }
237
238    /// Create with saved stack
239    pub fn with_stack(mut self, stack: Vec<ValueWord>) -> Self {
240        self.saved_stack = stack;
241        self
242    }
243}
244
245/// Condition that caused suspension
246#[derive(Debug, Clone, Serialize, Deserialize)]
247pub enum WaitCondition {
248    /// Waiting for next data bar from a source
249    NextBar {
250        /// Data source name
251        source: String,
252    },
253
254    /// Waiting for a timer to fire
255    Timer {
256        /// Timer ID
257        id: u64,
258        /// Deadline (milliseconds since epoch)
259        deadline_ms: u64,
260    },
261
262    /// Waiting for an external event
263    External {
264        /// Event type filter
265        event_type: String,
266    },
267
268    /// Waiting for any event from the queue
269    AnyEvent,
270
271    /// Yielded for cooperative scheduling (no specific wait)
272    Yield,
273    /// Explicit snapshot suspension
274    Snapshot,
275    /// Waiting for a future to resolve
276    Future { id: u64 },
277}
278
279/// Shared event queue type alias
280pub type SharedEventQueue = Arc<dyn EventQueue>;
281
282/// Create a default memory-based event queue
283pub fn create_event_queue() -> SharedEventQueue {
284    Arc::new(MemoryEventQueue::new())
285}
286
/// Build a shared queue backed by a fresh [`TokioEventQueue`].
///
/// Only available when the `tokio-runtime` feature is enabled.
#[cfg(feature = "tokio-runtime")]
pub fn create_tokio_event_queue() -> SharedEventQueue {
    let tokio_queue = TokioEventQueue::new();
    Arc::new(tokio_queue)
}
292
/// Unit tests for the in-memory queue, the event variants, and the
/// suspension-state types defined in this module.
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    /// An empty queue reports empty/zero, and a push followed by a poll
    /// returns it to that state.
    #[test]
    fn test_memory_event_queue_basic() {
        let queue = MemoryEventQueue::new();

        assert!(queue.is_empty());
        assert_eq!(queue.len(), 0);

        queue.push(QueuedEvent::Timer { id: 1 });
        assert!(!queue.is_empty());
        assert_eq!(queue.len(), 1);

        let event = queue.poll();
        assert!(matches!(event, Some(QueuedEvent::Timer { id: 1 })));
        assert!(queue.is_empty());
    }

    /// Events come out in the order they were pushed.
    #[test]
    fn test_memory_event_queue_fifo() {
        let queue = MemoryEventQueue::new();

        queue.push(QueuedEvent::Timer { id: 1 });
        queue.push(QueuedEvent::Timer { id: 2 });
        queue.push(QueuedEvent::Timer { id: 3 });

        assert!(matches!(queue.poll(), Some(QueuedEvent::Timer { id: 1 })));
        assert!(matches!(queue.poll(), Some(QueuedEvent::Timer { id: 2 })));
        assert!(matches!(queue.poll(), Some(QueuedEvent::Timer { id: 3 })));
        assert!(queue.poll().is_none());
    }

    /// `poll_batch` returns at most `max` events and stops when the queue
    /// runs dry.
    #[test]
    fn test_poll_batch() {
        let queue = MemoryEventQueue::new();

        for i in 0..5 {
            queue.push(QueuedEvent::Timer { id: i });
        }

        let batch = queue.poll_batch(3);
        assert_eq!(batch.len(), 3);
        assert_eq!(queue.len(), 2);

        let remaining = queue.poll_batch(10);
        assert_eq!(remaining.len(), 2);
        assert!(queue.is_empty());
    }

    /// `SuspensionState::new` + `with_locals` record the condition, the resume
    /// point, and the locals.
    #[test]
    fn test_suspension_state() {
        let state = SuspensionState::new(
            WaitCondition::NextBar {
                source: "data".to_string(),
            },
            42,
        )
        .with_locals(vec![ValueWord::from_f64(1.0), ValueWord::from_f64(2.0)]);

        assert_eq!(state.resume_pc, 42);
        assert_eq!(state.saved_locals.len(), 2);
        assert!(matches!(
            state.waiting_for,
            WaitCondition::NextBar { source } if source == "data"
        ));
    }

    /// A `DataPoint` event survives a round trip through the queue.
    #[test]
    fn test_event_types_data_point() {
        let queue = MemoryEventQueue::new();

        queue.push(QueuedEvent::DataPoint {
            source: "iot_sensors".to_string(),
            data: ValueWord::from_f64(42.5),
        });

        let event = queue.poll().unwrap();
        match event {
            QueuedEvent::DataPoint { source, data } => {
                assert_eq!(source, "iot_sensors");
                assert!((data.as_f64().unwrap() - 42.5).abs() < 0.001);
            }
            _ => panic!("Expected DataPoint event"),
        }
    }

    /// An `External` event survives a round trip through the queue.
    #[test]
    fn test_event_types_external() {
        let queue = MemoryEventQueue::new();

        queue.push(QueuedEvent::External {
            payload: vec![1, 2, 3, 4],
        });

        let event = queue.poll().unwrap();
        match event {
            QueuedEvent::External { payload } => {
                assert_eq!(payload, vec![1, 2, 3, 4]);
            }
            _ => panic!("Expected External event"),
        }
    }

    /// A `Subscription` event survives a round trip through the queue.
    #[test]
    fn test_event_types_subscription() {
        let queue = MemoryEventQueue::new();

        queue.push(QueuedEvent::Subscription {
            subscription_id: 123,
            source: "live_feed".to_string(),
            data: ValueWord::from_string(Arc::new("update".to_string())),
        });

        let event = queue.poll().unwrap();
        match event {
            QueuedEvent::Subscription {
                subscription_id,
                source,
                data,
            } => {
                assert_eq!(subscription_id, 123);
                assert_eq!(source, "live_feed");
                assert_eq!(data.as_str().unwrap(), "update");
            }
            _ => panic!("Expected Subscription event"),
        }
    }

    /// An `Error` event survives a round trip through the queue.
    #[test]
    fn test_event_types_error() {
        let queue = MemoryEventQueue::new();

        queue.push(QueuedEvent::Error {
            source: "database".to_string(),
            message: "Connection failed".to_string(),
        });

        let event = queue.poll().unwrap();
        match event {
            QueuedEvent::Error { source, message } => {
                assert_eq!(source, "database");
                assert_eq!(message, "Connection failed");
            }
            _ => panic!("Expected Error event"),
        }
    }

    /// A `Shutdown` event survives a round trip through the queue.
    #[test]
    fn test_event_types_shutdown() {
        let queue = MemoryEventQueue::new();

        queue.push(QueuedEvent::Shutdown);

        let event = queue.poll().unwrap();
        assert!(matches!(event, QueuedEvent::Shutdown));
    }

    /// Every `WaitCondition` variant can be constructed and matched.
    #[test]
    fn test_wait_condition_variants() {
        // Test all WaitCondition variants
        let next_bar = WaitCondition::NextBar {
            source: "src".to_string(),
        };
        assert!(matches!(next_bar, WaitCondition::NextBar { .. }));

        let timer = WaitCondition::Timer {
            id: 1,
            deadline_ms: 1000,
        };
        assert!(matches!(
            timer,
            WaitCondition::Timer {
                id: 1,
                deadline_ms: 1000
            }
        ));

        let external = WaitCondition::External {
            event_type: "alert".to_string(),
        };
        assert!(matches!(external, WaitCondition::External { .. }));

        let any = WaitCondition::AnyEvent;
        assert!(matches!(any, WaitCondition::AnyEvent));

        let yield_cond = WaitCondition::Yield;
        assert!(matches!(yield_cond, WaitCondition::Yield));

        // These two variants were previously missing from the "all variants" test.
        let snapshot = WaitCondition::Snapshot;
        assert!(matches!(snapshot, WaitCondition::Snapshot));

        let future = WaitCondition::Future { id: 7 };
        assert!(matches!(future, WaitCondition::Future { id: 7 }));
    }

    /// Clones of the handle returned by `create_event_queue` refer to the same
    /// queue.
    #[test]
    fn test_create_event_queue_returns_shared() {
        let queue1 = create_event_queue();
        let queue2 = queue1.clone();

        // Push via one reference
        queue1.push(QueuedEvent::Timer { id: 42 });

        // Poll via other reference
        let event = queue2.poll().unwrap();
        assert!(matches!(event, QueuedEvent::Timer { id: 42 }));
    }

    /// `with_stack` fills the saved stack and leaves the saved locals alone.
    #[test]
    fn test_suspension_state_with_stack() {
        let state = SuspensionState::new(WaitCondition::Yield, 100).with_stack(vec![
            ValueWord::from_f64(1.0),
            ValueWord::from_f64(2.0),
            ValueWord::from_f64(3.0),
        ]);

        assert_eq!(state.resume_pc, 100);
        assert_eq!(state.saved_stack.len(), 3);
        assert!(state.saved_locals.is_empty());
    }

    /// FIFO order holds when different event variants are interleaved.
    #[test]
    fn test_mixed_event_ordering() {
        let queue = MemoryEventQueue::new();

        // Push different event types
        queue.push(QueuedEvent::Timer { id: 1 });
        queue.push(QueuedEvent::Shutdown);
        queue.push(QueuedEvent::DataPoint {
            source: "test".to_string(),
            data: ValueWord::none(),
        });

        // Verify FIFO ordering preserved across types
        assert!(matches!(queue.poll(), Some(QueuedEvent::Timer { id: 1 })));
        assert!(matches!(queue.poll(), Some(QueuedEvent::Shutdown)));
        assert!(matches!(queue.poll(), Some(QueuedEvent::DataPoint { .. })));
        assert!(queue.poll().is_none());
    }
}