//! Bounded message queues for kernel-to-kernel communication: SPSC and MPSC
//! ring buffers, a blocking bounded queue, preset capacity tiers with a
//! factory, utilization monitoring, and a source-partitioned queue.

use std::sync::atomic::{AtomicU64, Ordering};

use crate::error::{Result, RingKernelError};
use crate::message::MessageEnvelope;

/// Counters describing a queue's activity since creation (or the last
/// [`MessageQueue::reset_stats`] call).
#[derive(Debug, Clone, Default)]
pub struct QueueStats {
    /// Messages successfully enqueued.
    pub enqueued: u64,
    /// Messages successfully dequeued.
    pub dequeued: u64,
    /// Messages rejected because the queue was full.
    pub dropped: u64,
    /// Messages currently in the queue.
    pub depth: u64,
    /// High-water mark of `depth`.
    pub max_depth: u64,
}

/// Common interface implemented by every queue flavor in this module.
pub trait MessageQueue: Send + Sync {
    /// Maximum number of messages the queue can hold.
    fn capacity(&self) -> usize;

    /// Number of messages currently in the queue.
    fn len(&self) -> usize;

    /// Returns `true` if the queue holds no messages.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns `true` if the queue is at capacity.
    fn is_full(&self) -> bool {
        self.len() >= self.capacity()
    }

    /// Attempts to enqueue without blocking; fails if the queue is full.
    fn try_enqueue(&self, envelope: MessageEnvelope) -> Result<()>;

    /// Attempts to dequeue without blocking; fails if the queue is empty.
    fn try_dequeue(&self) -> Result<MessageEnvelope>;

    /// Returns a snapshot of the queue's statistics.
    fn stats(&self) -> QueueStats;

    /// Resets all statistics counters to zero.
    fn reset_stats(&self);
}

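/// Single-producer single-consumer ring buffer.
///
/// Capacity is rounded up to a power of two so slot indices can be computed
/// with a bitmask: `head` and `tail` are monotonically increasing counters,
/// and counter value `n` maps to slot `n & mask`. For example, with capacity
/// 16 (mask 15), counter 17 lands in slot 1. Each slot is guarded by a
/// lightweight mutex rather than a raw cell, trading a little speed for
/// simplicity and safety.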
pub struct SpscQueue {
    /// One mutex-guarded slot per ring position.
    buffer: Vec<parking_lot::Mutex<Option<MessageEnvelope>>>,
    /// Capacity, always a power of two.
    capacity: usize,
    /// Bitmask (`capacity - 1`) mapping counters to slot indices.
    mask: usize,
    /// Producer position: total messages ever enqueued.
    head: AtomicU64,
    /// Consumer position: total messages ever dequeued.
    tail: AtomicU64,
    /// Atomic counters backing [`QueueStats`].
    stats: QueueStatsInner,
}

struct QueueStatsInner {
    enqueued: AtomicU64,
    dequeued: AtomicU64,
    dropped: AtomicU64,
    max_depth: AtomicU64,
}

impl SpscQueue {
    /// Creates a queue; `capacity` is rounded up to the next power of two.
    pub fn new(capacity: usize) -> Self {
        let capacity = capacity.next_power_of_two();
        let mask = capacity - 1;

        let mut buffer = Vec::with_capacity(capacity);
        for _ in 0..capacity {
            buffer.push(parking_lot::Mutex::new(None));
        }

        Self {
            buffer,
            capacity,
            mask,
            head: AtomicU64::new(0),
            tail: AtomicU64::new(0),
            stats: QueueStatsInner {
                enqueued: AtomicU64::new(0),
                dequeued: AtomicU64::new(0),
                dropped: AtomicU64::new(0),
                max_depth: AtomicU64::new(0),
            },
        }
    }

    /// Number of messages currently queued (`head - tail`).
    fn depth(&self) -> u64 {
        let head = self.head.load(Ordering::Acquire);
        let tail = self.tail.load(Ordering::Acquire);
        head.wrapping_sub(tail)
    }

    /// Raises the recorded high-water mark if the current depth exceeds it,
    /// using a CAS loop so concurrent updates are never lost.
    fn update_max_depth(&self) {
        let depth = self.depth();
        let mut max = self.stats.max_depth.load(Ordering::Relaxed);
        while depth > max {
            match self.stats.max_depth.compare_exchange_weak(
                max,
                depth,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                Ok(_) => break,
                Err(current) => max = current,
            }
        }
    }
}

impl MessageQueue for SpscQueue {
    fn capacity(&self) -> usize {
        self.capacity
    }

    fn len(&self) -> usize {
        self.depth() as usize
    }

    fn try_enqueue(&self, envelope: MessageEnvelope) -> Result<()> {
        let head = self.head.load(Ordering::Acquire);
        let tail = self.tail.load(Ordering::Acquire);

        // Full: the producer has run a whole lap ahead of the consumer.
        if head.wrapping_sub(tail) >= self.capacity as u64 {
            self.stats.dropped.fetch_add(1, Ordering::Relaxed);
            return Err(RingKernelError::QueueFull {
                capacity: self.capacity,
            });
        }

        let index = (head as usize) & self.mask;
        let mut slot = self.buffer[index].lock();
        *slot = Some(envelope);
        drop(slot);

        // Publish the slot: this Release store pairs with the consumer's
        // Acquire load of `head`.
        self.head.store(head.wrapping_add(1), Ordering::Release);

        self.stats.enqueued.fetch_add(1, Ordering::Relaxed);
        self.update_max_depth();

        Ok(())
    }

    fn try_dequeue(&self) -> Result<MessageEnvelope> {
        let tail = self.tail.load(Ordering::Acquire);
        let head = self.head.load(Ordering::Acquire);

        if head == tail {
            return Err(RingKernelError::QueueEmpty);
        }

        let index = (tail as usize) & self.mask;
        let mut slot = self.buffer[index].lock();
        let envelope = slot.take().ok_or(RingKernelError::QueueEmpty)?;
        drop(slot);

        // Free the slot for the producer.
        self.tail.store(tail.wrapping_add(1), Ordering::Release);

        self.stats.dequeued.fetch_add(1, Ordering::Relaxed);

        Ok(envelope)
    }

    fn stats(&self) -> QueueStats {
        QueueStats {
            enqueued: self.stats.enqueued.load(Ordering::Relaxed),
            dequeued: self.stats.dequeued.load(Ordering::Relaxed),
            dropped: self.stats.dropped.load(Ordering::Relaxed),
            depth: self.depth(),
            max_depth: self.stats.max_depth.load(Ordering::Relaxed),
        }
    }

    fn reset_stats(&self) {
        self.stats.enqueued.store(0, Ordering::Relaxed);
        self.stats.dequeued.store(0, Ordering::Relaxed);
        self.stats.dropped.store(0, Ordering::Relaxed);
        self.stats.max_depth.store(0, Ordering::Relaxed);
    }
}

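/// Multi-producer single-consumer queue.
///
/// Wraps an [`SpscQueue`] and serializes producers with a mutex, so any
/// number of threads may enqueue while a single consumer dequeues.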
pub struct MpscQueue {
    /// Underlying single-producer ring buffer.
    inner: SpscQueue,
    /// Serializes concurrent producers so the SPSC invariant holds.
    producer_lock: parking_lot::Mutex<()>,
}

impl MpscQueue {
    /// Creates a queue; `capacity` is rounded up to the next power of two.
    pub fn new(capacity: usize) -> Self {
        Self {
            inner: SpscQueue::new(capacity),
            producer_lock: parking_lot::Mutex::new(()),
        }
    }
}

impl MessageQueue for MpscQueue {
    fn capacity(&self) -> usize {
        self.inner.capacity()
    }

    fn len(&self) -> usize {
        self.inner.len()
    }

    fn try_enqueue(&self, envelope: MessageEnvelope) -> Result<()> {
        // Hold the producer lock for the whole enqueue so only one producer
        // advances the SPSC head at a time.
        let _guard = self.producer_lock.lock();
        self.inner.try_enqueue(envelope)
    }

    fn try_dequeue(&self) -> Result<MessageEnvelope> {
        self.inner.try_dequeue()
    }

    fn stats(&self) -> QueueStats {
        self.inner.stats()
    }

    fn reset_stats(&self) {
        self.inner.reset_stats()
    }
}

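/// Blocking bounded queue with timeout-based enqueue and dequeue.
///
/// Builds on [`MpscQueue`] and adds condition variables so callers can wait
/// (up to a deadline) for space or for messages instead of spinning.
///
/// A minimal usage sketch (`ignore`d: import paths are abbreviated):
///
/// ```ignore
/// let queue = BoundedQueue::new(64);
/// // Producer side: wait up to 5 ms for a free slot.
/// queue.enqueue_timeout(envelope, std::time::Duration::from_millis(5))?;
/// // Consumer side: wait up to 5 ms for a message.
/// let msg = queue.dequeue_timeout(std::time::Duration::from_millis(5))?;
/// ```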
pub struct BoundedQueue {
    /// Underlying non-blocking queue.
    inner: MpscQueue,
    /// Signaled when space becomes available.
    not_full: parking_lot::Condvar,
    /// Signaled when a message becomes available.
    not_empty: parking_lot::Condvar,
    /// Mutex paired with the condition variables.
    mutex: parking_lot::Mutex<()>,
}

impl BoundedQueue {
    /// Creates a queue; `capacity` is rounded up to the next power of two.
    pub fn new(capacity: usize) -> Self {
        Self {
            inner: MpscQueue::new(capacity),
            not_full: parking_lot::Condvar::new(),
            not_empty: parking_lot::Condvar::new(),
            mutex: parking_lot::Mutex::new(()),
        }
    }

    /// Enqueues, blocking until space is available or `timeout` elapses.
    pub fn enqueue_timeout(
        &self,
        envelope: MessageEnvelope,
        timeout: std::time::Duration,
    ) -> Result<()> {
        let deadline = std::time::Instant::now() + timeout;

        loop {
            match self.inner.try_enqueue(envelope.clone()) {
                Ok(()) => {
                    self.not_empty.notify_one();
                    return Ok(());
                }
                Err(RingKernelError::QueueFull { .. }) => {
                    let remaining = deadline.saturating_duration_since(std::time::Instant::now());
                    if remaining.is_zero() {
                        return Err(RingKernelError::Timeout(timeout));
                    }
                    // Wait for a consumer to signal free space, then retry.
                    // Spurious or missed wakeups are harmless: the loop
                    // re-checks both the queue and the deadline.
                    let mut guard = self.mutex.lock();
                    let _ = self.not_full.wait_for(&mut guard, remaining);
                }
                Err(e) => return Err(e),
            }
        }
    }

    /// Dequeues, blocking until a message arrives or `timeout` elapses.
    pub fn dequeue_timeout(&self, timeout: std::time::Duration) -> Result<MessageEnvelope> {
        let deadline = std::time::Instant::now() + timeout;

        loop {
            match self.inner.try_dequeue() {
                Ok(envelope) => {
                    self.not_full.notify_one();
                    return Ok(envelope);
                }
                Err(RingKernelError::QueueEmpty) => {
                    let remaining = deadline.saturating_duration_since(std::time::Instant::now());
                    if remaining.is_zero() {
                        return Err(RingKernelError::Timeout(timeout));
                    }
                    let mut guard = self.mutex.lock();
                    let _ = self.not_empty.wait_for(&mut guard, remaining);
                }
                Err(e) => return Err(e),
            }
        }
    }
}

impl MessageQueue for BoundedQueue {
    fn capacity(&self) -> usize {
        self.inner.capacity()
    }

    fn len(&self) -> usize {
        self.inner.len()
    }

    fn try_enqueue(&self, envelope: MessageEnvelope) -> Result<()> {
        let result = self.inner.try_enqueue(envelope);
        if result.is_ok() {
            // Wake one blocked consumer, if any.
            self.not_empty.notify_one();
        }
        result
    }

    fn try_dequeue(&self) -> Result<MessageEnvelope> {
        let result = self.inner.try_dequeue();
        if result.is_ok() {
            // Wake one blocked producer, if any.
            self.not_full.notify_one();
        }
        result
    }

    fn stats(&self) -> QueueStats {
        self.inner.stats()
    }

    fn reset_stats(&self) {
        self.inner.reset_stats()
    }
}

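/// Preset capacity tiers for queues.
///
/// Capacities are powers of two, so they pass through [`SpscQueue::new`]'s
/// rounding unchanged: `Small` = 256, `Medium` = 1024 (the default),
/// `Large` = 4096, `ExtraLarge` = 16384.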
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum QueueTier {
    /// 256 messages.
    Small,
    /// 1024 messages.
    #[default]
    Medium,
    /// 4096 messages.
    Large,
    /// 16384 messages.
    ExtraLarge,
}

impl QueueTier {
    /// Message capacity for this tier.
    pub fn capacity(&self) -> usize {
        match self {
            Self::Small => 256,
            Self::Medium => 1024,
            Self::Large => 4096,
            Self::ExtraLarge => 16384,
        }
    }

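    /// Picks the smallest tier able to buffer `target_headroom_ms` worth of
    /// traffic at `messages_per_second`.
    ///
    /// The required capacity is `messages_per_second * target_headroom_ms /
    /// 1000`. For example, 5000 msg/s with 100 ms of headroom needs 500
    /// slots, which fits in `Medium` (1024).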
    pub fn for_throughput(messages_per_second: u64, target_headroom_ms: u64) -> Self {
        let needed_capacity = (messages_per_second * target_headroom_ms) / 1000;

        if needed_capacity <= 256 {
            Self::Small
        } else if needed_capacity <= 1024 {
            Self::Medium
        } else if needed_capacity <= 4096 {
            Self::Large
        } else {
            Self::ExtraLarge
        }
    }

    /// Next tier up; `ExtraLarge` stays `ExtraLarge`.
    pub fn upgrade(&self) -> Self {
        match self {
            Self::Small => Self::Medium,
            Self::Medium => Self::Large,
            Self::Large => Self::ExtraLarge,
            Self::ExtraLarge => Self::ExtraLarge, // Already at the top.
        }
    }

    /// Next tier down; `Small` stays `Small`.
    pub fn downgrade(&self) -> Self {
        match self {
            Self::Small => Self::Small, // Already at the bottom.
            Self::Medium => Self::Small,
            Self::Large => Self::Medium,
            Self::ExtraLarge => Self::Large,
        }
    }
}

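/// Convenience constructors for tier-sized queues.
///
/// A minimal usage sketch (`ignore`d: import paths are abbreviated):
///
/// ```ignore
/// let queue = QueueFactory::create_mpsc(QueueTier::Large);
/// assert_eq!(queue.capacity(), 4096);
/// ```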
pub struct QueueFactory;

impl QueueFactory {
    /// Creates an SPSC queue sized for `tier`.
    pub fn create_spsc(tier: QueueTier) -> SpscQueue {
        SpscQueue::new(tier.capacity())
    }

    /// Creates an MPSC queue sized for `tier`.
    pub fn create_mpsc(tier: QueueTier) -> MpscQueue {
        MpscQueue::new(tier.capacity())
    }

    /// Creates a blocking bounded queue sized for `tier`.
    pub fn create_bounded(tier: QueueTier) -> BoundedQueue {
        BoundedQueue::new(tier.capacity())
    }

    /// Creates an MPSC queue whose tier is chosen from the expected
    /// throughput via [`QueueTier::for_throughput`].
    pub fn create_for_throughput(
        messages_per_second: u64,
        headroom_ms: u64,
    ) -> Box<dyn MessageQueue> {
        let tier = QueueTier::for_throughput(messages_per_second, headroom_ms);
        Box::new(Self::create_mpsc(tier))
    }
}

/// Health classification derived from queue utilization.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueueHealth {
    /// Utilization below the warning threshold.
    Healthy,
    /// Utilization at or above the warning threshold.
    Warning,
    /// Utilization at or above the critical threshold.
    Critical,
}

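/// Classifies queue health against configurable utilization thresholds.
///
/// A minimal usage sketch (`ignore`d: import paths are abbreviated):
///
/// ```ignore
/// let monitor = QueueMonitor::default(); // warn at 75%, critical at 90%
/// match monitor.check(&queue) {
///     QueueHealth::Critical => eprintln!("queue nearly full"),
///     QueueHealth::Warning => eprintln!("queue filling up"),
///     QueueHealth::Healthy => {}
/// }
/// ```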
#[derive(Debug, Clone)]
pub struct QueueMonitor {
    /// Utilization at or above which health is `Warning` (0.0..=1.0).
    pub warning_threshold: f64,
    /// Utilization at or above which health is `Critical` (0.0..=1.0).
    pub critical_threshold: f64,
}

impl Default for QueueMonitor {
    fn default() -> Self {
        Self {
            warning_threshold: 0.75,
            critical_threshold: 0.90,
        }
    }
}

impl QueueMonitor {
    /// Creates a monitor with custom thresholds.
    pub fn new(warning_threshold: f64, critical_threshold: f64) -> Self {
        Self {
            warning_threshold,
            critical_threshold,
        }
    }

    /// Classifies the queue's current health.
    pub fn check(&self, queue: &dyn MessageQueue) -> QueueHealth {
        let utilization = self.utilization(queue);

        if utilization >= self.critical_threshold {
            QueueHealth::Critical
        } else if utilization >= self.warning_threshold {
            QueueHealth::Warning
        } else {
            QueueHealth::Healthy
        }
    }

    /// Current fill fraction in `0.0..=1.0`.
    pub fn utilization(&self, queue: &dyn MessageQueue) -> f64 {
        let capacity = queue.capacity();
        if capacity == 0 {
            return 0.0;
        }
        queue.len() as f64 / capacity as f64
    }

    /// Current fill fraction as a percentage.
    pub fn utilization_percent(&self, queue: &dyn MessageQueue) -> f64 {
        self.utilization(queue) * 100.0
    }

    /// Suggests a larger tier when the queue is running hot: either current
    /// utilization has crossed the warning threshold, or the historical
    /// high-water mark has crossed the critical threshold.
    pub fn suggest_upgrade(
        &self,
        queue: &dyn MessageQueue,
        current_tier: QueueTier,
    ) -> Option<QueueTier> {
        let stats = queue.stats();
        let utilization = self.utilization(queue);

        let max_util = if queue.capacity() > 0 {
            stats.max_depth as f64 / queue.capacity() as f64
        } else {
            0.0
        };

        if utilization >= self.warning_threshold || max_util >= self.critical_threshold {
            let upgraded = current_tier.upgrade();
            if upgraded != current_tier {
                return Some(upgraded);
            }
        }

        None
    }

    /// Returns `true` if the queue has ever dropped a message.
    pub fn has_drops(&self, queue: &dyn MessageQueue) -> bool {
        queue.stats().dropped > 0
    }

    /// Fraction of enqueue attempts that were dropped.
    pub fn drop_rate(&self, queue: &dyn MessageQueue) -> f64 {
        let stats = queue.stats();
        let total_attempted = stats.enqueued + stats.dropped;
        if total_attempted == 0 {
            return 0.0;
        }
        stats.dropped as f64 / total_attempted as f64
    }
}

/// Point-in-time snapshot combining health, utilization, raw statistics,
/// and an optional tier recommendation.
#[derive(Debug, Clone)]
pub struct QueueMetrics {
    /// Health classification at capture time.
    pub health: QueueHealth,
    /// Fill fraction at capture time.
    pub utilization: f64,
    /// Raw queue statistics.
    pub stats: QueueStats,
    /// Tier the queue was created with, if known.
    pub tier: Option<QueueTier>,
    /// Suggested larger tier, if the monitor recommends one.
    pub suggested_upgrade: Option<QueueTier>,
}

impl QueueMetrics {
    /// Captures a snapshot of `queue` as seen by `monitor`.
    pub fn capture(
        queue: &dyn MessageQueue,
        monitor: &QueueMonitor,
        current_tier: Option<QueueTier>,
    ) -> Self {
        let health = monitor.check(queue);
        let utilization = monitor.utilization(queue);
        let stats = queue.stats();
        let suggested_upgrade = current_tier.and_then(|tier| monitor.suggest_upgrade(queue, tier));

        Self {
            health,
            utilization,
            stats,
            tier: current_tier,
            suggested_upgrade,
        }
    }
}

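/// Queue partitioned by source id to reduce producer contention.
///
/// A source id is masked to one of `partition_count` independent partitions,
/// so producers routed to different partitions never touch the same ring.
/// Because each partition is an SPSC ring, enqueues into the *same*
/// partition must not race; sources that share a partition should be
/// serialized by the caller. A single consumer drains partitions round-robin
/// via [`PartitionedQueue::try_dequeue_any`].
///
/// A minimal usage sketch (`ignore`d: import paths are abbreviated):
///
/// ```ignore
/// let queue = PartitionedQueue::new(4, 256);
/// queue.try_enqueue_from(source_id, envelope)?;
/// while let Some(envelope) = queue.try_dequeue_any() {
///     handle(envelope);
/// }
/// ```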
pub struct PartitionedQueue {
    /// Independent SPSC partitions.
    partitions: Vec<SpscQueue>,
    /// Number of partitions, always a power of two.
    partition_count: usize,
    /// Round-robin cursor for `try_dequeue_any`.
    dequeue_index: AtomicU64,
}

impl PartitionedQueue {
    /// Creates a queue with `partition_count` partitions (rounded up to a
    /// power of two, minimum 1) of `capacity_per_partition` messages each.
    pub fn new(partition_count: usize, capacity_per_partition: usize) -> Self {
        let partition_count = partition_count.max(1).next_power_of_two();
        let partitions = (0..partition_count)
            .map(|_| SpscQueue::new(capacity_per_partition))
            .collect();

        Self {
            partitions,
            partition_count,
            dequeue_index: AtomicU64::new(0),
        }
    }

    /// Creates a queue with 4 partitions of `Medium` capacity each.
    pub fn with_defaults() -> Self {
        Self::new(4, QueueTier::Medium.capacity())
    }

    /// Creates a larger queue (8 partitions of `Large` capacity) for
    /// high-contention workloads.
    pub fn for_high_contention() -> Self {
        Self::new(8, QueueTier::Large.capacity())
    }

    /// Maps a source id to its partition index.
    #[inline]
    pub fn partition_for(&self, source_id: u64) -> usize {
        (source_id as usize) & (self.partition_count - 1)
    }

    /// Number of partitions.
    pub fn partition_count(&self) -> usize {
        self.partition_count
    }

    /// Capacity of each individual partition.
    pub fn capacity_per_partition(&self) -> usize {
        self.partitions.first().map_or(0, |p| p.capacity())
    }

    /// Combined capacity across all partitions.
    pub fn total_capacity(&self) -> usize {
        self.capacity_per_partition() * self.partition_count
    }

    /// Total messages currently queued across all partitions.
    pub fn total_messages(&self) -> usize {
        self.partitions.iter().map(|p| p.len()).sum()
    }

    /// Enqueues into the partition that `source_id` maps to.
    pub fn try_enqueue_from(&self, source_id: u64, envelope: MessageEnvelope) -> Result<()> {
        let partition = self.partition_for(source_id);
        self.partitions[partition].try_enqueue(envelope)
    }

    /// Enqueues, routing by the envelope's own source kernel id.
    pub fn try_enqueue(&self, envelope: MessageEnvelope) -> Result<()> {
        let source_id = envelope.header.source_kernel;
        self.try_enqueue_from(source_id, envelope)
    }

    /// Dequeues from a specific partition.
    pub fn try_dequeue_partition(&self, partition: usize) -> Result<MessageEnvelope> {
        if partition >= self.partition_count {
            return Err(RingKernelError::InvalidConfig(format!(
                "Invalid partition index: {} (max: {})",
                partition,
                self.partition_count - 1
            )));
        }
        self.partitions[partition].try_dequeue()
    }

    /// Dequeues from any non-empty partition, starting at a rotating index
    /// so no partition is starved.
    pub fn try_dequeue_any(&self) -> Option<MessageEnvelope> {
        let start_index = self.dequeue_index.fetch_add(1, Ordering::Relaxed) as usize;

        for i in 0..self.partition_count {
            let partition = (start_index + i) & (self.partition_count - 1);
            if let Ok(envelope) = self.partitions[partition].try_dequeue() {
                return Some(envelope);
            }
        }

        None
    }

    /// Statistics for a single partition, if the index is valid.
    pub fn partition_stats(&self, partition: usize) -> Option<QueueStats> {
        self.partitions.get(partition).map(|p| p.stats())
    }

    /// Aggregated and per-partition statistics.
    pub fn stats(&self) -> PartitionedQueueStats {
        let mut total = QueueStats::default();
        let mut partition_stats = Vec::with_capacity(self.partition_count);

        for partition in &self.partitions {
            let stats = partition.stats();
            total.enqueued += stats.enqueued;
            total.dequeued += stats.dequeued;
            total.dropped += stats.dropped;
            total.depth += stats.depth;
            if stats.max_depth > total.max_depth {
                total.max_depth = stats.max_depth;
            }
            partition_stats.push(stats);
        }

        PartitionedQueueStats {
            total,
            partition_stats,
            partition_count: self.partition_count,
        }
    }

    /// Resets statistics on every partition.
    pub fn reset_stats(&self) {
        for partition in &self.partitions {
            partition.reset_stats();
        }
    }
}

/// Aggregated statistics for a [`PartitionedQueue`].
#[derive(Debug, Clone)]
pub struct PartitionedQueueStats {
    /// Totals summed across partitions (`max_depth` is the per-partition
    /// maximum, not a sum).
    pub total: QueueStats,
    /// Statistics for each individual partition.
    pub partition_stats: Vec<QueueStats>,
    /// Number of partitions.
    pub partition_count: usize,
}

impl PartitionedQueueStats {
    /// Ratio of the deepest partition to the average partition depth.
    ///
    /// 1.0 means perfectly balanced; with 4 partitions and every message in
    /// one of them, the ratio is 4.0. Empty queues report 1.0.
    pub fn load_imbalance(&self) -> f64 {
        if self.partition_count == 0 {
            return 1.0;
        }

        let avg = self.total.depth as f64 / self.partition_count as f64;
        if avg == 0.0 {
            return 1.0;
        }

        let max = self
            .partition_stats
            .iter()
            .map(|s| s.depth)
            .max()
            .unwrap_or(0);
        max as f64 / avg
    }

    /// Fill fraction of the fullest partition.
    pub fn max_partition_utilization(&self, capacity_per_partition: usize) -> f64 {
        if capacity_per_partition == 0 {
            return 0.0;
        }

        let max = self
            .partition_stats
            .iter()
            .map(|s| s.depth)
            .max()
            .unwrap_or(0);
        max as f64 / capacity_per_partition as f64
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::hlc::HlcTimestamp;
    use crate::message::MessageHeader;

    fn make_envelope() -> MessageEnvelope {
        MessageEnvelope {
            header: MessageHeader::new(1, 0, 1, 8, HlcTimestamp::now(1)),
            payload: vec![1, 2, 3, 4, 5, 6, 7, 8],
        }
    }

    #[test]
    fn test_spsc_basic() {
        let queue = SpscQueue::new(16);

        assert!(queue.is_empty());
        assert!(!queue.is_full());

        let env = make_envelope();
        queue.try_enqueue(env).unwrap();

        assert_eq!(queue.len(), 1);
        assert!(!queue.is_empty());

        let _ = queue.try_dequeue().unwrap();
        assert!(queue.is_empty());
    }

    #[test]
    fn test_spsc_full() {
        let queue = SpscQueue::new(4);

        for _ in 0..4 {
            queue.try_enqueue(make_envelope()).unwrap();
        }

        assert!(queue.is_full());
        assert!(matches!(
            queue.try_enqueue(make_envelope()),
            Err(RingKernelError::QueueFull { .. })
        ));
    }
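
    // Illustrative addition (not part of the original suite): cycles more
    // messages through a capacity-4 ring than it can hold at once, so the
    // head/tail counters run past the capacity and exercise the index mask.
    #[test]
    fn test_spsc_wraparound() {
        let queue = SpscQueue::new(4);

        for _ in 0..10 {
            queue.try_enqueue(make_envelope()).unwrap();
            let _ = queue.try_dequeue().unwrap();
        }

        assert!(queue.is_empty());
        assert_eq!(queue.stats().enqueued, 10);
        assert_eq!(queue.stats().dequeued, 10);
    }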

    #[test]
    fn test_spsc_stats() {
        let queue = SpscQueue::new(16);

        for _ in 0..10 {
            queue.try_enqueue(make_envelope()).unwrap();
        }

        for _ in 0..5 {
            let _ = queue.try_dequeue().unwrap();
        }

        let stats = queue.stats();
        assert_eq!(stats.enqueued, 10);
        assert_eq!(stats.dequeued, 5);
        assert_eq!(stats.depth, 5);
    }

    #[test]
    fn test_mpsc_concurrent() {
        use std::sync::Arc;
        use std::thread;

        let queue = Arc::new(MpscQueue::new(1024));
        let mut handles = vec![];

        // Four producers, 100 messages each.
        for _ in 0..4 {
            let q = Arc::clone(&queue);
            handles.push(thread::spawn(move || {
                for _ in 0..100 {
                    q.try_enqueue(make_envelope()).unwrap();
                }
            }));
        }

        for h in handles {
            h.join().unwrap();
        }

        let stats = queue.stats();
        assert_eq!(stats.enqueued, 400);
    }

    #[test]
    fn test_bounded_timeout() {
        let queue = BoundedQueue::new(2);

        // Fill the queue; no consumer will free space.
        queue.try_enqueue(make_envelope()).unwrap();
        queue.try_enqueue(make_envelope()).unwrap();

        let result = queue.enqueue_timeout(make_envelope(), std::time::Duration::from_millis(10));
        assert!(matches!(result, Err(RingKernelError::Timeout(_))));
    }
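
    // Illustrative addition (not part of the original suite): a blocked
    // enqueue_timeout should succeed once a consumer frees a slot. The
    // consumer's delay is far below the producer's timeout to keep the test
    // stable despite the condvar race window.
    #[test]
    fn test_bounded_timeout_unblocks() {
        use std::sync::Arc;
        use std::thread;

        let queue = Arc::new(BoundedQueue::new(2));
        queue.try_enqueue(make_envelope()).unwrap();
        queue.try_enqueue(make_envelope()).unwrap();

        let consumer = {
            let q = Arc::clone(&queue);
            thread::spawn(move || {
                thread::sleep(std::time::Duration::from_millis(20));
                q.try_dequeue().unwrap();
            })
        };

        let result =
            queue.enqueue_timeout(make_envelope(), std::time::Duration::from_millis(500));
        assert!(result.is_ok());
        consumer.join().unwrap();
    }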

    #[test]
    fn test_queue_tier_capacities() {
        assert_eq!(QueueTier::Small.capacity(), 256);
        assert_eq!(QueueTier::Medium.capacity(), 1024);
        assert_eq!(QueueTier::Large.capacity(), 4096);
        assert_eq!(QueueTier::ExtraLarge.capacity(), 16384);
    }

    #[test]
    fn test_queue_tier_for_throughput() {
        // 1000 msg/s * 100 ms = 100 messages of headroom.
        assert_eq!(QueueTier::for_throughput(1000, 100), QueueTier::Small);
        // 5000 msg/s * 100 ms = 500 messages.
        assert_eq!(QueueTier::for_throughput(5000, 100), QueueTier::Medium);
        // 20000 msg/s * 100 ms = 2000 messages.
        assert_eq!(QueueTier::for_throughput(20000, 100), QueueTier::Large);
        // 100000 msg/s * 100 ms = 10000 messages.
        assert_eq!(
            QueueTier::for_throughput(100000, 100),
            QueueTier::ExtraLarge
        );
    }

    #[test]
    fn test_queue_tier_upgrade_downgrade() {
        assert_eq!(QueueTier::Small.upgrade(), QueueTier::Medium);
        assert_eq!(QueueTier::Medium.upgrade(), QueueTier::Large);
        assert_eq!(QueueTier::Large.upgrade(), QueueTier::ExtraLarge);
        assert_eq!(QueueTier::ExtraLarge.upgrade(), QueueTier::ExtraLarge);

        assert_eq!(QueueTier::Small.downgrade(), QueueTier::Small);
        assert_eq!(QueueTier::Medium.downgrade(), QueueTier::Small);
        assert_eq!(QueueTier::Large.downgrade(), QueueTier::Medium);
        assert_eq!(QueueTier::ExtraLarge.downgrade(), QueueTier::Large);
    }

    #[test]
    fn test_queue_factory_creates_correct_capacity() {
        let spsc = QueueFactory::create_spsc(QueueTier::Medium);
        assert_eq!(spsc.capacity(), 1024);

        let mpsc = QueueFactory::create_mpsc(QueueTier::Large);
        assert_eq!(mpsc.capacity(), 4096);

        let bounded = QueueFactory::create_bounded(QueueTier::Small);
        assert_eq!(bounded.capacity(), 256);
    }

    #[test]
    fn test_queue_factory_throughput_based() {
        let queue = QueueFactory::create_for_throughput(10000, 100);
        // 10000 msg/s * 100 ms = 1000 messages -> Medium (1024).
        assert_eq!(queue.capacity(), 1024);
    }

    #[test]
    fn test_queue_monitor_health_levels() {
        let monitor = QueueMonitor::default();
        // Requested capacity 100 rounds up to 128.
        let queue = SpscQueue::new(100);

        assert_eq!(monitor.check(&queue), QueueHealth::Healthy);

        // 76 / 128 = 59% utilization: still healthy.
        for _ in 0..76 {
            queue.try_enqueue(make_envelope()).unwrap();
        }
        assert_eq!(monitor.check(&queue), QueueHealth::Healthy);

        // 102 / 128 = 80%: warning.
        for _ in 0..26 {
            queue.try_enqueue(make_envelope()).unwrap();
        }
        assert_eq!(monitor.check(&queue), QueueHealth::Warning);

        // 120 / 128 = 94%: critical.
        for _ in 0..18 {
            queue.try_enqueue(make_envelope()).unwrap();
        }
        assert_eq!(monitor.check(&queue), QueueHealth::Critical);
    }

    #[test]
    fn test_queue_monitor_utilization() {
        let monitor = QueueMonitor::default();
        // Requested capacity 100 rounds up to 128.
        let queue = SpscQueue::new(100);

        assert!((monitor.utilization(&queue) - 0.0).abs() < 0.001);

        // 64 / 128 = exactly 50%.
        for _ in 0..64 {
            queue.try_enqueue(make_envelope()).unwrap();
        }
        assert!((monitor.utilization(&queue) - 0.5).abs() < 0.001);
    }

    #[test]
    fn test_queue_monitor_drop_detection() {
        let monitor = QueueMonitor::default();
        let queue = SpscQueue::new(4);

        // Fill the queue exactly; nothing dropped yet.
        for _ in 0..4 {
            queue.try_enqueue(make_envelope()).unwrap();
        }
        assert!(!monitor.has_drops(&queue));

        // One more enqueue is rejected and counted as a drop.
        let _ = queue.try_enqueue(make_envelope());
        assert!(monitor.has_drops(&queue));
        assert!(monitor.drop_rate(&queue) > 0.0);
    }
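
    // Illustrative addition (not part of the original suite): pins down the
    // drop_rate arithmetic. 4 accepted + 1 dropped = 5 attempts, so the rate
    // is 1/5 = 0.2.
    #[test]
    fn test_queue_monitor_drop_rate_exact() {
        let monitor = QueueMonitor::default();
        let queue = SpscQueue::new(4);

        for _ in 0..4 {
            queue.try_enqueue(make_envelope()).unwrap();
        }
        let _ = queue.try_enqueue(make_envelope());

        assert!((monitor.drop_rate(&queue) - 0.2).abs() < 1e-9);
    }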

    #[test]
    fn test_queue_monitor_upgrade_suggestion() {
        let monitor = QueueMonitor::default();
        let queue = SpscQueue::new(QueueTier::Small.capacity());

        // Empty queue: no upgrade needed.
        assert!(monitor.suggest_upgrade(&queue, QueueTier::Small).is_none());

        // 200 / 256 = 78% utilization crosses the warning threshold.
        for _ in 0..200 {
            queue.try_enqueue(make_envelope()).unwrap();
        }

        let suggestion = monitor.suggest_upgrade(&queue, QueueTier::Small);
        assert_eq!(suggestion, Some(QueueTier::Medium));

        // ExtraLarge is already the top tier: nothing to suggest.
        let large_queue = SpscQueue::new(QueueTier::ExtraLarge.capacity());
        for _ in 0..(QueueTier::ExtraLarge.capacity() * 3 / 4) {
            large_queue.try_enqueue(make_envelope()).unwrap();
        }
        assert!(monitor
            .suggest_upgrade(&large_queue, QueueTier::ExtraLarge)
            .is_none());
    }

    #[test]
    fn test_queue_metrics_capture() {
        let queue = SpscQueue::new(QueueTier::Medium.capacity());
        let monitor = QueueMonitor::default();

        // 100 / 1024 = ~10% utilization: healthy, no upgrade.
        for _ in 0..100 {
            queue.try_enqueue(make_envelope()).unwrap();
        }

        let metrics = QueueMetrics::capture(&queue, &monitor, Some(QueueTier::Medium));

        assert_eq!(metrics.health, QueueHealth::Healthy);
        assert!(metrics.utilization < 0.15);
        assert_eq!(metrics.stats.enqueued, 100);
        assert_eq!(metrics.tier, Some(QueueTier::Medium));
        assert!(metrics.suggested_upgrade.is_none());
    }

    #[test]
    fn test_partitioned_queue_creation() {
        let queue = PartitionedQueue::new(4, 256);
        assert_eq!(queue.partition_count(), 4);
        assert_eq!(queue.capacity_per_partition(), 256);
        assert_eq!(queue.total_capacity(), 1024);
    }

    #[test]
    fn test_partitioned_queue_rounds_to_power_of_two() {
        let queue = PartitionedQueue::new(3, 256);
        assert_eq!(queue.partition_count(), 4);
    }

    #[test]
    fn test_partitioned_queue_routing() {
        let queue = PartitionedQueue::with_defaults();

        // Routing is deterministic for a given source id.
        let partition1 = queue.partition_for(12345);
        let partition2 = queue.partition_for(12345);
        assert_eq!(partition1, partition2);

        // Adjacent source ids land in different partitions.
        let partition_a = queue.partition_for(0);
        let partition_b = queue.partition_for(1);
        assert!(partition_a != partition_b || queue.partition_count() == 1);
    }

    #[test]
    fn test_partitioned_queue_enqueue_dequeue() {
        let queue = PartitionedQueue::new(4, 64);

        // Spread 16 messages across the partitions by source id.
        for source in 0..16u64 {
            let mut env = make_envelope();
            env.header.source_kernel = source;
            queue.try_enqueue(env).unwrap();
        }

        assert_eq!(queue.total_messages(), 16);

        for _ in 0..16 {
            let env = queue.try_dequeue_any();
            assert!(env.is_some());
        }

        assert_eq!(queue.total_messages(), 0);
        assert!(queue.try_dequeue_any().is_none());
    }

    #[test]
    fn test_partitioned_queue_stats() {
        let queue = PartitionedQueue::new(4, 64);

        for source in 0..20u64 {
            let mut env = make_envelope();
            env.header.source_kernel = source;
            queue.try_enqueue(env).unwrap();
        }

        let stats = queue.stats();
        assert_eq!(stats.total.enqueued, 20);
        assert_eq!(stats.partition_count, 4);
        assert_eq!(stats.partition_stats.len(), 4);
    }

    #[test]
    fn test_partitioned_queue_load_imbalance() {
        let queue = PartitionedQueue::new(4, 64);

        // Pile every message onto a single source (and thus one partition).
        for _ in 0..10 {
            let mut env = make_envelope();
            env.header.source_kernel = 0;
            queue.try_enqueue(env).unwrap();
        }

        let stats = queue.stats();
        // Max depth 10 vs. average depth 2.5 -> imbalance of 4.0.
        assert!((stats.load_imbalance() - 4.0).abs() < 0.001);
    }

    #[test]
    fn test_partitioned_queue_dequeue_partition() {
        let queue = PartitionedQueue::new(4, 64);

        let mut env = make_envelope();
        env.header.source_kernel = 0;
        queue.try_enqueue(env).unwrap();

        let partition = queue.partition_for(0);

        // Dequeue from the partition that received the message.
        let result = queue.try_dequeue_partition(partition);
        assert!(result.is_ok());

        // An out-of-range partition index is rejected.
        let result = queue.try_dequeue_partition(100);
        assert!(result.is_err());
    }
}