1use std::sync::atomic::{AtomicU64, Ordering};
8
9use crate::error::{Result, RingKernelError};
10use crate::message::MessageEnvelope;
11
/// Point-in-time snapshot of a queue's activity counters.
#[derive(Debug, Clone, Default)]
pub struct QueueStats {
    /// Messages successfully enqueued since creation (or last reset).
    pub enqueued: u64,
    /// Messages successfully dequeued since creation (or last reset).
    pub dequeued: u64,
    /// Messages rejected because the queue was full.
    pub dropped: u64,
    /// Messages resident in the queue at snapshot time.
    pub depth: u64,
    /// Highest depth observed since creation (or last reset).
    pub max_depth: u64,
}
26
/// Common interface for message queues; implementors must be shareable
/// across threads (`Send + Sync`).
pub trait MessageQueue: Send + Sync {
    /// Maximum number of messages the queue can hold.
    fn capacity(&self) -> usize;

    /// Number of messages currently queued.
    fn len(&self) -> usize;

    /// True when no messages are queued.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// True when the queue cannot accept more messages.
    fn is_full(&self) -> bool {
        self.len() >= self.capacity()
    }

    /// Non-blocking enqueue; fails (e.g. `QueueFull`) rather than waiting.
    fn try_enqueue(&self, envelope: MessageEnvelope) -> Result<()>;

    /// Non-blocking dequeue; fails (e.g. `QueueEmpty`) rather than waiting.
    fn try_dequeue(&self) -> Result<MessageEnvelope>;

    /// Snapshot of the queue's counters.
    fn stats(&self) -> QueueStats;

    /// Zeroes all counters; queued messages are untouched.
    fn reset_stats(&self);
}
60
/// Single-producer single-consumer ring buffer of message envelopes.
///
/// Ring indices are lock-free atomics; each slot's contents are guarded by
/// a small mutex. NOTE(review): the check-then-store in `try_enqueue` is
/// only safe with one producer at a time — multiple producers must be
/// serialized externally (see `MpscQueue`).
pub struct SpscQueue {
    /// Ring storage; `None` marks an empty slot.
    buffer: Vec<parking_lot::Mutex<Option<MessageEnvelope>>>,
    /// Slot count; always a power of two (see `new`).
    capacity: usize,
    /// `capacity - 1`, reduces indices with a bitwise AND instead of `%`.
    mask: usize,
    /// Monotonic enqueue counter (advanced by the producer).
    head: AtomicU64,
    /// Monotonic dequeue counter (advanced by the consumer).
    tail: AtomicU64,
    /// Atomic counters backing `stats()`.
    stats: QueueStatsInner,
}
79
/// Atomic backing store for `QueueStats` snapshots.
struct QueueStatsInner {
    enqueued: AtomicU64,
    dequeued: AtomicU64,
    dropped: AtomicU64,
    // High-water mark of observed depth; monotone until reset.
    max_depth: AtomicU64,
}
87
88impl SpscQueue {
89 pub fn new(capacity: usize) -> Self {
93 let capacity = capacity.next_power_of_two();
94 let mask = capacity - 1;
95
96 let mut buffer = Vec::with_capacity(capacity);
97 for _ in 0..capacity {
98 buffer.push(parking_lot::Mutex::new(None));
99 }
100
101 Self {
102 buffer,
103 capacity,
104 mask,
105 head: AtomicU64::new(0),
106 tail: AtomicU64::new(0),
107 stats: QueueStatsInner {
108 enqueued: AtomicU64::new(0),
109 dequeued: AtomicU64::new(0),
110 dropped: AtomicU64::new(0),
111 max_depth: AtomicU64::new(0),
112 },
113 }
114 }
115
116 #[inline]
118 fn depth(&self) -> u64 {
119 let head = self.head.load(Ordering::Acquire);
120 let tail = self.tail.load(Ordering::Acquire);
121 head.wrapping_sub(tail)
122 }
123
124 fn update_max_depth(&self) {
126 let depth = self.depth();
127 let mut max = self.stats.max_depth.load(Ordering::Relaxed);
128 while depth > max {
129 match self.stats.max_depth.compare_exchange_weak(
130 max,
131 depth,
132 Ordering::Relaxed,
133 Ordering::Relaxed,
134 ) {
135 Ok(_) => break,
136 Err(current) => max = current,
137 }
138 }
139 }
140}
141
impl MessageQueue for SpscQueue {
    #[inline]
    fn capacity(&self) -> usize {
        self.capacity
    }

    #[inline]
    fn len(&self) -> usize {
        self.depth() as usize
    }

    /// Enqueues at the head index, or counts a drop when full.
    ///
    /// Single-producer only: the load-check-store sequence on `head` is not
    /// atomic as a whole, so concurrent producers must serialize externally
    /// (see `MpscQueue`).
    #[inline]
    fn try_enqueue(&self, envelope: MessageEnvelope) -> Result<()> {
        let head = self.head.load(Ordering::Acquire);
        let tail = self.tail.load(Ordering::Acquire);

        // Full when the producer is a whole ring ahead of the consumer.
        if head.wrapping_sub(tail) >= self.capacity as u64 {
            self.stats.dropped.fetch_add(1, Ordering::Relaxed);
            return Err(RingKernelError::QueueFull {
                capacity: self.capacity,
            });
        }

        // Masking works because capacity is a power of two.
        let index = (head as usize) & self.mask;
        let mut slot = self.buffer[index].lock();
        *slot = Some(envelope);
        drop(slot);

        // Release ordering publishes the slot write before the new head
        // becomes visible to the consumer.
        self.head.store(head.wrapping_add(1), Ordering::Release);

        self.stats.enqueued.fetch_add(1, Ordering::Relaxed);
        self.update_max_depth();

        Ok(())
    }

