1use std::sync::atomic::{AtomicU64, Ordering};
8
9use crate::error::{Result, RingKernelError};
10use crate::message::MessageEnvelope;
11
/// Point-in-time snapshot of a queue's counters, as returned by
/// `MessageQueue::stats`.
#[derive(Debug, Clone, Default)]
pub struct QueueStats {
    /// Number of messages successfully enqueued.
    pub enqueued: u64,
    /// Number of messages successfully dequeued.
    pub dequeued: u64,
    /// Number of enqueue attempts rejected because the queue was full.
    pub dropped: u64,
    /// Number of messages in the queue when the snapshot was taken.
    pub depth: u64,
    /// Highest depth recorded so far.
    pub max_depth: u64,
}
26
/// Common interface of the fixed-capacity message queues in this module.
///
/// Every method takes `&self`, and implementors must be `Send + Sync`.
pub trait MessageQueue: Send + Sync {
    /// Returns the maximum number of messages the queue can hold.
    fn capacity(&self) -> usize;

    /// Returns the number of messages currently in the queue.
    fn len(&self) -> usize;

    /// Returns `true` when `len()` is zero.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns `true` when `len()` has reached `capacity()`.
    fn is_full(&self) -> bool {
        self.len() >= self.capacity()
    }

    /// Attempts to add `envelope` without waiting for space to become free.
    ///
    /// # Errors
    ///
    /// The implementations in this file return `RingKernelError::QueueFull`
    /// when no slot is available.
    fn try_enqueue(&self, envelope: MessageEnvelope) -> Result<()>;

    /// Attempts to remove the next message without waiting for one to arrive.
    ///
    /// # Errors
    ///
    /// The implementations in this file return `RingKernelError::QueueEmpty`
    /// when there is nothing to dequeue.
    fn try_dequeue(&self) -> Result<MessageEnvelope>;

    /// Returns a snapshot of the queue's counters.
    fn stats(&self) -> QueueStats;

