1use serde::{Deserialize, Serialize};
11use shape_value::KindedSlot;
18use std::sync::Arc;
19
/// An event that can be stored in an [`EventQueue`].
///
/// The queue implementations in this file treat events as opaque values: they
/// are pushed and polled without being inspected.
#[derive(Debug, Clone)]
pub enum QueuedEvent {
    /// A value tagged with a source label.
    DataPoint {
        /// Label of the source the value is attributed to (the tests use
        /// `"iot_sensors"`).
        source: String,
        /// The value carried by the event.
        data: KindedSlot,
    },

    /// A timer event identified by `id`.
    Timer {
        /// Identifier carried by the event.
        // NOTE(review): presumably matches `WaitCondition::Timer::id` — confirm with the consumer.
        id: u64,
    },

    /// An event carrying raw bytes.
    External {
        /// Uninterpreted byte payload.
        payload: Vec<u8>,
    },

    /// A value tagged with both a subscription id and a source label.
    Subscription {
        /// Identifier of the subscription the value is attributed to.
        subscription_id: u64,
        /// Label of the source (the tests use `"live_feed"`).
        source: String,
        /// The value carried by the event.
        data: KindedSlot,
    },

    /// An error report expressed as plain strings.
    Error {
        /// Label of the source the error is attributed to.
        source: String,
        /// Human-readable error text.
        message: String,
    },

    /// A payload-free event.
    // NOTE(review): presumably asks the consumer to stop; nothing in this file acts on it.
    Shutdown,
}
64
65pub trait EventQueue: Send + Sync {
71 fn poll(&self) -> Option<QueuedEvent>;
75
76 fn push(&self, event: QueuedEvent);
78
79 fn is_empty(&self) -> bool;
81
82 fn len(&self) -> usize;
84
85 fn poll_batch(&self, max: usize) -> Vec<QueuedEvent> {
89 let mut events = Vec::with_capacity(max);
90 while events.len() < max {
91 if let Some(event) = self.poll() {
92 events.push(event);
93 } else {
94 break;
95 }
96 }
97 events
98 }
99}
100
/// [`EventQueue`] implementation that keeps its events in a
/// `crossbeam_queue::SegQueue`.
pub struct MemoryEventQueue {
    /// Backing queue; every `EventQueue` method delegates to it.
    queue: crossbeam_queue::SegQueue<QueuedEvent>,
}
107
108impl MemoryEventQueue {
109 pub fn new() -> Self {
111 Self {
112 queue: crossbeam_queue::SegQueue::new(),
113 }
114 }
115}
116
117impl Default for MemoryEventQueue {
118 fn default() -> Self {
119 Self::new()
120 }
121}
122
123impl EventQueue for MemoryEventQueue {
124 fn poll(&self) -> Option<QueuedEvent> {
125 self.queue.pop()
126 }
127
128 fn push(&self, event: QueuedEvent) {
129 self.queue.push(event);
130 }
131
132 fn is_empty(&self) -> bool {
133 self.queue.is_empty()
134 }
135
136 fn len(&self) -> usize {
137 self.queue.len()
138 }
139}
140
/// [`EventQueue`] implementation backed by an unbounded Tokio mpsc channel.
///
/// Only compiled when the `tokio-runtime` feature is enabled.
#[cfg(feature = "tokio-runtime")]
pub struct TokioEventQueue {
    /// Sending half of the channel; used by `push` and handed out (cloned) by
    /// `sender()`.
    sender: tokio::sync::mpsc::UnboundedSender<QueuedEvent>,
    /// Receiving half of the channel, wrapped in a mutex so it can be reached
    /// through `&self`.
    receiver: std::sync::Mutex<tokio::sync::mpsc::UnboundedReceiver<QueuedEvent>>,
}
149
#[cfg(feature = "tokio-runtime")]
impl TokioEventQueue {
    /// Creates an empty queue backed by a fresh unbounded channel.
    pub fn new() -> Self {
        let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();
        Self {
            sender,
            receiver: std::sync::Mutex::new(receiver),
        }
    }

    /// Returns a clone of the sending half, so producers can enqueue events
    /// without holding a reference to the queue itself.
    pub fn sender(&self) -> tokio::sync::mpsc::UnboundedSender<QueuedEvent> {
        self.sender.clone()
    }