    /// Dequeues at the tail index, or fails with `QueueEmpty`.
    #[inline]
    fn try_dequeue(&self) -> Result<MessageEnvelope> {
        let tail = self.tail.load(Ordering::Acquire);
        let head = self.head.load(Ordering::Acquire);

        if head == tail {
            return Err(RingKernelError::QueueEmpty);
        }

        let index = (tail as usize) & self.mask;
        let mut slot = self.buffer[index].lock();
        // Defensive: slot should always be populated here; treat an empty
        // slot as an empty queue rather than panicking.
        let envelope = slot.take().ok_or(RingKernelError::QueueEmpty)?;
        drop(slot);

        // Release ordering publishes the cleared slot before the new tail
        // becomes visible to the producer.
        self.tail.store(tail.wrapping_add(1), Ordering::Release);

        self.stats.dequeued.fetch_add(1, Ordering::Relaxed);

        Ok(envelope)
    }

    fn stats(&self) -> QueueStats {
        // Relaxed loads: fields may be mutually slightly stale; this is a
        // monitoring snapshot, not a synchronization point.
        QueueStats {
            enqueued: self.stats.enqueued.load(Ordering::Relaxed),
            dequeued: self.stats.dequeued.load(Ordering::Relaxed),
            dropped: self.stats.dropped.load(Ordering::Relaxed),
            depth: self.depth(),
            max_depth: self.stats.max_depth.load(Ordering::Relaxed),
        }
    }

    fn reset_stats(&self) {
        self.stats.enqueued.store(0, Ordering::Relaxed);
        self.stats.dequeued.store(0, Ordering::Relaxed);
        self.stats.dropped.store(0, Ordering::Relaxed);
        self.stats.max_depth.store(0, Ordering::Relaxed);
    }
}
224
/// Multi-producer single-consumer queue: an `SpscQueue` whose enqueue side
/// is serialized by a mutex so any number of producers may share it.
pub struct MpscQueue {
    /// Underlying single-producer ring.
    inner: SpscQueue,
    /// Serializes producers; the consumer side never takes it.
    producer_lock: parking_lot::Mutex<()>,
}
235
236impl MpscQueue {
237 pub fn new(capacity: usize) -> Self {
239 Self {
240 inner: SpscQueue::new(capacity),
241 producer_lock: parking_lot::Mutex::new(()),
242 }
243 }
244}
245
246impl MessageQueue for MpscQueue {
247 #[inline]
248 fn capacity(&self) -> usize {
249 self.inner.capacity()
250 }
251
252 #[inline]
253 fn len(&self) -> usize {
254 self.inner.len()
255 }
256
257 #[inline]
258 fn try_enqueue(&self, envelope: MessageEnvelope) -> Result<()> {
259 let _guard = self.producer_lock.lock();
260 self.inner.try_enqueue(envelope)
261 }
262
263 #[inline]
264 fn try_dequeue(&self) -> Result<MessageEnvelope> {
265 self.inner.try_dequeue()
266 }
267
268 fn stats(&self) -> QueueStats {
269 self.inner.stats()
270 }
271
272 fn reset_stats(&self) {
273 self.inner.reset_stats()
274 }
275}
276
/// Blocking wrapper around `MpscQueue` adding timeout-bounded
/// enqueue/dequeue via condition variables.
pub struct BoundedQueue {
    inner: MpscQueue,
    /// Signaled after a dequeue frees a slot.
    not_full: parking_lot::Condvar,
    /// Signaled after an enqueue adds a message.
    not_empty: parking_lot::Condvar,
    // The condvars' companion mutex. It does not guard queue state, so a
    // notification can race past a waiter; waiters rely on `wait_for`'s
    // timeout plus re-polling rather than on never missing a wakeup.
    mutex: parking_lot::Mutex<()>,
}
288
impl BoundedQueue {
    /// Creates a blocking queue with at least `capacity` slots.
    pub fn new(capacity: usize) -> Self {
        Self {
            inner: MpscQueue::new(capacity),
            not_full: parking_lot::Condvar::new(),
            not_empty: parking_lot::Condvar::new(),
            mutex: parking_lot::Mutex::new(()),
        }
    }

    /// Enqueues, blocking up to `timeout` while the queue is full.
    ///
    /// Retries in a loop: each failed attempt waits on `not_full` for the
    /// remaining time and then re-polls. The full/empty condition is not
    /// checked under `mutex`, so a wakeup can be missed; `wait_for`'s
    /// timeout bounds the resulting extra latency instead of hanging.
    /// The envelope is cloned per attempt because `try_enqueue` consumes
    /// its argument.
    ///
    /// # Errors
    /// `Timeout` once the deadline passes, or any non-`QueueFull` error
    /// from the inner queue.
    pub fn enqueue_timeout(
        &self,
        envelope: MessageEnvelope,
        timeout: std::time::Duration,
    ) -> Result<()> {
        let deadline = std::time::Instant::now() + timeout;

        loop {
            match self.inner.try_enqueue(envelope.clone()) {
                Ok(()) => {
                    // Wake a consumer blocked in `dequeue_timeout`.
                    self.not_empty.notify_one();
                    return Ok(());
                }
                Err(RingKernelError::QueueFull { .. }) => {
                    let remaining = deadline.saturating_duration_since(std::time::Instant::now());
                    if remaining.is_zero() {
                        return Err(RingKernelError::Timeout(timeout));
                    }
                    let mut guard = self.mutex.lock();
                    // Timed-out wait is fine: the loop re-polls the queue.
                    let _ = self.not_full.wait_for(&mut guard, remaining);
                }
                Err(e) => return Err(e),
            }
        }
    }