    /// Resets the queue's counters.
    fn reset_stats(&self);
}
60
/// Fixed-capacity ring buffer of message envelopes.
///
/// NOTE(review): the name suggests single-producer/single-consumer use;
/// nothing in the type itself enforces that — confirm with callers.
pub struct SpscQueue {
    /// Ring slots; each slot is individually locked and holds `None` when free.
    buffer: Vec<parking_lot::Mutex<Option<MessageEnvelope>>>,
    /// Number of slots (always a power of two, see `new`).
    capacity: usize,
    /// `capacity - 1`; ANDed with a position to obtain the slot index.
    mask: usize,
    /// Monotonic write position, advanced by `try_enqueue`.
    head: AtomicU64,
    /// Monotonic read position, advanced by `try_dequeue`.
    tail: AtomicU64,
    /// Running counters exposed through `stats`.
    stats: QueueStatsInner,
}
79
/// Atomic counters backing the `QueueStats` snapshot of a `SpscQueue`.
struct QueueStatsInner {
    /// Incremented on every successful enqueue.
    enqueued: AtomicU64,
    /// Incremented on every successful dequeue.
    dequeued: AtomicU64,
    /// Incremented whenever an enqueue is rejected because the queue is full.
    dropped: AtomicU64,
    /// Highest depth observed right after an enqueue.
    max_depth: AtomicU64,
}
87
88impl SpscQueue {
89 pub fn new(capacity: usize) -> Self {
93 let capacity = capacity.next_power_of_two();
94 let mask = capacity - 1;
95
96 let mut buffer = Vec::with_capacity(capacity);
97 for _ in 0..capacity {
98 buffer.push(parking_lot::Mutex::new(None));
99 }
100
101 Self {
102 buffer,
103 capacity,
104 mask,
105 head: AtomicU64::new(0),
106 tail: AtomicU64::new(0),
107 stats: QueueStatsInner {
108 enqueued: AtomicU64::new(0),
109 dequeued: AtomicU64::new(0),
110 dropped: AtomicU64::new(0),
111 max_depth: AtomicU64::new(0),
112 },
113 }
114 }
115
116 fn depth(&self) -> u64 {
118 let head = self.head.load(Ordering::Acquire);
119 let tail = self.tail.load(Ordering::Acquire);
120 head.wrapping_sub(tail)
121 }
122
123 fn update_max_depth(&self) {
125 let depth = self.depth();
126 let mut max = self.stats.max_depth.load(Ordering::Relaxed);
127 while depth > max {
128 match self.stats.max_depth.compare_exchange_weak(
129 max,
130 depth,
131 Ordering::Relaxed,
132 Ordering::Relaxed,
133 ) {
134 Ok(_) => break,
135 Err(current) => max = current,
136 }
137 }
138 }
139}
140
141impl MessageQueue for SpscQueue {
142 fn capacity(&self) -> usize {
143 self.capacity
144 }
145
146 fn len(&self) -> usize {
147 self.depth() as usize
148 }
149
150 fn try_enqueue(&self, envelope: MessageEnvelope) -> Result<()> {
151 let head = self.head.load(Ordering::Acquire);
152 let tail = self.tail.load(Ordering::Acquire);
153
154 if head.wrapping_sub(tail) >= self.capacity as u64 {
156 self.stats.dropped.fetch_add(1, Ordering::Relaxed);
157 return Err(RingKernelError::QueueFull {
158 capacity: self.capacity,
159 });
160 }
161
162 let index = (head as usize) & self.mask;
164 let mut slot = self.buffer[index].lock();
165 *slot = Some(envelope);
166 drop(slot);
167
168 self.head.store(head.wrapping_add(1), Ordering::Release);
170
171 self.stats.enqueued.fetch_add(1, Ordering::Relaxed);
173 self.update_max_depth();
174
175 Ok(())
176 }
177
178 fn try_dequeue(&self) -> Result<MessageEnvelope> {
179 let tail = self.tail.load(Ordering::Acquire);
180 let head = self.head.load(Ordering::Acquire);
181
182 if head == tail {
184 return Err(RingKernelError::QueueEmpty);
185 }
186
187 let index = (tail as usize) & self.mask;
189 let mut slot = self.buffer[index].lock();
190 let envelope = slot.take().ok_or(RingKernelError::QueueEmpty)?;
191 drop(slot);
192
193 self.tail.store(tail.wrapping_add(1), Ordering::Release);
195
196 self.stats.dequeued.fetch_add(1, Ordering::Relaxed);
198
199 Ok(envelope)
200 }
201
202 fn stats(&self) -> QueueStats {
203 QueueStats {
204 enqueued: self.stats.enqueued.load(Ordering::Relaxed),
205 dequeued: self.stats.dequeued.load(Ordering::Relaxed),
206 dropped: self.stats.dropped.load(Ordering::Relaxed),
207 depth: self.depth(),
208 max_depth: self.stats.max_depth.load(Ordering::Relaxed),
209 }
210 }
211
212 fn reset_stats(&self) {
213 self.stats.enqueued.store(0, Ordering::Relaxed);
214 self.stats.dequeued.store(0, Ordering::Relaxed);
215 self.stats.dropped.store(0, Ordering::Relaxed);
216 self.stats.max_depth.store(0, Ordering::Relaxed);
217 }
218}
219
/// Queue that allows several producers by serializing their enqueues on an
/// inner [`SpscQueue`].
pub struct MpscQueue {
    /// Underlying ring buffer and statistics.
    inner: SpscQueue,
    /// Held for the duration of each enqueue so that only one producer
    /// touches the inner queue's write position at a time.
    producer_lock: parking_lot::Mutex<()>,
}
230
231impl MpscQueue {
232 pub fn new(capacity: usize) -> Self {
234 Self {
235 inner: SpscQueue::new(capacity),
236 producer_lock: parking_lot::Mutex::new(()),
237 }
238 }
239}
240
241impl MessageQueue for MpscQueue {
242 fn capacity(&self) -> usize {
243 self.inner.capacity()
244 }
245
246 fn len(&self) -> usize {
247 self.inner.len()
248 }
249
250 fn try_enqueue(&self, envelope: MessageEnvelope) -> Result<()> {
251 let _guard = self.producer_lock.lock();
252 self.inner.try_enqueue(envelope)
253 }
254
255 fn try_dequeue(&self) -> Result<MessageEnvelope> {
256 self.inner.try_dequeue()
257 }
258
259 fn stats(&self) -> QueueStats {
260 self.inner.stats()
261 }
262
263 fn reset_stats(&self) {
264 self.inner.reset_stats()
265 }
266}
267
268pub struct BoundedQueue {
270 inner: MpscQueue,
272 not_full: parking_lot::Condvar,
274 not_empty: parking_lot::Condvar,
276 mutex: parking_lot::Mutex<()>,
278}
279
280impl BoundedQueue {
281 pub fn new(capacity: usize) -> Self {
283 Self {
284 inner: MpscQueue::new(capacity),
285 not_full: parking_lot::Condvar::new(),
286 not_empty: parking_lot::Condvar::new(),
287 mutex: parking_lot::Mutex::new(()),
288 }
289 }
290
291 pub fn enqueue_timeout(
293 &self,
294 envelope: MessageEnvelope,
295 timeout: std::time::Duration,
296 ) -> Result<()> {
297 let deadline = std::time::Instant::now() + timeout;
298
299 loop {
300 match self.inner.try_enqueue(envelope.clone()) {
301 Ok(()) => {
302 self.not_empty.notify_one();
303 return Ok(());
304 }
305 Err(RingKernelError::QueueFull { .. }) => {
306 let remaining = deadline.saturating_duration_since(std::time::Instant::now());
307 if remaining.is_zero() {
308 return Err(RingKernelError::Timeout(timeout));
309 }
310 let mut guard = self.mutex.lock();
311 let _ = self.not_full.wait_for(&mut guard, remaining);
312 }
313 Err(e) => return Err(e),
314 }
315 }
316 }
317
318 pub fn dequeue_timeout(&self, timeout: std::time::Duration) -> Result<MessageEnvelope> {
320 let deadline = std::time::Instant::now() + timeout;
321
322 loop {
323 match self.inner.try_dequeue() {
324 Ok(envelope) => {
325 self.not_full.notify_one();
326 return Ok(envelope);
327 }
328 Err(RingKernelError::QueueEmpty) => {
329 let remaining = deadline.saturating_duration_since(std::time::Instant::now());
330 if remaining.is_zero() {
331 return Err(RingKernelError::Timeout(timeout));
332 }
333 let mut guard = self.mutex.lock();
334 let _ = self.not_empty.wait_for(&mut guard, remaining);
335 }
336 Err(e) => return Err(e),
337 }
338 }
339 }
340}
341
342impl MessageQueue for BoundedQueue {
343 fn capacity(&self) -> usize {
344 self.inner.capacity()
345 }
346
347 fn len(&self) -> usize {
348 self.inner.len()
349 }
350
351 fn try_enqueue(&self, envelope: MessageEnvelope) -> Result<()> {
352 let result = self.inner.try_enqueue(envelope);
353 if result.is_ok() {
354 self.not_empty.notify_one();
355 }
356 result
357 }
358
359 fn try_dequeue(&self) -> Result<MessageEnvelope> {
360 let result = self.inner.try_dequeue();
361 if result.is_ok() {
362 self.not_full.notify_one();
363 }
364 result
365 }
366
367 fn stats(&self) -> QueueStats {
368 self.inner.stats()
369 }
370
371 fn reset_stats(&self) {
372 self.inner.reset_stats()
373 }
374}
375
/// Predefined queue size classes, ordered from smallest to largest.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum QueueTier {
    /// 256 messages.
    Small,
    /// 1024 messages (the default tier).
    #[default]
    Medium,
    /// 4096 messages.
    Large,
    /// 16384 messages.
    ExtraLarge,
}