    /// Waits until an event is available and returns it.
    ///
    /// Returns `None` only if the channel has been closed and drained. The
    /// queue keeps its own sender, so in normal operation this future stays
    /// pending until an event is pushed.
    ///
    /// The previous body was a placeholder that always returned `None`
    /// immediately and never delivered an event.
    ///
    /// The receiver mutex is taken only for the duration of each poll and is
    /// never held across an await point, so `poll()` keeps working while a
    /// task is parked here.
    // NOTE(review): the channel remembers one waker, so this is meant for a
    // single awaiting task at a time — confirm that matches the callers.
    pub async fn recv_async(&self) -> Option<QueuedEvent> {
        std::future::poll_fn(|cx| {
            // A poisoned lock still guards a usable receiver; recover it.
            let mut receiver = self
                .receiver
                .lock()
                .unwrap_or_else(std::sync::PoisonError::into_inner);
            receiver.poll_recv(cx)
        })
        .await
    }
}
174
#[cfg(feature = "tokio-runtime")]
impl Default for TokioEventQueue {
    /// Same as [`TokioEventQueue::new`]: an empty queue on a fresh channel.
    fn default() -> Self {
        TokioEventQueue::new()
    }
}
181
#[cfg(feature = "tokio-runtime")]
impl EventQueue for TokioEventQueue {
    /// Returns the next buffered event without waiting.
    ///
    /// Yields `None` when the channel has nothing buffered, or when another
    /// caller currently holds the receiver lock.
    fn poll(&self) -> Option<QueuedEvent> {
        let mut receiver = match self.receiver.try_lock() {
            Ok(guard) => guard,
            // The receiver is still usable after a panic elsewhere; recover it
            // instead of reporting "no events" forever.
            Err(std::sync::TryLockError::Poisoned(poisoned)) => poisoned.into_inner(),
            Err(std::sync::TryLockError::WouldBlock) => return None,
        };
        receiver.try_recv().ok()
    }

    /// Sends `event` into the channel.
    fn push(&self, event: QueuedEvent) {
        // `send` only fails once the receiving half is closed; the error is
        // deliberately discarded (best-effort push).
        let _ = self.sender.send(event);
    }

    /// Returns `true` when the channel is closed or has no buffered events.
    fn is_empty(&self) -> bool {
        self.sender.is_closed() || self.len() == 0
    }

    /// Returns the number of events currently buffered in the channel.
    ///
    /// Previously this was hard-coded to `0`, which also made `is_empty()`
    /// report `true` while events were waiting.
    fn len(&self) -> usize {
        self.receiver
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .len()
    }
}
206
/// State recorded for a suspended execution: the condition it waits on, the
/// position to resume at, and the saved local and stack slots.
///
/// Only `waiting_for` and `resume_pc` take part in (de)serialization. The two
/// slot vectors are marked `#[serde(skip)]`, so they are omitted when
/// serializing and come back as empty vectors when deserializing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SuspensionState {
    /// The condition this suspension is waiting for.
    pub waiting_for: WaitCondition,
    /// Position at which to resume.
    // NOTE(review): presumably a program-counter / instruction index — confirm with the VM.
    pub resume_pc: usize,
    /// Saved local slots; not serialized (see the type-level docs).
    #[serde(skip)]
    #[serde(default)]
    pub saved_locals: Vec<KindedSlot>,
    /// Saved stack slots; not serialized (see the type-level docs).
    #[serde(skip)]
    #[serde(default)]
    pub saved_stack: Vec<KindedSlot>,
}
233
234impl SuspensionState {
235 pub fn new(waiting_for: WaitCondition, resume_pc: usize) -> Self {
237 Self {
238 waiting_for,
239 resume_pc,
240 saved_locals: Vec::new(),
241 saved_stack: Vec::new(),
242 }
243 }
244
245 pub fn with_locals(mut self, locals: Vec<KindedSlot>) -> Self {
248 self.saved_locals = locals;
249 self
250 }
251
252 pub fn with_stack(mut self, stack: Vec<KindedSlot>) -> Self {
254 self.saved_stack = stack;
255 self
256 }
257}
258
/// The condition a [`SuspensionState`] is waiting on.
///
/// This file only constructs and stores these values; what each variant means
/// to the scheduler is decided by code outside this file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum WaitCondition {
    /// Wait tied to a named source.
    // NOTE(review): presumably "the next bar/data point from `source`" — confirm with the consumer.
    NextBar {
        /// Label of the source being waited on.
        source: String,
    },

    /// Wait tied to a timer.
    Timer {
        /// Identifier of the timer.
        id: u64,
        /// Deadline value.
        // NOTE(review): presumably milliseconds; the reference epoch is not visible here.
        deadline_ms: u64,
    },

    /// Wait tied to an external event type.
    External {
        /// Name of the event type being waited on.
        event_type: String,
    },

    /// Payload-free variant; by its name, a wait satisfied by any event.
    AnyEvent,

    /// Payload-free variant; by its name, a voluntary yield.
    Yield,
    /// Payload-free variant; by its name, a suspension taken for a snapshot.
    Snapshot,
    /// Wait tied to a future identified by `id`.
    Future { id: u64 },
}
292
/// An [`EventQueue`] trait object behind an [`Arc`]; cloning the handle gives
/// another reference to the same queue.
pub type SharedEventQueue = Arc<dyn EventQueue>;
295
296pub fn create_event_queue() -> SharedEventQueue {
298 Arc::new(MemoryEventQueue::new())
299}
300
/// Creates a new, empty [`TokioEventQueue`] and returns it as a
/// [`SharedEventQueue`] handle (requires the `tokio-runtime` feature).
#[cfg(feature = "tokio-runtime")]
pub fn create_tokio_event_queue() -> SharedEventQueue {
    let queue = TokioEventQueue::new();
    Arc::new(queue)
}
306
/// Unit tests for the in-memory queue, the event variants, and the suspension
/// state types.
#[cfg(test)]
mod tests {
    use super::*;
    // Kept explicit even though `super::*` may already provide `Arc`.
    #[allow(unused_imports)]
    use std::sync::Arc;