    /// Dequeues, blocking up to `timeout` while the queue is empty.
    /// Mirrors `enqueue_timeout`, waiting on `not_empty` instead.
    ///
    /// # Errors
    /// `Timeout` once the deadline passes, or any non-`QueueEmpty` error
    /// from the inner queue.
    pub fn dequeue_timeout(&self, timeout: std::time::Duration) -> Result<MessageEnvelope> {
        let deadline = std::time::Instant::now() + timeout;

        loop {
            match self.inner.try_dequeue() {
                Ok(envelope) => {
                    // Wake a producer blocked in `enqueue_timeout`.
                    self.not_full.notify_one();
                    return Ok(envelope);
                }
                Err(RingKernelError::QueueEmpty) => {
                    let remaining = deadline.saturating_duration_since(std::time::Instant::now());
                    if remaining.is_zero() {
                        return Err(RingKernelError::Timeout(timeout));
                    }
                    let mut guard = self.mutex.lock();
                    // Timed-out wait is fine: the loop re-polls the queue.
                    let _ = self.not_empty.wait_for(&mut guard, remaining);
                }
                Err(e) => return Err(e),
            }
        }
    }
}
354
355impl MessageQueue for BoundedQueue {
356 fn capacity(&self) -> usize {
357 self.inner.capacity()
358 }
359
360 fn len(&self) -> usize {
361 self.inner.len()
362 }
363
364 fn try_enqueue(&self, envelope: MessageEnvelope) -> Result<()> {
365 let result = self.inner.try_enqueue(envelope);
366 if result.is_ok() {
367 self.not_empty.notify_one();
368 }
369 result
370 }
371
372 fn try_dequeue(&self) -> Result<MessageEnvelope> {
373 let result = self.inner.try_dequeue();
374 if result.is_ok() {
375 self.not_full.notify_one();
376 }
377 result
378 }
379
380 fn stats(&self) -> QueueStats {
381 self.inner.stats()
382 }
383
384 fn reset_stats(&self) {
385 self.inner.reset_stats()
386 }
387}
388
/// Preset capacity classes for queues.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum QueueTier {
    /// 256 slots — low-traffic control channels.
    Small,
    /// 1024 slots — the general-purpose default.
    #[default]
    Medium,
    /// 4096 slots — busy data paths.
    Large,
    /// 16384 slots — highest-throughput paths.
    ExtraLarge,
}

impl QueueTier {
    /// Number of slots this tier provides (always a power of two).
    pub fn capacity(&self) -> usize {
        match self {
            Self::Small => 256,
            Self::Medium => 1024,
            Self::Large => 4096,
            Self::ExtraLarge => 16384,
        }
    }

    /// Picks the smallest tier able to buffer `target_headroom_ms`
    /// milliseconds of traffic at `messages_per_second`.
    ///
    /// Uses saturating multiplication so pathological inputs saturate to
    /// `ExtraLarge` instead of overflowing (panic in debug builds, silent
    /// wrap in release).
    pub fn for_throughput(messages_per_second: u64, target_headroom_ms: u64) -> Self {
        let needed_capacity = messages_per_second.saturating_mul(target_headroom_ms) / 1000;

        if needed_capacity <= 256 {
            Self::Small
        } else if needed_capacity <= 1024 {
            Self::Medium
        } else if needed_capacity <= 4096 {
            Self::Large
        } else {
            Self::ExtraLarge
        }
    }

    /// Next larger tier; `ExtraLarge` is the ceiling and maps to itself.
    pub fn upgrade(&self) -> Self {
        match self {
            Self::Small => Self::Medium,
            Self::Medium => Self::Large,
            Self::Large => Self::ExtraLarge,
            Self::ExtraLarge => Self::ExtraLarge,
        }
    }

    /// Next smaller tier; `Small` is the floor and maps to itself.
    pub fn downgrade(&self) -> Self {
        match self {
            Self::Small => Self::Small,
            Self::Medium => Self::Small,
            Self::Large => Self::Medium,
            Self::ExtraLarge => Self::Large,
        }
    }
}
465
/// Convenience constructors for tier-sized queues.
pub struct QueueFactory;

impl QueueFactory {
    /// SPSC queue sized for `tier`.
    pub fn create_spsc(tier: QueueTier) -> SpscQueue {
        SpscQueue::new(tier.capacity())
    }

    /// MPSC queue sized for `tier`.
    pub fn create_mpsc(tier: QueueTier) -> MpscQueue {
        MpscQueue::new(tier.capacity())
    }

    /// Blocking bounded queue sized for `tier`.
    pub fn create_bounded(tier: QueueTier) -> BoundedQueue {
        BoundedQueue::new(tier.capacity())
    }