impl QueueTier {
    /// Returns the number of messages a queue of this tier holds.
    pub fn capacity(&self) -> usize {
        match self {
            Self::Small => 256,
            Self::Medium => 1024,
            Self::Large => 4096,
            Self::ExtraLarge => 16384,
        }
    }

    /// Picks the smallest tier whose capacity covers
    /// `messages_per_second * target_headroom_ms / 1000` messages.
    ///
    /// Falls back to [`QueueTier::ExtraLarge`] when no tier is large enough.
    /// The product saturates rather than overflowing, so extreme inputs
    /// select the largest tier instead of panicking (debug builds) or
    /// wrapping around to a too-small tier (release builds).
    pub fn for_throughput(messages_per_second: u64, target_headroom_ms: u64) -> Self {
        let needed_capacity = messages_per_second.saturating_mul(target_headroom_ms) / 1000;

        // Compare against `capacity()` so the thresholds cannot drift away
        // from the tier sizes.
        [Self::Small, Self::Medium, Self::Large]
            .iter()
            .copied()
            .find(|tier| needed_capacity <= tier.capacity() as u64)
            .unwrap_or(Self::ExtraLarge)
    }

    /// Returns the next larger tier; `ExtraLarge` stays `ExtraLarge`.
    pub fn upgrade(&self) -> Self {
        match self {
            Self::Small => Self::Medium,
            Self::Medium => Self::Large,
            Self::Large | Self::ExtraLarge => Self::ExtraLarge,
        }
    }