    /// A fresh queue is empty; one push/poll round-trip returns it to empty.
    #[test]
    fn test_memory_event_queue_basic() {
        let queue = MemoryEventQueue::new();

        assert!(queue.is_empty());
        assert_eq!(queue.len(), 0);

        queue.push(QueuedEvent::Timer { id: 1 });
        assert!(!queue.is_empty());
        assert_eq!(queue.len(), 1);

        let event = queue.poll();
        assert!(matches!(event, Some(QueuedEvent::Timer { id: 1 })));
        assert!(queue.is_empty());
    }

    /// Events come out in the order they were pushed.
    #[test]
    fn test_memory_event_queue_fifo() {
        let queue = MemoryEventQueue::new();

        queue.push(QueuedEvent::Timer { id: 1 });
        queue.push(QueuedEvent::Timer { id: 2 });
        queue.push(QueuedEvent::Timer { id: 3 });

        assert!(matches!(queue.poll(), Some(QueuedEvent::Timer { id: 1 })));
        assert!(matches!(queue.poll(), Some(QueuedEvent::Timer { id: 2 })));
        assert!(matches!(queue.poll(), Some(QueuedEvent::Timer { id: 3 })));
        assert!(queue.poll().is_none());
    }

    /// `poll_batch` honours `max` and stops early when the queue runs dry.
    #[test]
    fn test_poll_batch() {
        let queue = MemoryEventQueue::new();

        for i in 0..5 {
            queue.push(QueuedEvent::Timer { id: i });
        }

        let batch = queue.poll_batch(3);
        assert_eq!(batch.len(), 3);
        assert_eq!(queue.len(), 2);

        let remaining = queue.poll_batch(10);
        assert_eq!(remaining.len(), 2);
        assert!(queue.is_empty());
    }

    /// `poll_batch(0)` returns nothing and leaves the queue untouched.
    #[test]
    fn test_poll_batch_zero_max() {
        let queue = MemoryEventQueue::new();

        queue.push(QueuedEvent::Timer { id: 1 });

        let batch = queue.poll_batch(0);
        assert!(batch.is_empty());
        assert_eq!(queue.len(), 1);
    }

    /// `new` + `with_locals` stores the condition, resume position and locals.
    #[test]
    fn test_suspension_state() {
        let state = SuspensionState::new(
            WaitCondition::NextBar {
                source: "data".to_string(),
            },
            42,
        )
        .with_locals(vec![KindedSlot::from_number(1.0), KindedSlot::from_number(2.0)]);

        assert_eq!(state.resume_pc, 42);
        assert_eq!(state.saved_locals.len(), 2);
        assert!(matches!(
            &state.waiting_for,
            WaitCondition::NextBar { source } if source == "data"
        ));
    }

    /// A `DataPoint` event keeps its source label and numeric payload.
    #[test]
    fn test_event_types_data_point() {
        let queue = MemoryEventQueue::new();

        queue.push(QueuedEvent::DataPoint {
            source: "iot_sensors".to_string(),
            data: KindedSlot::from_number(42.5),
        });

        let event = queue.poll().unwrap();
        match event {
            QueuedEvent::DataPoint { source, data } => {
                assert_eq!(source, "iot_sensors");
                assert!((data.slot().as_f64() - 42.5).abs() < 0.001);
            }
            _ => panic!("Expected DataPoint event"),
        }
    }

    /// An `External` event keeps its byte payload.
    #[test]
    fn test_event_types_external() {
        let queue = MemoryEventQueue::new();

        queue.push(QueuedEvent::External {
            payload: vec![1, 2, 3, 4],
        });

        let event = queue.poll().unwrap();
        match event {
            QueuedEvent::External { payload } => {
                assert_eq!(payload, vec![1, 2, 3, 4]);
            }
            _ => panic!("Expected External event"),
        }
    }