    /// Boxed MPSC queue sized from a throughput target
    /// (see `QueueTier::for_throughput`).
    pub fn create_for_throughput(
        messages_per_second: u64,
        headroom_ms: u64,
    ) -> Box<dyn MessageQueue> {
        let tier = QueueTier::for_throughput(messages_per_second, headroom_ms);
        Box::new(Self::create_mpsc(tier))
    }
}
511
/// Coarse health classification derived from queue utilization.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueueHealth {
    /// Utilization below the warning threshold.
    Healthy,
    /// Utilization at or above the warning threshold.
    Warning,
    /// Utilization at or above the critical threshold.
    Critical,
}
522
/// Threshold-based queue health checker.
#[derive(Debug, Clone)]
pub struct QueueMonitor {
    /// Utilization fraction (0.0..=1.0) at which health becomes `Warning`.
    pub warning_threshold: f64,
    /// Utilization fraction (0.0..=1.0) at which health becomes `Critical`.
    pub critical_threshold: f64,
}
551
552impl Default for QueueMonitor {
553 fn default() -> Self {
554 Self {
555 warning_threshold: 0.75, critical_threshold: 0.90, }
558 }
559}
560
561impl QueueMonitor {
562 pub fn new(warning_threshold: f64, critical_threshold: f64) -> Self {
564 Self {
565 warning_threshold,
566 critical_threshold,
567 }
568 }
569
570 pub fn check(&self, queue: &dyn MessageQueue) -> QueueHealth {
572 let utilization = self.utilization(queue);
573
574 if utilization >= self.critical_threshold {
575 QueueHealth::Critical
576 } else if utilization >= self.warning_threshold {
577 QueueHealth::Warning
578 } else {
579 QueueHealth::Healthy
580 }
581 }
582
583 pub fn utilization(&self, queue: &dyn MessageQueue) -> f64 {
585 let capacity = queue.capacity();
586 if capacity == 0 {
587 return 0.0;
588 }
589 queue.len() as f64 / capacity as f64
590 }
591
592 pub fn utilization_percent(&self, queue: &dyn MessageQueue) -> f64 {
594 self.utilization(queue) * 100.0
595 }
596
597 pub fn suggest_upgrade(
601 &self,
602 queue: &dyn MessageQueue,
603 current_tier: QueueTier,
604 ) -> Option<QueueTier> {
605 let stats = queue.stats();
606 let utilization = self.utilization(queue);
607
608 let max_util = if queue.capacity() > 0 {
612 stats.max_depth as f64 / queue.capacity() as f64
613 } else {
614 0.0
615 };
616
617 if utilization >= self.warning_threshold || max_util >= self.critical_threshold {
618 let upgraded = current_tier.upgrade();
619 if upgraded != current_tier {
620 return Some(upgraded);
621 }
622 }
623
624 None
625 }
626
627 pub fn has_drops(&self, queue: &dyn MessageQueue) -> bool {
629 queue.stats().dropped > 0
630 }
631
632 pub fn drop_rate(&self, queue: &dyn MessageQueue) -> f64 {
634 let stats = queue.stats();
635 let total_attempted = stats.enqueued + stats.dropped;
636 if total_attempted == 0 {
637 return 0.0;
638 }
639 stats.dropped as f64 / total_attempted as f64
640 }
641}
642
/// Combined snapshot produced by `QueueMetrics::capture`.
#[derive(Debug, Clone)]
pub struct QueueMetrics {
    /// Health classification at capture time.
    pub health: QueueHealth,
    /// Utilization fraction in [0.0, 1.0] at capture time.
    pub utilization: f64,
    /// Raw counter snapshot.
    pub stats: QueueStats,
    /// Tier the queue was created with, when known.
    pub tier: Option<QueueTier>,
    /// Larger tier to consider, when the monitor detects pressure.
    pub suggested_upgrade: Option<QueueTier>,
}
657
658impl QueueMetrics {
659 pub fn capture(
661 queue: &dyn MessageQueue,
662 monitor: &QueueMonitor,
663 current_tier: Option<QueueTier>,
664 ) -> Self {
665 let health = monitor.check(queue);
666 let utilization = monitor.utilization(queue);
667 let stats = queue.stats();
668 let suggested_upgrade = current_tier.and_then(|tier| monitor.suggest_upgrade(queue, tier));
669
670 Self {
671 health,
672 utilization,
673 stats,
674 tier: current_tier,
675 suggested_upgrade,
676 }
677 }
678}
679
/// Queue sharded into independent SPSC partitions, routed by source id.
///
/// A single consumer drains the partitions round-robin via
/// `try_dequeue_any`. NOTE(review): distinct source ids can hash to the
/// same partition, which would place two producers on one SPSC ring —
/// confirm callers serialize producers that share a partition.
pub struct PartitionedQueue {
    /// One SPSC ring per partition.
    partitions: Vec<SpscQueue>,
    /// Number of partitions; always a power of two.
    partition_count: usize,
    /// Rotating start index for round-robin draining.
    dequeue_index: AtomicU64,
}
717
impl PartitionedQueue {
    /// Creates `partition_count` partitions (rounded up to a power of two,
    /// minimum 1), each an SPSC ring of `capacity_per_partition` slots
    /// (itself rounded up by `SpscQueue::new`).
    pub fn new(partition_count: usize, capacity_per_partition: usize) -> Self {
        let partition_count = partition_count.max(1).next_power_of_two();
        let partitions = (0..partition_count)
            .map(|_| SpscQueue::new(capacity_per_partition))
            .collect();

        Self {
            partitions,
            partition_count,
            dequeue_index: AtomicU64::new(0),
        }
    }