    /// Returns the next smaller tier; `Small` stays `Small`.
    pub fn downgrade(&self) -> Self {
        match self {
            Self::Small | Self::Medium => Self::Small,
            Self::Large => Self::Medium,
            Self::ExtraLarge => Self::Large,
        }
    }
}
452
453pub struct QueueFactory;
467
468impl QueueFactory {
469 pub fn create_spsc(tier: QueueTier) -> SpscQueue {
471 SpscQueue::new(tier.capacity())
472 }
473
474 pub fn create_mpsc(tier: QueueTier) -> MpscQueue {
476 MpscQueue::new(tier.capacity())
477 }
478
479 pub fn create_bounded(tier: QueueTier) -> BoundedQueue {
481 BoundedQueue::new(tier.capacity())
482 }
483
484 pub fn create_for_throughput(
491 messages_per_second: u64,
492 headroom_ms: u64,
493 ) -> Box<dyn MessageQueue> {
494 let tier = QueueTier::for_throughput(messages_per_second, headroom_ms);
495 Box::new(Self::create_mpsc(tier))
496 }
497}
498
/// Health classification produced by `QueueMonitor::check`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueueHealth {
    /// Utilization is below the warning threshold.
    Healthy,
    /// Utilization is at or above the warning threshold but below critical.
    Warning,
    /// Utilization is at or above the critical threshold.
    Critical,
}
509
/// Classifies queue utilization against two configurable thresholds.
///
/// Thresholds are fractions of capacity (`len / capacity`), not percentages.
#[derive(Debug, Clone)]
pub struct QueueMonitor {
    /// Utilization at or above which a queue is reported as `Warning`.
    pub warning_threshold: f64,
    /// Utilization at or above which a queue is reported as `Critical`.
    pub critical_threshold: f64,
}