    /// A `Subscription` event keeps its id, source label and string payload.
    #[test]
    fn test_event_types_subscription() {
        let queue = MemoryEventQueue::new();

        queue.push(QueuedEvent::Subscription {
            subscription_id: 123,
            source: "live_feed".to_string(),
            data: KindedSlot::from_string_arc(Arc::new("update".to_string())),
        });

        let event = queue.poll().unwrap();
        match event {
            QueuedEvent::Subscription {
                subscription_id,
                source,
                data,
            } => {
                assert_eq!(subscription_id, 123);
                assert_eq!(source, "live_feed");
                // SAFETY: NOTE(review) — this assumes `slot().raw()` yields the
                // address of the `String` owned by the `Arc` passed to
                // `from_string_arc`; confirm against `KindedSlot`'s layout.
                // `data` stays alive until the end of this arm, so the borrow
                // does not outlive the allocation it points into.
                let s_ref: &str = unsafe {
                    let ptr = data.slot().raw() as *const String;
                    (*ptr).as_str()
                };
                assert_eq!(s_ref, "update");
            }
            _ => panic!("Expected Subscription event"),
        }
    }

    /// An `Error` event keeps its source label and message.
    #[test]
    fn test_event_types_error() {
        let queue = MemoryEventQueue::new();

        queue.push(QueuedEvent::Error {
            source: "database".to_string(),
            message: "Connection failed".to_string(),
        });

        let event = queue.poll().unwrap();
        match event {
            QueuedEvent::Error { source, message } => {
                assert_eq!(source, "database");
                assert_eq!(message, "Connection failed");
            }
            _ => panic!("Expected Error event"),
        }
    }

    /// A `Shutdown` event survives the queue round-trip.
    #[test]
    fn test_event_types_shutdown() {
        let queue = MemoryEventQueue::new();

        queue.push(QueuedEvent::Shutdown);

        let event = queue.poll().unwrap();
        assert!(matches!(event, QueuedEvent::Shutdown));
    }

    /// Every `WaitCondition` variant can be constructed and matched.
    #[test]
    fn test_wait_condition_variants() {
        let next_bar = WaitCondition::NextBar {
            source: "src".to_string(),
        };
        assert!(matches!(next_bar, WaitCondition::NextBar { .. }));

        let timer = WaitCondition::Timer {
            id: 1,
            deadline_ms: 1000,
        };
        assert!(matches!(
            timer,
            WaitCondition::Timer {
                id: 1,
                deadline_ms: 1000
            }
        ));

        let external = WaitCondition::External {
            event_type: "alert".to_string(),
        };
        assert!(matches!(external, WaitCondition::External { .. }));

        let any = WaitCondition::AnyEvent;
        assert!(matches!(any, WaitCondition::AnyEvent));

        let yield_cond = WaitCondition::Yield;
        assert!(matches!(yield_cond, WaitCondition::Yield));

        // The two variants below were previously not covered.
        let snapshot = WaitCondition::Snapshot;
        assert!(matches!(snapshot, WaitCondition::Snapshot));

        let future = WaitCondition::Future { id: 7 };
        assert!(matches!(future, WaitCondition::Future { id: 7 }));
    }

    /// Clones of the handle from `create_event_queue` share one queue.
    #[test]
    fn test_create_event_queue_returns_shared() {
        let queue1 = create_event_queue();
        let queue2 = queue1.clone();

        queue1.push(QueuedEvent::Timer { id: 42 });

        let event = queue2.poll().unwrap();
        assert!(matches!(event, QueuedEvent::Timer { id: 42 }));
    }

    /// `with_stack` fills the saved stack and leaves the locals empty.
    #[test]
    fn test_suspension_state_with_stack() {
        let state = SuspensionState::new(WaitCondition::Yield, 100).with_stack(vec![
            KindedSlot::from_number(1.0),
            KindedSlot::from_number(2.0),
            KindedSlot::from_number(3.0),
        ]);

        assert_eq!(state.resume_pc, 100);
        assert_eq!(state.saved_stack.len(), 3);
        assert!(state.saved_locals.is_empty());
    }

    /// Different event variants keep their push order.
    #[test]
    fn test_mixed_event_ordering() {
        let queue = MemoryEventQueue::new();

        queue.push(QueuedEvent::Timer { id: 1 });
        queue.push(QueuedEvent::Shutdown);
        queue.push(QueuedEvent::DataPoint {
            source: "test".to_string(),
            data: KindedSlot::none(),
        });

        assert!(matches!(queue.poll(), Some(QueuedEvent::Timer { id: 1 })));
        assert!(matches!(queue.poll(), Some(QueuedEvent::Shutdown)));
        assert!(matches!(queue.poll(), Some(QueuedEvent::DataPoint { .. })));
        assert!(queue.poll().is_none());
    }
}