    /// 4 partitions of `Medium` (1024) capacity each.
    pub fn with_defaults() -> Self {
        Self::new(4, QueueTier::Medium.capacity())
    }

    /// 8 partitions of `Large` (4096) capacity for heavily shared queues.
    pub fn for_high_contention() -> Self {
        Self::new(8, QueueTier::Large.capacity())
    }

    /// Maps a source id to its partition index. The mask works because the
    /// partition count is a power of two.
    #[inline]
    pub fn partition_for(&self, source_id: u64) -> usize {
        (source_id as usize) & (self.partition_count - 1)
    }

    /// Number of partitions.
    pub fn partition_count(&self) -> usize {
        self.partition_count
    }

    /// Slot capacity of a single partition (0 if there are none).
    pub fn capacity_per_partition(&self) -> usize {
        self.partitions.first().map_or(0, |p| p.capacity())
    }

    /// Combined capacity across all partitions.
    pub fn total_capacity(&self) -> usize {
        self.capacity_per_partition() * self.partition_count
    }

    /// Total messages currently queued across all partitions.
    pub fn total_messages(&self) -> usize {
        self.partitions.iter().map(|p| p.len()).sum()
    }

    /// Enqueues into the partition owned by `source_id`.
    pub fn try_enqueue_from(&self, source_id: u64, envelope: MessageEnvelope) -> Result<()> {
        let partition = self.partition_for(source_id);
        self.partitions[partition].try_enqueue(envelope)
    }

    /// Enqueues using the envelope's own `source_kernel` field for routing.
    pub fn try_enqueue(&self, envelope: MessageEnvelope) -> Result<()> {
        let source_id = envelope.header.source_kernel;
        self.try_enqueue_from(source_id, envelope)
    }

    /// Dequeues from one specific partition; rejects an out-of-range index
    /// with `InvalidConfig`.
    pub fn try_dequeue_partition(&self, partition: usize) -> Result<MessageEnvelope> {
        if partition >= self.partition_count {
            return Err(RingKernelError::InvalidConfig(format!(
                "Invalid partition index: {} (max: {})",
                partition,
                self.partition_count - 1
            )));
        }
        self.partitions[partition].try_dequeue()
    }

    /// Dequeues from whichever partition has data, starting at a rotating
    /// index so partitions drain roughly round-robin. Returns `None` when
    /// every partition is empty.
    pub fn try_dequeue_any(&self) -> Option<MessageEnvelope> {
        let start_index = self.dequeue_index.fetch_add(1, Ordering::Relaxed) as usize;

        for i in 0..self.partition_count {
            let partition = (start_index + i) & (self.partition_count - 1);
            if let Ok(envelope) = self.partitions[partition].try_dequeue() {
                return Some(envelope);
            }
        }

        None
    }

    /// Stats for one partition, or `None` if the index is out of range.
    pub fn partition_stats(&self, partition: usize) -> Option<QueueStats> {
        self.partitions.get(partition).map(|p| p.stats())
    }

    /// Aggregated plus per-partition stats. Counters are summed across
    /// partitions; `max_depth` is the largest single-partition high-water
    /// mark, not a whole-queue high-water mark.
    pub fn stats(&self) -> PartitionedQueueStats {
        let mut total = QueueStats::default();
        let mut partition_stats = Vec::with_capacity(self.partition_count);

        for partition in &self.partitions {
            let stats = partition.stats();
            total.enqueued += stats.enqueued;
            total.dequeued += stats.dequeued;
            total.dropped += stats.dropped;
            total.depth += stats.depth;
            if stats.max_depth > total.max_depth {
                total.max_depth = stats.max_depth;
            }
            partition_stats.push(stats);
        }

        PartitionedQueueStats {
            total,
            partition_stats,
            partition_count: self.partition_count,
        }
    }

    /// Resets counters on every partition.
    pub fn reset_stats(&self) {
        for partition in &self.partitions {
            partition.reset_stats();
        }
    }
}
857
/// Aggregated view of a `PartitionedQueue`'s counters.
#[derive(Debug, Clone)]
pub struct PartitionedQueueStats {
    /// Counters summed across partitions (`max_depth` is the per-partition
    /// maximum — see `PartitionedQueue::stats`).
    pub total: QueueStats,
    /// Per-partition snapshots, indexed by partition.
    pub partition_stats: Vec<QueueStats>,
    /// Number of partitions at snapshot time.
    pub partition_count: usize,
}
868
869impl PartitionedQueueStats {
870 pub fn load_imbalance(&self) -> f64 {
875 if self.partition_count == 0 {
876 return 1.0;
877 }
878
879 let avg = self.total.depth as f64 / self.partition_count as f64;
880 if avg == 0.0 {
881 return 1.0;
882 }
883
884 let max = self
885 .partition_stats
886 .iter()
887 .map(|s| s.depth)
888 .max()
889 .unwrap_or(0);
890 max as f64 / avg
891 }
892
893 pub fn max_partition_utilization(&self, capacity_per_partition: usize) -> f64 {
895 if capacity_per_partition == 0 {
896 return 0.0;
897 }
898
899 let max = self
900 .partition_stats
901 .iter()
902 .map(|s| s.depth)
903 .max()
904 .unwrap_or(0);
905 max as f64 / capacity_per_partition as f64
906 }
907}
908
#[cfg(test)]
mod tests {
    use super::*;
    use crate::hlc::HlcTimestamp;
    use crate::message::MessageHeader;