impl Default for QueueMonitor {
    /// Warns at 75% utilization and turns critical at 90%.
    fn default() -> Self {
        const DEFAULT_WARNING: f64 = 0.75;
        const DEFAULT_CRITICAL: f64 = 0.90;

        Self {
            warning_threshold: DEFAULT_WARNING,
            critical_threshold: DEFAULT_CRITICAL,
        }
    }
}
547
548impl QueueMonitor {
549 pub fn new(warning_threshold: f64, critical_threshold: f64) -> Self {
551 Self {
552 warning_threshold,
553 critical_threshold,
554 }
555 }
556
557 pub fn check(&self, queue: &dyn MessageQueue) -> QueueHealth {
559 let utilization = self.utilization(queue);
560
561 if utilization >= self.critical_threshold {
562 QueueHealth::Critical
563 } else if utilization >= self.warning_threshold {
564 QueueHealth::Warning
565 } else {
566 QueueHealth::Healthy
567 }
568 }
569
570 pub fn utilization(&self, queue: &dyn MessageQueue) -> f64 {
572 let capacity = queue.capacity();
573 if capacity == 0 {
574 return 0.0;
575 }
576 queue.len() as f64 / capacity as f64
577 }
578
579 pub fn utilization_percent(&self, queue: &dyn MessageQueue) -> f64 {
581 self.utilization(queue) * 100.0
582 }
583
584 pub fn suggest_upgrade(
588 &self,
589 queue: &dyn MessageQueue,
590 current_tier: QueueTier,
591 ) -> Option<QueueTier> {
592 let stats = queue.stats();
593 let utilization = self.utilization(queue);
594
595 let max_util = if queue.capacity() > 0 {
599 stats.max_depth as f64 / queue.capacity() as f64
600 } else {
601 0.0
602 };
603
604 if utilization >= self.warning_threshold || max_util >= self.critical_threshold {
605 let upgraded = current_tier.upgrade();
606 if upgraded != current_tier {
607 return Some(upgraded);
608 }
609 }
610
611 None
612 }
613
614 pub fn has_drops(&self, queue: &dyn MessageQueue) -> bool {
616 queue.stats().dropped > 0
617 }
618
619 pub fn drop_rate(&self, queue: &dyn MessageQueue) -> f64 {
621 let stats = queue.stats();
622 let total_attempted = stats.enqueued + stats.dropped;
623 if total_attempted == 0 {
624 return 0.0;
625 }
626 stats.dropped as f64 / total_attempted as f64
627 }
628}
629
/// Combined health, utilization and statistics of a queue, as assembled by
/// `QueueMetrics::capture`.
#[derive(Debug, Clone)]
pub struct QueueMetrics {
    /// Health classification from the monitor.
    pub health: QueueHealth,
    /// Utilization as a fraction of capacity.
    pub utilization: f64,
    /// Counter snapshot taken from the queue.
    pub stats: QueueStats,
    /// Tier the caller reported for the queue, if any.
    pub tier: Option<QueueTier>,
    /// Larger tier recommended by the monitor, if any.
    pub suggested_upgrade: Option<QueueTier>,
}
644
645impl QueueMetrics {
646 pub fn capture(
648 queue: &dyn MessageQueue,
649 monitor: &QueueMonitor,
650 current_tier: Option<QueueTier>,
651 ) -> Self {
652 let health = monitor.check(queue);
653 let utilization = monitor.utilization(queue);
654 let stats = queue.stats();
655 let suggested_upgrade = current_tier.and_then(|tier| monitor.suggest_upgrade(queue, tier));
656
657 Self {
658 health,
659 utilization,
660 stats,
661 tier: current_tier,
662 suggested_upgrade,
663 }
664 }
665}
666
#[cfg(test)]
mod tests {
    use super::*;
    use crate::hlc::HlcTimestamp;
    use crate::message::MessageHeader;

    /// Builds a small envelope with an 8-byte payload for use in the tests.
    fn make_envelope() -> MessageEnvelope {
        MessageEnvelope {
            header: MessageHeader::new(1, 0, 1, 8, HlcTimestamp::now(1)),
            payload: vec![1, 2, 3, 4, 5, 6, 7, 8],
        }
    }

    /// A single enqueue/dequeue round trip updates `len` and `is_empty`.
    #[test]
    fn test_spsc_basic() {
        let queue = SpscQueue::new(16);

        assert!(queue.is_empty());
        assert!(!queue.is_full());

        let env = make_envelope();
        queue.try_enqueue(env).unwrap();

        assert_eq!(queue.len(), 1);
        assert!(!queue.is_empty());

        let _ = queue.try_dequeue().unwrap();
        assert!(queue.is_empty());
    }

    /// Once every slot is used, further enqueues fail with `QueueFull`.
    #[test]
    fn test_spsc_full() {
        let queue = SpscQueue::new(4);

        for _ in 0..4 {
            queue.try_enqueue(make_envelope()).unwrap();
        }

        assert!(queue.is_full());
        assert!(matches!(
            queue.try_enqueue(make_envelope()),
            Err(RingKernelError::QueueFull { .. })
        ));
    }

    /// Counters track enqueues and dequeues; depth is their difference.
    #[test]
    fn test_spsc_stats() {
        let queue = SpscQueue::new(16);

        for _ in 0..10 {
            queue.try_enqueue(make_envelope()).unwrap();
        }

        for _ in 0..5 {
            let _ = queue.try_dequeue().unwrap();
        }

        let stats = queue.stats();
        assert_eq!(stats.enqueued, 10);
        assert_eq!(stats.dequeued, 5);
        assert_eq!(stats.depth, 5);
    }

