1use serde::{Deserialize, Serialize};
11use shape_value::ValueWord;
12use std::sync::Arc;
13
14#[derive(Debug, Clone)]
16pub enum QueuedEvent {
17 DataPoint {
19 source: String,
21 data: ValueWord,
23 },
24
25 Timer {
27 id: u64,
29 },
30
31 External {
33 payload: Vec<u8>,
35 },
36
37 Subscription {
39 subscription_id: u64,
41 source: String,
43 data: ValueWord,
45 },
46
47 Error {
49 source: String,
51 message: String,
53 },
54
55 Shutdown,
57}
58
59pub trait EventQueue: Send + Sync {
65 fn poll(&self) -> Option<QueuedEvent>;
69
70 fn push(&self, event: QueuedEvent);
72
73 fn is_empty(&self) -> bool;
75
76 fn len(&self) -> usize;
78
79 fn poll_batch(&self, max: usize) -> Vec<QueuedEvent> {
83 let mut events = Vec::with_capacity(max);
84 while events.len() < max {
85 if let Some(event) = self.poll() {
86 events.push(event);
87 } else {
88 break;
89 }
90 }
91 events
92 }
93}
94
95pub struct MemoryEventQueue {
99 queue: crossbeam_queue::SegQueue<QueuedEvent>,
100}
101
102impl MemoryEventQueue {
103 pub fn new() -> Self {
105 Self {
106 queue: crossbeam_queue::SegQueue::new(),
107 }
108 }
109}
110
111impl Default for MemoryEventQueue {
112 fn default() -> Self {
113 Self::new()
114 }
115}
116
117impl EventQueue for MemoryEventQueue {
118 fn poll(&self) -> Option<QueuedEvent> {
119 self.queue.pop()
120 }
121
122 fn push(&self, event: QueuedEvent) {
123 self.queue.push(event);
124 }
125
126 fn is_empty(&self) -> bool {
127 self.queue.is_empty()
128 }
129
130 fn len(&self) -> usize {
131 self.queue.len()
132 }
133}
134
135#[cfg(feature = "tokio-runtime")]
139pub struct TokioEventQueue {
140 sender: tokio::sync::mpsc::UnboundedSender<QueuedEvent>,
141 receiver: std::sync::Mutex<tokio::sync::mpsc::UnboundedReceiver<QueuedEvent>>,
142}
143
144#[cfg(feature = "tokio-runtime")]
145impl TokioEventQueue {
146 pub fn new() -> Self {
148 let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();
149 Self {
150 sender,
151 receiver: std::sync::Mutex::new(receiver),
152 }
153 }
154
155 pub fn sender(&self) -> tokio::sync::mpsc::UnboundedSender<QueuedEvent> {
157 self.sender.clone()
158 }
159
160 pub async fn recv_async(&self) -> Option<QueuedEvent> {
162 self.sender.clone();
165 None }
167}
168
169#[cfg(feature = "tokio-runtime")]
170impl Default for TokioEventQueue {
171 fn default() -> Self {
172 Self::new()
173 }
174}
175
176#[cfg(feature = "tokio-runtime")]
177impl EventQueue for TokioEventQueue {
178 fn poll(&self) -> Option<QueuedEvent> {
179 if let Ok(mut receiver) = self.receiver.try_lock() {
180 receiver.try_recv().ok()
181 } else {
182 None
183 }
184 }
185
186 fn push(&self, event: QueuedEvent) {
187 let _ = self.sender.send(event);
188 }
189
190 fn is_empty(&self) -> bool {
191 self.sender.is_closed() || self.len() == 0
192 }
193
194 fn len(&self) -> usize {
195 0
198 }
199}
200
201#[derive(Debug, Clone, Serialize, Deserialize)]
206pub struct SuspensionState {
207 pub waiting_for: WaitCondition,
209 pub resume_pc: usize,
211 #[serde(skip)]
213 #[serde(default)]
214 pub saved_locals: Vec<ValueWord>,
215 #[serde(skip)]
217 #[serde(default)]
218 pub saved_stack: Vec<ValueWord>,
219}
220
221impl SuspensionState {
222 pub fn new(waiting_for: WaitCondition, resume_pc: usize) -> Self {
224 Self {
225 waiting_for,
226 resume_pc,
227 saved_locals: Vec::new(),
228 saved_stack: Vec::new(),
229 }
230 }
231
232 pub fn with_locals(mut self, locals: Vec<ValueWord>) -> Self {
234 self.saved_locals = locals;
235 self
236 }
237
238 pub fn with_stack(mut self, stack: Vec<ValueWord>) -> Self {
240 self.saved_stack = stack;
241 self
242 }
243}
244
245#[derive(Debug, Clone, Serialize, Deserialize)]
247pub enum WaitCondition {
248 NextBar {
250 source: String,
252 },
253
254 Timer {
256 id: u64,
258 deadline_ms: u64,
260 },
261
262 External {
264 event_type: String,
266 },
267
268 AnyEvent,
270
271 Yield,
273 Snapshot,
275 Future { id: u64 },
277}
278
279pub type SharedEventQueue = Arc<dyn EventQueue>;
281
282pub fn create_event_queue() -> SharedEventQueue {
284 Arc::new(MemoryEventQueue::new())
285}
286
287#[cfg(feature = "tokio-runtime")]
289pub fn create_tokio_event_queue() -> SharedEventQueue {
290 Arc::new(TokioEventQueue::new())
291}
292
293#[cfg(test)]
294mod tests {
295 use super::*;
296 use std::sync::Arc;
297
298 #[test]
299 fn test_memory_event_queue_basic() {
300 let queue = MemoryEventQueue::new();
301
302 assert!(queue.is_empty());
303 assert_eq!(queue.len(), 0);
304
305 queue.push(QueuedEvent::Timer { id: 1 });
306 assert!(!queue.is_empty());
307 assert_eq!(queue.len(), 1);
308
309 let event = queue.poll();
310 assert!(matches!(event, Some(QueuedEvent::Timer { id: 1 })));
311 assert!(queue.is_empty());
312 }
313
314 #[test]
315 fn test_memory_event_queue_fifo() {
316 let queue = MemoryEventQueue::new();
317
318 queue.push(QueuedEvent::Timer { id: 1 });
319 queue.push(QueuedEvent::Timer { id: 2 });
320 queue.push(QueuedEvent::Timer { id: 3 });
321
322 assert!(matches!(queue.poll(), Some(QueuedEvent::Timer { id: 1 })));
323 assert!(matches!(queue.poll(), Some(QueuedEvent::Timer { id: 2 })));
324 assert!(matches!(queue.poll(), Some(QueuedEvent::Timer { id: 3 })));
325 assert!(queue.poll().is_none());
326 }
327
328 #[test]
329 fn test_poll_batch() {
330 let queue = MemoryEventQueue::new();
331
332 for i in 0..5 {
333 queue.push(QueuedEvent::Timer { id: i });
334 }
335
336 let batch = queue.poll_batch(3);
337 assert_eq!(batch.len(), 3);
338 assert_eq!(queue.len(), 2);
339
340 let remaining = queue.poll_batch(10);
341 assert_eq!(remaining.len(), 2);
342 assert!(queue.is_empty());
343 }
344
345 #[test]
346 fn test_suspension_state() {
347 let state = SuspensionState::new(
348 WaitCondition::NextBar {
349 source: "data".to_string(),
350 },
351 42,
352 )
353 .with_locals(vec![ValueWord::from_f64(1.0), ValueWord::from_f64(2.0)]);
354
355 assert_eq!(state.resume_pc, 42);
356 assert_eq!(state.saved_locals.len(), 2);
357 assert!(matches!(
358 state.waiting_for,
359 WaitCondition::NextBar { source } if source == "data"
360 ));
361 }
362
363 #[test]
364 fn test_event_types_data_point() {
365 let queue = MemoryEventQueue::new();
366
367 queue.push(QueuedEvent::DataPoint {
368 source: "iot_sensors".to_string(),
369 data: ValueWord::from_f64(42.5),
370 });
371
372 let event = queue.poll().unwrap();
373 match event {
374 QueuedEvent::DataPoint { source, data } => {
375 assert_eq!(source, "iot_sensors");
376 assert!((data.as_f64().unwrap() - 42.5).abs() < 0.001);
377 }
378 _ => panic!("Expected DataPoint event"),
379 }
380 }
381
382 #[test]
383 fn test_event_types_external() {
384 let queue = MemoryEventQueue::new();
385
386 queue.push(QueuedEvent::External {
387 payload: vec![1, 2, 3, 4],
388 });
389
390 let event = queue.poll().unwrap();
391 match event {
392 QueuedEvent::External { payload } => {
393 assert_eq!(payload, vec![1, 2, 3, 4]);
394 }
395 _ => panic!("Expected External event"),
396 }
397 }
398
399 #[test]
400 fn test_event_types_subscription() {
401 let queue = MemoryEventQueue::new();
402
403 queue.push(QueuedEvent::Subscription {
404 subscription_id: 123,
405 source: "live_feed".to_string(),
406 data: ValueWord::from_string(Arc::new("update".to_string())),
407 });
408
409 let event = queue.poll().unwrap();
410 match event {
411 QueuedEvent::Subscription {
412 subscription_id,
413 source,
414 data,
415 } => {
416 assert_eq!(subscription_id, 123);
417 assert_eq!(source, "live_feed");
418 assert_eq!(data.as_str().unwrap(), "update");
419 }
420 _ => panic!("Expected Subscription event"),
421 }
422 }
423
424 #[test]
425 fn test_event_types_error() {
426 let queue = MemoryEventQueue::new();
427
428 queue.push(QueuedEvent::Error {
429 source: "database".to_string(),
430 message: "Connection failed".to_string(),
431 });
432
433 let event = queue.poll().unwrap();
434 match event {
435 QueuedEvent::Error { source, message } => {
436 assert_eq!(source, "database");
437 assert_eq!(message, "Connection failed");
438 }
439 _ => panic!("Expected Error event"),
440 }
441 }
442
443 #[test]
444 fn test_event_types_shutdown() {
445 let queue = MemoryEventQueue::new();
446
447 queue.push(QueuedEvent::Shutdown);
448
449 let event = queue.poll().unwrap();
450 assert!(matches!(event, QueuedEvent::Shutdown));
451 }
452
453 #[test]
454 fn test_wait_condition_variants() {
455 let next_bar = WaitCondition::NextBar {
457 source: "src".to_string(),
458 };
459 assert!(matches!(next_bar, WaitCondition::NextBar { .. }));
460
461 let timer = WaitCondition::Timer {
462 id: 1,
463 deadline_ms: 1000,
464 };
465 assert!(matches!(
466 timer,
467 WaitCondition::Timer {
468 id: 1,
469 deadline_ms: 1000
470 }
471 ));
472
473 let external = WaitCondition::External {
474 event_type: "alert".to_string(),
475 };
476 assert!(matches!(external, WaitCondition::External { .. }));
477
478 let any = WaitCondition::AnyEvent;
479 assert!(matches!(any, WaitCondition::AnyEvent));
480
481 let yield_cond = WaitCondition::Yield;
482 assert!(matches!(yield_cond, WaitCondition::Yield));
483 }
484
485 #[test]
486 fn test_create_event_queue_returns_shared() {
487 let queue1 = create_event_queue();
488 let queue2 = queue1.clone();
489
490 queue1.push(QueuedEvent::Timer { id: 42 });
492
493 let event = queue2.poll().unwrap();
495 assert!(matches!(event, QueuedEvent::Timer { id: 42 }));
496 }
497
498 #[test]
499 fn test_suspension_state_with_stack() {
500 let state = SuspensionState::new(WaitCondition::Yield, 100).with_stack(vec![
501 ValueWord::from_f64(1.0),
502 ValueWord::from_f64(2.0),
503 ValueWord::from_f64(3.0),
504 ]);
505
506 assert_eq!(state.resume_pc, 100);
507 assert_eq!(state.saved_stack.len(), 3);
508 assert!(state.saved_locals.is_empty());
509 }
510
511 #[test]
512 fn test_mixed_event_ordering() {
513 let queue = MemoryEventQueue::new();
514
515 queue.push(QueuedEvent::Timer { id: 1 });
517 queue.push(QueuedEvent::Shutdown);
518 queue.push(QueuedEvent::DataPoint {
519 source: "test".to_string(),
520 data: ValueWord::none(),
521 });
522
523 assert!(matches!(queue.poll(), Some(QueuedEvent::Timer { id: 1 })));
525 assert!(matches!(queue.poll(), Some(QueuedEvent::Shutdown)));
526 assert!(matches!(queue.poll(), Some(QueuedEvent::DataPoint { .. })));
527 assert!(queue.poll().is_none());
528 }
529}