    /// Builds a minimal 8-byte envelope for queue tests.
    fn make_envelope() -> MessageEnvelope {
        MessageEnvelope {
            header: MessageHeader::new(1, 0, 1, 8, HlcTimestamp::now(1)),
            payload: vec![1, 2, 3, 4, 5, 6, 7, 8],
        }
    }

    #[test]
    fn test_spsc_basic() {
        let queue = SpscQueue::new(16);

        assert!(queue.is_empty());
        assert!(!queue.is_full());

        let env = make_envelope();
        queue.try_enqueue(env).unwrap();

        assert_eq!(queue.len(), 1);
        assert!(!queue.is_empty());

        let _ = queue.try_dequeue().unwrap();
        assert!(queue.is_empty());
    }

    #[test]
    fn test_spsc_full() {
        let queue = SpscQueue::new(4);

        for _ in 0..4 {
            queue.try_enqueue(make_envelope()).unwrap();
        }

        assert!(queue.is_full());
        assert!(matches!(
            queue.try_enqueue(make_envelope()),
            Err(RingKernelError::QueueFull { .. })
        ));
    }

    #[test]
    fn test_spsc_stats() {
        let queue = SpscQueue::new(16);

        for _ in 0..10 {
            queue.try_enqueue(make_envelope()).unwrap();
        }

        for _ in 0..5 {
            let _ = queue.try_dequeue().unwrap();
        }

        let stats = queue.stats();
        assert_eq!(stats.enqueued, 10);
        assert_eq!(stats.dequeued, 5);
        assert_eq!(stats.depth, 5);
    }

    #[test]
    fn test_mpsc_concurrent() {
        use std::sync::Arc;
        use std::thread;

        let queue = Arc::new(MpscQueue::new(1024));
        let mut handles = vec![];

        // Four producer threads, 100 messages each.
        for _ in 0..4 {
            let q = Arc::clone(&queue);
            handles.push(thread::spawn(move || {
                for _ in 0..100 {
                    q.try_enqueue(make_envelope()).unwrap();
                }
            }));
        }

        for h in handles {
            h.join().unwrap();
        }

        let stats = queue.stats();
        assert_eq!(stats.enqueued, 400);
    }

    #[test]
    fn test_bounded_timeout() {
        let queue = BoundedQueue::new(2);

        queue.try_enqueue(make_envelope()).unwrap();
        queue.try_enqueue(make_envelope()).unwrap();

        // Queue is full and nothing dequeues, so this must time out.
        let result = queue.enqueue_timeout(make_envelope(), std::time::Duration::from_millis(10));
        assert!(matches!(result, Err(RingKernelError::Timeout(_))));
    }

    #[test]
    fn test_queue_tier_capacities() {
        assert_eq!(QueueTier::Small.capacity(), 256);
        assert_eq!(QueueTier::Medium.capacity(), 1024);
        assert_eq!(QueueTier::Large.capacity(), 4096);
        assert_eq!(QueueTier::ExtraLarge.capacity(), 16384);
    }

    #[test]
    fn test_queue_tier_for_throughput() {
        // needed = msgs/s * headroom_ms / 1000: 100 -> Small.
        assert_eq!(QueueTier::for_throughput(1000, 100), QueueTier::Small);

        assert_eq!(QueueTier::for_throughput(5000, 100), QueueTier::Medium);

        assert_eq!(QueueTier::for_throughput(20000, 100), QueueTier::Large);

        assert_eq!(
            QueueTier::for_throughput(100000, 100),
            QueueTier::ExtraLarge
        );
    }

    #[test]
    fn test_queue_tier_upgrade_downgrade() {
        assert_eq!(QueueTier::Small.upgrade(), QueueTier::Medium);
        assert_eq!(QueueTier::Medium.upgrade(), QueueTier::Large);
        assert_eq!(QueueTier::Large.upgrade(), QueueTier::ExtraLarge);
        // Top and bottom tiers saturate rather than wrap.
        assert_eq!(QueueTier::ExtraLarge.upgrade(), QueueTier::ExtraLarge);
        assert_eq!(QueueTier::Small.downgrade(), QueueTier::Small);
        assert_eq!(QueueTier::Medium.downgrade(), QueueTier::Small);
        assert_eq!(QueueTier::Large.downgrade(), QueueTier::Medium);
        assert_eq!(QueueTier::ExtraLarge.downgrade(), QueueTier::Large);
    }

    #[test]
    fn test_queue_factory_creates_correct_capacity() {
        let spsc = QueueFactory::create_spsc(QueueTier::Medium);
        assert_eq!(spsc.capacity(), 1024);

        let mpsc = QueueFactory::create_mpsc(QueueTier::Large);
        assert_eq!(mpsc.capacity(), 4096);

        let bounded = QueueFactory::create_bounded(QueueTier::Small);
        assert_eq!(bounded.capacity(), 256);
    }

    #[test]
    fn test_queue_factory_throughput_based() {
        // 10k msg/s * 100 ms headroom = 1000 needed -> Medium (1024).
        let queue = QueueFactory::create_for_throughput(10000, 100);
        assert_eq!(queue.capacity(), 1024);
    }