    /// Four producer threads enqueue 100 messages each; none may be lost.
    #[test]
    fn test_mpsc_concurrent() {
        use std::sync::Arc;
        use std::thread;

        let queue = Arc::new(MpscQueue::new(1024));
        let mut handles = vec![];

        // Spawn the producers.
        for _ in 0..4 {
            let q = Arc::clone(&queue);
            handles.push(thread::spawn(move || {
                for _ in 0..100 {
                    q.try_enqueue(make_envelope()).unwrap();
                }
            }));
        }

        // Wait for every producer to finish.
        for h in handles {
            h.join().unwrap();
        }

        let stats = queue.stats();
        assert_eq!(stats.enqueued, 400);
    }

    /// A blocking enqueue on a full queue gives up with `Timeout`.
    #[test]
    fn test_bounded_timeout() {
        let queue = BoundedQueue::new(2);

        // Fill both slots.
        queue.try_enqueue(make_envelope()).unwrap();
        queue.try_enqueue(make_envelope()).unwrap();

        // Nothing dequeues, so this must time out.
        let result = queue.enqueue_timeout(make_envelope(), std::time::Duration::from_millis(10));
        assert!(matches!(result, Err(RingKernelError::Timeout(_))));
    }

    /// Each tier reports its fixed capacity.
    #[test]
    fn test_queue_tier_capacities() {
        assert_eq!(QueueTier::Small.capacity(), 256);
        assert_eq!(QueueTier::Medium.capacity(), 1024);
        assert_eq!(QueueTier::Large.capacity(), 4096);
        assert_eq!(QueueTier::ExtraLarge.capacity(), 16384);
    }

    /// Tier selection follows rate * headroom_ms / 1000.
    #[test]
    fn test_queue_tier_for_throughput() {
        // 1000 msg/s * 100 ms = 100 messages -> Small.
        assert_eq!(QueueTier::for_throughput(1000, 100), QueueTier::Small);

        // 5000 msg/s * 100 ms = 500 messages -> Medium.
        assert_eq!(QueueTier::for_throughput(5000, 100), QueueTier::Medium);

        // 20000 msg/s * 100 ms = 2000 messages -> Large.
        assert_eq!(QueueTier::for_throughput(20000, 100), QueueTier::Large);

        // 100000 msg/s * 100 ms = 10000 messages -> ExtraLarge.
        assert_eq!(
            QueueTier::for_throughput(100000, 100),
            QueueTier::ExtraLarge
        );
    }

    /// Upgrade and downgrade step one tier and stop at the ends.
    #[test]
    fn test_queue_tier_upgrade_downgrade() {
        assert_eq!(QueueTier::Small.upgrade(), QueueTier::Medium);
        assert_eq!(QueueTier::Medium.upgrade(), QueueTier::Large);
        assert_eq!(QueueTier::Large.upgrade(), QueueTier::ExtraLarge);
        // Already the largest tier.
        assert_eq!(QueueTier::ExtraLarge.upgrade(), QueueTier::ExtraLarge);

        // Already the smallest tier.
        assert_eq!(QueueTier::Small.downgrade(), QueueTier::Small);
        assert_eq!(QueueTier::Medium.downgrade(), QueueTier::Small);
        assert_eq!(QueueTier::Large.downgrade(), QueueTier::Medium);
        assert_eq!(QueueTier::ExtraLarge.downgrade(), QueueTier::Large);
    }

    /// Factory-built queues have the capacity of the requested tier.
    #[test]
    fn test_queue_factory_creates_correct_capacity() {
        let spsc = QueueFactory::create_spsc(QueueTier::Medium);
        assert_eq!(spsc.capacity(), 1024);

        let mpsc = QueueFactory::create_mpsc(QueueTier::Large);
        assert_eq!(mpsc.capacity(), 4096);

        let bounded = QueueFactory::create_bounded(QueueTier::Small);
        assert_eq!(bounded.capacity(), 256);
    }

    /// 10000 msg/s * 100 ms = 1000 messages, which selects Medium (1024).
    #[test]
    fn test_queue_factory_throughput_based() {
        let queue = QueueFactory::create_for_throughput(10000, 100);
        assert_eq!(queue.capacity(), 1024);
    }