    #[test]
    fn test_queue_monitor_health_levels() {
        let monitor = QueueMonitor::default();
        // Requested 100 rounds up to an actual capacity of 128.
        let queue = SpscQueue::new(100);
        assert_eq!(monitor.check(&queue), QueueHealth::Healthy);

        // 76 / 128 ≈ 0.59 — below the 0.75 warning threshold.
        for _ in 0..76 {
            queue.try_enqueue(make_envelope()).unwrap();
        }
        assert_eq!(monitor.check(&queue), QueueHealth::Healthy);

        // 102 / 128 ≈ 0.80 — at/above warning, below critical.
        for _ in 0..26 {
            queue.try_enqueue(make_envelope()).unwrap();
        }
        assert_eq!(monitor.check(&queue), QueueHealth::Warning);

        // 120 / 128 ≈ 0.94 — at/above the 0.90 critical threshold.
        for _ in 0..18 {
            queue.try_enqueue(make_envelope()).unwrap();
        }
        assert_eq!(monitor.check(&queue), QueueHealth::Critical);
    }

    #[test]
    fn test_queue_monitor_utilization() {
        let monitor = QueueMonitor::default();
        // Requested 100 rounds up to 128, so 64 messages is exactly half.
        let queue = SpscQueue::new(100);
        assert!((monitor.utilization(&queue) - 0.0).abs() < 0.001);

        for _ in 0..64 {
            queue.try_enqueue(make_envelope()).unwrap();
        }
        assert!((monitor.utilization(&queue) - 0.5).abs() < 0.001);
    }

    #[test]
    fn test_queue_monitor_drop_detection() {
        let monitor = QueueMonitor::default();
        let queue = SpscQueue::new(4);

        // Fill to capacity without drops.
        for _ in 0..4 {
            queue.try_enqueue(make_envelope()).unwrap();
        }
        assert!(!monitor.has_drops(&queue));

        // Fifth enqueue on a full queue is counted as a drop.
        let _ = queue.try_enqueue(make_envelope());
        assert!(monitor.has_drops(&queue));
        assert!(monitor.drop_rate(&queue) > 0.0);
    }

    #[test]
    fn test_queue_monitor_upgrade_suggestion() {
        let monitor = QueueMonitor::default();
        let queue = SpscQueue::new(QueueTier::Small.capacity());

        assert!(monitor.suggest_upgrade(&queue, QueueTier::Small).is_none());

        // 200 / 256 ≈ 0.78 utilization crosses the warning threshold.
        for _ in 0..200 {
            queue.try_enqueue(make_envelope()).unwrap();
        }

        let suggestion = monitor.suggest_upgrade(&queue, QueueTier::Small);
        assert_eq!(suggestion, Some(QueueTier::Medium));

        // ExtraLarge is the top tier: even under pressure there is no
        // larger tier to suggest.
        let large_queue = SpscQueue::new(QueueTier::ExtraLarge.capacity());
        for _ in 0..(QueueTier::ExtraLarge.capacity() * 3 / 4) {
            large_queue.try_enqueue(make_envelope()).unwrap();
        }
        assert!(monitor
            .suggest_upgrade(&large_queue, QueueTier::ExtraLarge)
            .is_none());
    }

    #[test]
    fn test_queue_metrics_capture() {
        let queue = SpscQueue::new(QueueTier::Medium.capacity());
        let monitor = QueueMonitor::default();

        // 100 / 1024 ≈ 0.098 utilization: healthy, no upgrade suggested.
        for _ in 0..100 {
            queue.try_enqueue(make_envelope()).unwrap();
        }

        let metrics = QueueMetrics::capture(&queue, &monitor, Some(QueueTier::Medium));

        assert_eq!(metrics.health, QueueHealth::Healthy);
        assert!(metrics.utilization < 0.15);
        assert_eq!(metrics.stats.enqueued, 100);
        assert_eq!(metrics.tier, Some(QueueTier::Medium));
        assert!(metrics.suggested_upgrade.is_none());
    }

    #[test]
    fn test_partitioned_queue_creation() {
        let queue = PartitionedQueue::new(4, 256);
        assert_eq!(queue.partition_count(), 4);
        assert_eq!(queue.capacity_per_partition(), 256);
        assert_eq!(queue.total_capacity(), 1024);
    }

    #[test]
    fn test_partitioned_queue_rounds_to_power_of_two() {
        // 3 requested partitions round up to 4.
        let queue = PartitionedQueue::new(3, 256);
        assert_eq!(queue.partition_count(), 4);
    }

    #[test]
    fn test_partitioned_queue_routing() {
        let queue = PartitionedQueue::with_defaults();

        // Routing must be deterministic per source id.
        let partition1 = queue.partition_for(12345);
        let partition2 = queue.partition_for(12345);
        assert_eq!(partition1, partition2);

        // Adjacent ids land on different partitions (unless only one exists).
        let partition_a = queue.partition_for(0);
        let partition_b = queue.partition_for(1);
        assert!(partition_a != partition_b || queue.partition_count() == 1);
    }

    #[test]
    fn test_partitioned_queue_enqueue_dequeue() {
        let queue = PartitionedQueue::new(4, 64);

        // Spread 16 messages across the partitions by source id.
        for source in 0..16u64 {
            let mut env = make_envelope();
            env.header.source_kernel = source;
            queue.try_enqueue(env).unwrap();
        }

        assert_eq!(queue.total_messages(), 16);

        for _ in 0..16 {
            let env = queue.try_dequeue_any();
            assert!(env.is_some());
        }

        assert_eq!(queue.total_messages(), 0);
        assert!(queue.try_dequeue_any().is_none());
    }

    #[test]
    fn test_partitioned_queue_stats() {
        let queue = PartitionedQueue::new(4, 64);

        for source in 0..20u64 {
            let mut env = make_envelope();
            env.header.source_kernel = source;
            queue.try_enqueue(env).unwrap();
        }

        let stats = queue.stats();
        assert_eq!(stats.total.enqueued, 20);
        assert_eq!(stats.partition_count, 4);
        assert_eq!(stats.partition_stats.len(), 4);
    }

    #[test]
    fn test_partitioned_queue_load_imbalance() {
        let queue = PartitionedQueue::new(4, 64);

        // All 10 messages land on partition 0: max depth 10 vs mean 2.5
        // gives an imbalance factor of exactly 4.0.
        for _ in 0..10 {
            let mut env = make_envelope();
            env.header.source_kernel = 0;
            queue.try_enqueue(env).unwrap();
        }

        let stats = queue.stats();
        assert!((stats.load_imbalance() - 4.0).abs() < 0.001);
    }

    #[test]
    fn test_partitioned_queue_dequeue_partition() {
        let queue = PartitionedQueue::new(4, 64);

        let mut env = make_envelope();
        env.header.source_kernel = 0;
        queue.try_enqueue(env).unwrap();

        let partition = queue.partition_for(0);

        let result = queue.try_dequeue_partition(partition);
        assert!(result.is_ok());

        // Out-of-range partition index is rejected.
        let result = queue.try_dequeue_partition(100);
        assert!(result.is_err());
    }
}
1287
#[cfg(test)]
mod proptests {
    use super::*;
    use crate::hlc::HlcTimestamp;
    use crate::message::MessageHeader;
    use proptest::prelude::*;

    /// Envelope carrying `seq` in the first `MessageHeader::new` argument.
    // NOTE(review): `spsc_fifo_ordering` below reads the sequence back via
    // `header.message_type`, which assumes the first constructor argument
    // is the message type — confirm against `MessageHeader::new`.
    fn make_envelope_with_seq(seq: u64) -> MessageEnvelope {
        MessageEnvelope {
            header: MessageHeader::new(seq, 1, 0, 8, HlcTimestamp::now(1)),
            payload: vec![1, 2, 3, 4, 5, 6, 7, 8],
        }
    }

    proptest! {
        // Capacity is always rounded up to a power of two, never down.
        #[test]
        fn spsc_capacity_invariant(cap in 1usize..=256) {
            let queue = SpscQueue::new(cap);
            let actual_cap = queue.capacity();
            prop_assert!(actual_cap.is_power_of_two());
            prop_assert!(actual_cap >= cap);
        }

        // Overfilling never pushes len() past capacity.
        #[test]
        fn spsc_len_never_exceeds_capacity(n in 1usize..=128) {
            let queue = SpscQueue::new(n);
            let cap = queue.capacity();
            for i in 0..(cap + 10) {
                let _ = queue.try_enqueue(make_envelope_with_seq(i as u64));
                prop_assert!(queue.len() <= cap);
            }
        }

        // Messages come out in the order they went in.
        #[test]
        fn spsc_fifo_ordering(count in 1usize..=64) {
            let queue = SpscQueue::new((count + 1).next_power_of_two());

            for i in 0..count {
                queue.try_enqueue(make_envelope_with_seq(i as u64)).unwrap();
            }

            for i in 0..count {
                let env = queue.try_dequeue().unwrap();
                prop_assert_eq!(env.header.message_type, i as u64);
            }

            prop_assert!(queue.is_empty());
        }

        // enqueued/dropped/depth counters agree with observed outcomes.
        #[test]
        fn spsc_stats_consistency(enqueue_count in 1usize..=64) {
            let queue = SpscQueue::new(64);
            let cap = queue.capacity();
            let mut expected_dropped = 0u64;

            for i in 0..enqueue_count {
                if queue.try_enqueue(make_envelope_with_seq(i as u64)).is_err() {
                    expected_dropped += 1;
                }
            }

            let stats = queue.stats();
            let successful = enqueue_count as u64 - expected_dropped;
            prop_assert_eq!(stats.enqueued, successful);
            prop_assert_eq!(stats.dropped, expected_dropped);
            prop_assert_eq!(stats.depth, successful);
            prop_assert!(stats.depth <= cap as u64);
        }

        // Draining everything returns the queue to an empty, balanced state.
        #[test]
        fn spsc_enqueue_dequeue_roundtrip(n in 1usize..=32) {
            let queue = SpscQueue::new(64);

            for i in 0..n {
                queue.try_enqueue(make_envelope_with_seq(i as u64)).unwrap();
            }

            for _ in 0..n {
                queue.try_dequeue().unwrap();
            }

            let stats = queue.stats();
            prop_assert_eq!(stats.enqueued, n as u64);
            prop_assert_eq!(stats.dequeued, n as u64);
            prop_assert_eq!(stats.depth, 0);
            prop_assert!(queue.is_empty());
        }

        // Partition routing is a pure function of the source id.
        #[test]
        fn partitioned_routing_deterministic(source_id in 0u64..1000, partitions in 1usize..=8) {
            let queue = PartitionedQueue::new(partitions, 64);
            let p1 = queue.partition_for(source_id);
            let p2 = queue.partition_for(source_id);
            prop_assert_eq!(p1, p2);
            prop_assert!(p1 < queue.partition_count());
        }
    }
}
1388}