    /// Health moves Healthy -> Warning -> Critical as the queue fills.
    #[test]
    fn test_queue_monitor_health_levels() {
        let monitor = QueueMonitor::default();
        // A request for 100 slots is rounded up to 128.
        let queue = SpscQueue::new(100);

        // Empty queue.
        assert_eq!(monitor.check(&queue), QueueHealth::Healthy);

        // 76 / 128 is about 59%, still below the 75% warning threshold.
        for _ in 0..76 {
            queue.try_enqueue(make_envelope()).unwrap();
        }
        assert_eq!(monitor.check(&queue), QueueHealth::Healthy);

        // 102 / 128 is about 80%: warning.
        for _ in 0..26 {
            queue.try_enqueue(make_envelope()).unwrap();
        }
        assert_eq!(monitor.check(&queue), QueueHealth::Warning);

        // 120 / 128 is about 94%: critical.
        for _ in 0..18 {
            queue.try_enqueue(make_envelope()).unwrap();
        }
        assert_eq!(monitor.check(&queue), QueueHealth::Critical);
    }

    /// Utilization is len / capacity.
    #[test]
    fn test_queue_monitor_utilization() {
        let monitor = QueueMonitor::default();
        // A request for 100 slots is rounded up to 128.
        let queue = SpscQueue::new(100);

        assert!((monitor.utilization(&queue) - 0.0).abs() < 0.001);

        // 64 / 128 = 0.5.
        for _ in 0..64 {
            queue.try_enqueue(make_envelope()).unwrap();
        }
        assert!((monitor.utilization(&queue) - 0.5).abs() < 0.001);
    }

    /// Drops are reported only after an enqueue has been rejected.
    #[test]
    fn test_queue_monitor_drop_detection() {
        let monitor = QueueMonitor::default();
        let queue = SpscQueue::new(4);

        // Fill the queue; nothing has been dropped yet.
        for _ in 0..4 {
            queue.try_enqueue(make_envelope()).unwrap();
        }
        assert!(!monitor.has_drops(&queue));

        // This enqueue is rejected and counted as a drop.
        let _ = queue.try_enqueue(make_envelope());
        assert!(monitor.has_drops(&queue));
        assert!(monitor.drop_rate(&queue) > 0.0);
    }

    /// Upgrades are suggested under pressure, but never past ExtraLarge.
    #[test]
    fn test_queue_monitor_upgrade_suggestion() {
        let monitor = QueueMonitor::default();
        let queue = SpscQueue::new(QueueTier::Small.capacity());

        // Empty queue: no suggestion.
        assert!(monitor.suggest_upgrade(&queue, QueueTier::Small).is_none());

        // 200 / 256 is about 78%, above the warning threshold.
        for _ in 0..200 {
            queue.try_enqueue(make_envelope()).unwrap();
        }

        let suggestion = monitor.suggest_upgrade(&queue, QueueTier::Small);
        assert_eq!(suggestion, Some(QueueTier::Medium));

        // At 75% the largest tier is under pressure but has nowhere to go.
        let large_queue = SpscQueue::new(QueueTier::ExtraLarge.capacity());
        for _ in 0..(QueueTier::ExtraLarge.capacity() * 3 / 4) {
            large_queue.try_enqueue(make_envelope()).unwrap();
        }
        assert!(monitor
            .suggest_upgrade(&large_queue, QueueTier::ExtraLarge)
            .is_none());
    }

    /// `capture` gathers health, utilization, stats and tier information.
    #[test]
    fn test_queue_metrics_capture() {
        let queue = SpscQueue::new(QueueTier::Medium.capacity());
        let monitor = QueueMonitor::default();

        // 100 / 1024 is just under 10% utilization.
        for _ in 0..100 {
            queue.try_enqueue(make_envelope()).unwrap();
        }

        let metrics = QueueMetrics::capture(&queue, &monitor, Some(QueueTier::Medium));

        assert_eq!(metrics.health, QueueHealth::Healthy);
        assert!(metrics.utilization < 0.15);
        assert_eq!(metrics.stats.enqueued, 100);
        assert_eq!(metrics.tier, Some(QueueTier::Medium));
        assert!(metrics.suggested_upgrade.is_none());
    }
}