//! Bounded message queues: a lock-free SPSC ring buffer, a mutex-guarded MPSC
//! wrapper, a blocking bounded queue, tiered sizing helpers, utilization
//! monitoring, and a partitioned queue for reducing producer contention.

use std::sync::atomic::{AtomicU64, Ordering};

use crate::error::{Result, RingKernelError};
use crate::message::MessageEnvelope;
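/// Aligns the wrapped value to a 128-byte boundary so that fields owned by
/// different threads fall on separate cache lines (avoids false sharing).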
#[repr(align(128))]
#[derive(Default)]
pub(crate) struct CachePadded<T>(pub T);

impl<T> std::ops::Deref for CachePadded<T> {
    type Target = T;
    fn deref(&self) -> &T {
        &self.0
    }
}
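/// Snapshot of a queue's activity counters as returned by [`MessageQueue::stats`].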
#[derive(Debug, Clone, Default)]
pub struct QueueStats {
    /// Total messages successfully enqueued.
    pub enqueued: u64,
    /// Total messages successfully dequeued.
    pub dequeued: u64,
    /// Messages rejected because the queue was full.
    pub dropped: u64,
    /// Current depth (enqueued minus dequeued).
    pub depth: u64,
    /// High-water mark of the depth observed by the producer.
    pub max_depth: u64,
}
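/// Common interface over the queue flavors in this module. The trait methods
/// are non-blocking: `try_enqueue` and `try_dequeue` return `QueueFull` /
/// `QueueEmpty` errors immediately rather than waiting.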
pub trait MessageQueue: Send + Sync {
    /// Maximum number of messages the queue can hold.
    fn capacity(&self) -> usize;

    /// Current number of messages in the queue.
    fn len(&self) -> usize;

    fn is_empty(&self) -> bool {
        self.len() == 0
    }

    fn is_full(&self) -> bool {
        self.len() >= self.capacity()
    }

    /// Attempts to enqueue without blocking; fails with `QueueFull` when at capacity.
    fn try_enqueue(&self, envelope: MessageEnvelope) -> Result<()>;

    /// Attempts to dequeue without blocking; fails with `QueueEmpty` when empty.
    fn try_dequeue(&self) -> Result<MessageEnvelope>;

    /// Returns a snapshot of the queue's counters.
    fn stats(&self) -> QueueStats;

    /// Resets all counters to zero.
    fn reset_stats(&self);
}
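/// Single-producer / single-consumer ring buffer over a power-of-two capacity,
/// coordinated by monotonically increasing head/tail counters.
///
/// Illustrative usage (not compiled here); assumes exactly one producer thread
/// and one consumer thread:
///
/// ```ignore
/// let queue = SpscQueue::new(1024);
/// queue.try_enqueue(envelope)?;
/// let received = queue.try_dequeue()?;
/// ```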
pub struct SpscQueue {
    /// Ring storage; each slot is only touched by the side that owns it under
    /// the head/tail protocol.
    buffer: Vec<std::cell::UnsafeCell<Option<MessageEnvelope>>>,
    /// Always a power of two.
    capacity: usize,
    /// `capacity - 1`, used to map counters to buffer indices.
    mask: usize,
    /// Write position, advanced only by the producer.
    head: CachePadded<AtomicU64>,
    /// Read position, advanced only by the consumer.
    tail: CachePadded<AtomicU64>,
    producer_stats: CachePadded<ProducerStats>,
    consumer_stats: CachePadded<ConsumerStats>,
}
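// SAFETY: SpscQueue is designed for exactly one producer and one consumer.
// The producer writes a slot before publishing it with a release store to
// `head`; the consumer reads `head` with acquire before taking the slot and
// releases it back via a release store to `tail`. Under that protocol the
// `UnsafeCell` slots are never accessed from both sides at once, so sharing
// the queue across threads is sound.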
unsafe impl Send for SpscQueue {}
unsafe impl Sync for SpscQueue {}
#[derive(Default)]
struct ProducerStats {
    enqueued: AtomicU64,
    dropped: AtomicU64,
    max_depth: AtomicU64,
}

#[derive(Default)]
struct ConsumerStats {
    dequeued: AtomicU64,
}

impl From<(&ProducerStats, &ConsumerStats)> for QueueStats {
    fn from(split: (&ProducerStats, &ConsumerStats)) -> Self {
        let (p, c) = split;
        QueueStats {
            enqueued: p.enqueued.load(Ordering::Relaxed),
            dequeued: c.dequeued.load(Ordering::Relaxed),
            dropped: p.dropped.load(Ordering::Relaxed),
            depth: p
                .enqueued
                .load(Ordering::Relaxed)
                .wrapping_sub(c.dequeued.load(Ordering::Relaxed)),
            max_depth: p.max_depth.load(Ordering::Relaxed),
        }
    }
}
impl SpscQueue {
    /// Creates a queue whose capacity is rounded up to the next power of two.
    pub fn new(capacity: usize) -> Self {
        let capacity = capacity.next_power_of_two();
        let mask = capacity - 1;

        let mut buffer = Vec::with_capacity(capacity);
        for _ in 0..capacity {
            buffer.push(std::cell::UnsafeCell::new(None));
        }

        Self {
            buffer,
            capacity,
            mask,
            head: CachePadded(AtomicU64::new(0)),
            tail: CachePadded(AtomicU64::new(0)),
            producer_stats: CachePadded(ProducerStats::default()),
            consumer_stats: CachePadded(ConsumerStats::default()),
        }
    }

    #[inline]
    fn depth(&self) -> u64 {
        let head = self.head.load(Ordering::Acquire);
        let tail = self.tail.load(Ordering::Acquire);
        head.wrapping_sub(tail)
    }

    fn update_max_depth(&self) {
        let depth = self.depth();
        self.producer_stats
            .max_depth
            .fetch_max(depth, Ordering::Relaxed);
    }
}
impl MessageQueue for SpscQueue {
    #[inline]
    fn capacity(&self) -> usize {
        self.capacity
    }

    #[inline]
    fn len(&self) -> usize {
        self.depth() as usize
    }

    #[inline]
    fn try_enqueue(&self, envelope: MessageEnvelope) -> Result<()> {
        // The producer owns `head`, so a relaxed load suffices; `tail` needs
        // acquire to observe the consumer's progress before a slot is reused.
        let head = self.head.load(Ordering::Relaxed);
        let tail = self.tail.load(Ordering::Acquire);
        if head.wrapping_sub(tail) >= self.capacity as u64 {
            self.producer_stats.dropped.fetch_add(1, Ordering::Relaxed);
            return Err(RingKernelError::QueueFull {
                capacity: self.capacity,
            });
        }

        let index = (head as usize) & self.mask;
        unsafe {
            *self.buffer[index].get() = Some(envelope);
        }

        // Publish the slot with a release store so the consumer's acquire load
        // of `head` sees the completed write above.
        self.head.store(head.wrapping_add(1), Ordering::Release);

        self.producer_stats.enqueued.fetch_add(1, Ordering::Relaxed);
        self.update_max_depth();

        Ok(())
    }

    #[inline]
    fn try_dequeue(&self) -> Result<MessageEnvelope> {
        // The consumer owns `tail`; `head` needs acquire to see the producer's writes.
        let tail = self.tail.load(Ordering::Relaxed);
        let head = self.head.load(Ordering::Acquire);
        if head == tail {
            return Err(RingKernelError::QueueEmpty);
        }

        let index = (tail as usize) & self.mask;
        let envelope = unsafe {
            (*self.buffer[index].get())
                .take()
                .ok_or(RingKernelError::QueueEmpty)?
        };

        // Release the slot back to the producer.
        self.tail.store(tail.wrapping_add(1), Ordering::Release);

        self.consumer_stats.dequeued.fetch_add(1, Ordering::Relaxed);

        Ok(envelope)
    }

    fn stats(&self) -> QueueStats {
        QueueStats::from((&*self.producer_stats, &*self.consumer_stats))
    }

    fn reset_stats(&self) {
        self.producer_stats.enqueued.store(0, Ordering::Relaxed);
        self.producer_stats.dropped.store(0, Ordering::Relaxed);
        self.producer_stats.max_depth.store(0, Ordering::Relaxed);
        self.consumer_stats.dequeued.store(0, Ordering::Relaxed);
    }
}
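/// Multi-producer / single-consumer queue: an [`SpscQueue`] whose enqueue path
/// is serialized with a mutex so any number of producers can share it.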
pub struct MpscQueue {
    inner: SpscQueue,
    producer_lock: parking_lot::Mutex<()>,
}

impl MpscQueue {
    pub fn new(capacity: usize) -> Self {
        Self {
            inner: SpscQueue::new(capacity),
            producer_lock: parking_lot::Mutex::new(()),
        }
    }
}
impl MessageQueue for MpscQueue {
    #[inline]
    fn capacity(&self) -> usize {
        self.inner.capacity()
    }

    #[inline]
    fn len(&self) -> usize {
        self.inner.len()
    }

    #[inline]
    fn try_enqueue(&self, envelope: MessageEnvelope) -> Result<()> {
        let _guard = self.producer_lock.lock();
        self.inner.try_enqueue(envelope)
    }

    #[inline]
    fn try_dequeue(&self) -> Result<MessageEnvelope> {
        self.inner.try_dequeue()
    }

    fn stats(&self) -> QueueStats {
        self.inner.stats()
    }

    fn reset_stats(&self) {
        self.inner.reset_stats()
    }
}
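/// MPSC queue with blocking `enqueue_timeout` / `dequeue_timeout` variants built
/// on condition variables, in addition to the non-blocking trait methods.
///
/// Illustrative usage (not compiled here):
///
/// ```ignore
/// let queue = BoundedQueue::new(256);
/// queue.enqueue_timeout(envelope, std::time::Duration::from_millis(5))?;
/// let received = queue.dequeue_timeout(std::time::Duration::from_millis(5))?;
/// ```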
pub struct BoundedQueue {
    inner: MpscQueue,
    not_full: parking_lot::Condvar,
    not_empty: parking_lot::Condvar,
    mutex: parking_lot::Mutex<()>,
}
impl BoundedQueue {
    pub fn new(capacity: usize) -> Self {
        Self {
            inner: MpscQueue::new(capacity),
            not_full: parking_lot::Condvar::new(),
            not_empty: parking_lot::Condvar::new(),
            mutex: parking_lot::Mutex::new(()),
        }
    }

    /// Enqueues, waiting up to `timeout` for space; fails with `Timeout` otherwise.
    pub fn enqueue_timeout(
        &self,
        envelope: MessageEnvelope,
        timeout: std::time::Duration,
    ) -> Result<()> {
        let deadline = std::time::Instant::now() + timeout;

        loop {
            match self.inner.try_enqueue(envelope.clone()) {
                Ok(()) => {
                    self.not_empty.notify_one();
                    return Ok(());
                }
                Err(RingKernelError::QueueFull { .. }) => {
                    let remaining = deadline.saturating_duration_since(std::time::Instant::now());
                    if remaining.is_zero() {
                        return Err(RingKernelError::Timeout(timeout));
                    }
                    let mut guard = self.mutex.lock();
                    let _ = self.not_full.wait_for(&mut guard, remaining);
                }
                Err(e) => return Err(e),
            }
        }
    }

    /// Dequeues, waiting up to `timeout` for a message; fails with `Timeout` otherwise.
    pub fn dequeue_timeout(&self, timeout: std::time::Duration) -> Result<MessageEnvelope> {
        let deadline = std::time::Instant::now() + timeout;

        loop {
            match self.inner.try_dequeue() {
                Ok(envelope) => {
                    self.not_full.notify_one();
                    return Ok(envelope);
                }
                Err(RingKernelError::QueueEmpty) => {
                    let remaining = deadline.saturating_duration_since(std::time::Instant::now());
                    if remaining.is_zero() {
                        return Err(RingKernelError::Timeout(timeout));
                    }
                    let mut guard = self.mutex.lock();
                    let _ = self.not_empty.wait_for(&mut guard, remaining);
                }
                Err(e) => return Err(e),
            }
        }
    }
}
impl MessageQueue for BoundedQueue {
    fn capacity(&self) -> usize {
        self.inner.capacity()
    }

    fn len(&self) -> usize {
        self.inner.len()
    }

    fn try_enqueue(&self, envelope: MessageEnvelope) -> Result<()> {
        let result = self.inner.try_enqueue(envelope);
        if result.is_ok() {
            self.not_empty.notify_one();
        }
        result
    }

    fn try_dequeue(&self) -> Result<MessageEnvelope> {
        let result = self.inner.try_dequeue();
        if result.is_ok() {
            self.not_full.notify_one();
        }
        result
    }

    fn stats(&self) -> QueueStats {
        self.inner.stats()
    }

    fn reset_stats(&self) {
        self.inner.reset_stats()
    }
}
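/// Preset capacity tiers used to size queues.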
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum QueueTier {
    /// 256 messages.
    Small,
    /// 1,024 messages (default).
    #[default]
    Medium,
    /// 4,096 messages.
    Large,
    /// 16,384 messages.
    ExtraLarge,
}
impl QueueTier {
    /// Capacity in messages for this tier.
    pub fn capacity(&self) -> usize {
        match self {
            Self::Small => 256,
            Self::Medium => 1024,
            Self::Large => 4096,
            Self::ExtraLarge => 16384,
        }
    }

    /// Picks the smallest tier whose capacity covers
    /// `messages_per_second * target_headroom_ms / 1000` messages
    /// (saturating at `ExtraLarge`).
    pub fn for_throughput(messages_per_second: u64, target_headroom_ms: u64) -> Self {
        let needed_capacity = (messages_per_second * target_headroom_ms) / 1000;

        if needed_capacity <= 256 {
            Self::Small
        } else if needed_capacity <= 1024 {
            Self::Medium
        } else if needed_capacity <= 4096 {
            Self::Large
        } else {
            Self::ExtraLarge
        }
    }

    /// Next larger tier; `ExtraLarge` saturates.
    pub fn upgrade(&self) -> Self {
        match self {
            Self::Small => Self::Medium,
            Self::Medium => Self::Large,
            Self::Large => Self::ExtraLarge,
            Self::ExtraLarge => Self::ExtraLarge,
        }
    }

    /// Next smaller tier; `Small` saturates.
    pub fn downgrade(&self) -> Self {
        match self {
            Self::Small => Self::Small,
            Self::Medium => Self::Small,
            Self::Large => Self::Medium,
            Self::ExtraLarge => Self::Large,
        }
    }
}
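/// Convenience constructors mapping a [`QueueTier`] to a concrete queue type.
///
/// Illustrative sizing (not compiled here): 10,000 msg/s with 100 ms of
/// headroom needs about 1,000 slots, so the factory selects the `Medium`
/// (1,024-slot) tier.
///
/// ```ignore
/// let queue = QueueFactory::create_for_throughput(10_000, 100);
/// assert_eq!(queue.capacity(), 1024);
/// ```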
pub struct QueueFactory;

impl QueueFactory {
    pub fn create_spsc(tier: QueueTier) -> SpscQueue {
        SpscQueue::new(tier.capacity())
    }

    pub fn create_mpsc(tier: QueueTier) -> MpscQueue {
        MpscQueue::new(tier.capacity())
    }

    pub fn create_bounded(tier: QueueTier) -> BoundedQueue {
        BoundedQueue::new(tier.capacity())
    }

    pub fn create_for_throughput(
        messages_per_second: u64,
        headroom_ms: u64,
    ) -> Box<dyn MessageQueue> {
        let tier = QueueTier::for_throughput(messages_per_second, headroom_ms);
        Box::new(Self::create_mpsc(tier))
    }
}
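/// Coarse health rating derived from a queue's utilization.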
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueueHealth {
    Healthy,
    Warning,
    Critical,
}
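/// Classifies queue fill level against warning and critical utilization
/// thresholds (defaults: 0.75 and 0.90) and can suggest a tier upgrade when a
/// queue runs hot.
///
/// Illustrative usage (not compiled here):
///
/// ```ignore
/// let monitor = QueueMonitor::default();
/// if monitor.check(&queue) == QueueHealth::Critical {
///     // apply back-pressure or move to a larger tier
/// }
/// ```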
#[derive(Debug, Clone)]
pub struct QueueMonitor {
    /// Utilization at or above which the queue is reported as `Warning`.
    pub warning_threshold: f64,
    /// Utilization at or above which the queue is reported as `Critical`.
    pub critical_threshold: f64,
}

impl Default for QueueMonitor {
    fn default() -> Self {
        Self {
            warning_threshold: 0.75,
            critical_threshold: 0.90,
        }
    }
}
impl QueueMonitor {
    pub fn new(warning_threshold: f64, critical_threshold: f64) -> Self {
        Self {
            warning_threshold,
            critical_threshold,
        }
    }

    /// Classifies the queue's current utilization.
    pub fn check(&self, queue: &dyn MessageQueue) -> QueueHealth {
        let utilization = self.utilization(queue);

        if utilization >= self.critical_threshold {
            QueueHealth::Critical
        } else if utilization >= self.warning_threshold {
            QueueHealth::Warning
        } else {
            QueueHealth::Healthy
        }
    }

    /// Current depth as a fraction of capacity, in `[0.0, 1.0]`.
    pub fn utilization(&self, queue: &dyn MessageQueue) -> f64 {
        let capacity = queue.capacity();
        if capacity == 0 {
            return 0.0;
        }
        queue.len() as f64 / capacity as f64
    }

    pub fn utilization_percent(&self, queue: &dyn MessageQueue) -> f64 {
        self.utilization(queue) * 100.0
    }

    /// Suggests the next tier up when current or historical (max-depth)
    /// utilization crosses the thresholds; returns `None` when the queue is
    /// comfortably sized or already at the largest tier.
    pub fn suggest_upgrade(
        &self,
        queue: &dyn MessageQueue,
        current_tier: QueueTier,
    ) -> Option<QueueTier> {
        let stats = queue.stats();
        let utilization = self.utilization(queue);

        let max_util = if queue.capacity() > 0 {
            stats.max_depth as f64 / queue.capacity() as f64
        } else {
            0.0
        };

        if utilization >= self.warning_threshold || max_util >= self.critical_threshold {
            let upgraded = current_tier.upgrade();
            if upgraded != current_tier {
                return Some(upgraded);
            }
        }

        None
    }

    /// Returns `true` if any enqueue has been dropped.
    pub fn has_drops(&self, queue: &dyn MessageQueue) -> bool {
        queue.stats().dropped > 0
    }

    /// Fraction of attempted enqueues that were dropped.
    pub fn drop_rate(&self, queue: &dyn MessageQueue) -> f64 {
        let stats = queue.stats();
        let total_attempted = stats.enqueued + stats.dropped;
        if total_attempted == 0 {
            return 0.0;
        }
        stats.dropped as f64 / total_attempted as f64
    }
}
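/// Point-in-time snapshot combining health, utilization, raw counters, and an
/// optional tier-upgrade suggestion.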
#[derive(Debug, Clone)]
pub struct QueueMetrics {
    pub health: QueueHealth,
    pub utilization: f64,
    pub stats: QueueStats,
    pub tier: Option<QueueTier>,
    pub suggested_upgrade: Option<QueueTier>,
}

impl QueueMetrics {
    pub fn capture(
        queue: &dyn MessageQueue,
        monitor: &QueueMonitor,
        current_tier: Option<QueueTier>,
    ) -> Self {
        let health = monitor.check(queue);
        let utilization = monitor.utilization(queue);
        let stats = queue.stats();
        let suggested_upgrade = current_tier.and_then(|tier| monitor.suggest_upgrade(queue, tier));

        Self {
            health,
            utilization,
            stats,
            tier: current_tier,
            suggested_upgrade,
        }
    }
}
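/// Routes messages to one of several SPSC partitions based on the sender's
/// kernel ID, reducing contention between producers; consumption rotates
/// across partitions.
///
/// Illustrative usage (not compiled here):
///
/// ```ignore
/// let queue = PartitionedQueue::new(4, 256);
/// queue.try_enqueue_from(source_id, envelope)?;
/// while let Some(envelope) = queue.try_dequeue_any() {
///     // handle the message
/// }
/// ```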
pub struct PartitionedQueue {
    partitions: Vec<SpscQueue>,
    /// Always a power of two so routing can use a bit mask.
    partition_count: usize,
    /// Round-robin cursor for `try_dequeue_any`.
    dequeue_index: AtomicU64,
}
impl PartitionedQueue {
    /// Creates `partition_count` partitions (rounded up to a power of two, and
    /// at least one), each with `capacity_per_partition` slots.
    pub fn new(partition_count: usize, capacity_per_partition: usize) -> Self {
        let partition_count = partition_count.max(1).next_power_of_two();
        let partitions = (0..partition_count)
            .map(|_| SpscQueue::new(capacity_per_partition))
            .collect();

        Self {
            partitions,
            partition_count,
            dequeue_index: AtomicU64::new(0),
        }
    }

    pub fn with_defaults() -> Self {
        Self::new(4, QueueTier::Medium.capacity())
    }

    pub fn for_high_contention() -> Self {
        Self::new(8, QueueTier::Large.capacity())
    }

    /// Maps a source ID to its partition index.
    #[inline]
    pub fn partition_for(&self, source_id: u64) -> usize {
        (source_id as usize) & (self.partition_count - 1)
    }

    pub fn partition_count(&self) -> usize {
        self.partition_count
    }

    pub fn capacity_per_partition(&self) -> usize {
        self.partitions.first().map_or(0, |p| p.capacity())
    }

    pub fn total_capacity(&self) -> usize {
        self.capacity_per_partition() * self.partition_count
    }

    pub fn total_messages(&self) -> usize {
        self.partitions.iter().map(|p| p.len()).sum()
    }

    /// Enqueues into the partition owned by `source_id`.
    pub fn try_enqueue_from(&self, source_id: u64, envelope: MessageEnvelope) -> Result<()> {
        let partition = self.partition_for(source_id);
        self.partitions[partition].try_enqueue(envelope)
    }

    /// Enqueues using the envelope's own source kernel ID for routing.
    pub fn try_enqueue(&self, envelope: MessageEnvelope) -> Result<()> {
        let source_id = envelope.header.source_kernel;
        self.try_enqueue_from(source_id, envelope)
    }

    pub fn try_dequeue_partition(&self, partition: usize) -> Result<MessageEnvelope> {
        if partition >= self.partition_count {
            return Err(RingKernelError::InvalidConfig(format!(
                "Invalid partition index: {} (max: {})",
                partition,
                self.partition_count - 1
            )));
        }
        self.partitions[partition].try_dequeue()
    }

    /// Dequeues from the first non-empty partition, starting at a rotating
    /// index so partitions are drained fairly.
    pub fn try_dequeue_any(&self) -> Option<MessageEnvelope> {
        let start_index = self.dequeue_index.fetch_add(1, Ordering::Relaxed) as usize;

        for i in 0..self.partition_count {
            let partition = (start_index + i) & (self.partition_count - 1);
            if let Ok(envelope) = self.partitions[partition].try_dequeue() {
                return Some(envelope);
            }
        }

        None
    }

    pub fn partition_stats(&self, partition: usize) -> Option<QueueStats> {
        self.partitions.get(partition).map(|p| p.stats())
    }

    /// Aggregates per-partition counters into a combined view.
    pub fn stats(&self) -> PartitionedQueueStats {
        let mut total = QueueStats::default();
        let mut partition_stats = Vec::with_capacity(self.partition_count);

        for partition in &self.partitions {
            let stats = partition.stats();
            total.enqueued += stats.enqueued;
            total.dequeued += stats.dequeued;
            total.dropped += stats.dropped;
            total.depth += stats.depth;
            if stats.max_depth > total.max_depth {
                total.max_depth = stats.max_depth;
            }
            partition_stats.push(stats);
        }

        PartitionedQueueStats {
            total,
            partition_stats,
            partition_count: self.partition_count,
        }
    }

    pub fn reset_stats(&self) {
        for partition in &self.partitions {
            partition.reset_stats();
        }
    }
}
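/// Aggregated and per-partition statistics for a [`PartitionedQueue`].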
#[derive(Debug, Clone)]
pub struct PartitionedQueueStats {
    pub total: QueueStats,
    pub partition_stats: Vec<QueueStats>,
    pub partition_count: usize,
}

impl PartitionedQueueStats {
    /// Ratio of the deepest partition to the average depth; 1.0 means
    /// perfectly balanced, larger values mean traffic is skewed toward one
    /// partition.
    pub fn load_imbalance(&self) -> f64 {
        if self.partition_count == 0 {
            return 1.0;
        }

        let avg = self.total.depth as f64 / self.partition_count as f64;
        if avg == 0.0 {
            return 1.0;
        }

        let max = self
            .partition_stats
            .iter()
            .map(|s| s.depth)
            .max()
            .unwrap_or(0);
        max as f64 / avg
    }

    /// Utilization of the fullest partition relative to the per-partition capacity.
    pub fn max_partition_utilization(&self, capacity_per_partition: usize) -> f64 {
        if capacity_per_partition == 0 {
            return 0.0;
        }

        let max = self
            .partition_stats
            .iter()
            .map(|s| s.depth)
            .max()
            .unwrap_or(0);
        max as f64 / capacity_per_partition as f64
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::hlc::HlcTimestamp;
    use crate::message::MessageHeader;

    fn make_envelope() -> MessageEnvelope {
        MessageEnvelope {
            header: MessageHeader::new(1, 0, 1, 8, HlcTimestamp::now(1)),
            payload: vec![1, 2, 3, 4, 5, 6, 7, 8],
            provenance: None,
            tenant_id: 0,
            audit_tag: crate::k2k::audit_tag::AuditTag::unspecified(),
        }
    }

    #[test]
    fn test_spsc_basic() {
        let queue = SpscQueue::new(16);

        assert!(queue.is_empty());
        assert!(!queue.is_full());

        let env = make_envelope();
        queue.try_enqueue(env).unwrap();

        assert_eq!(queue.len(), 1);
        assert!(!queue.is_empty());

        let _ = queue.try_dequeue().unwrap();
        assert!(queue.is_empty());
    }

    #[test]
    fn test_spsc_full() {
        let queue = SpscQueue::new(4);

        for _ in 0..4 {
            queue.try_enqueue(make_envelope()).unwrap();
        }

        assert!(queue.is_full());
        assert!(matches!(
            queue.try_enqueue(make_envelope()),
            Err(RingKernelError::QueueFull { .. })
        ));
    }

    #[test]
    fn test_spsc_stats() {
        let queue = SpscQueue::new(16);

        for _ in 0..10 {
            queue.try_enqueue(make_envelope()).unwrap();
        }

        for _ in 0..5 {
            let _ = queue.try_dequeue().unwrap();
        }

        let stats = queue.stats();
        assert_eq!(stats.enqueued, 10);
        assert_eq!(stats.dequeued, 5);
        assert_eq!(stats.depth, 5);
    }
    #[test]
    fn test_mpsc_concurrent() {
        use std::sync::Arc;
        use std::thread;

        let queue = Arc::new(MpscQueue::new(1024));
        let mut handles = vec![];

        for _ in 0..4 {
            let q = Arc::clone(&queue);
            handles.push(thread::spawn(move || {
                for _ in 0..100 {
                    q.try_enqueue(make_envelope()).unwrap();
                }
            }));
        }

        for h in handles {
            h.join().unwrap();
        }

        let stats = queue.stats();
        assert_eq!(stats.enqueued, 400);
    }

    #[test]
    fn test_bounded_timeout() {
        let queue = BoundedQueue::new(2);

        queue.try_enqueue(make_envelope()).unwrap();
        queue.try_enqueue(make_envelope()).unwrap();

        let result = queue.enqueue_timeout(make_envelope(), std::time::Duration::from_millis(10));
        assert!(matches!(result, Err(RingKernelError::Timeout(_))));
    }
    #[test]
    fn test_queue_tier_capacities() {
        assert_eq!(QueueTier::Small.capacity(), 256);
        assert_eq!(QueueTier::Medium.capacity(), 1024);
        assert_eq!(QueueTier::Large.capacity(), 4096);
        assert_eq!(QueueTier::ExtraLarge.capacity(), 16384);
    }

    #[test]
    fn test_queue_tier_for_throughput() {
        // 1,000 msg/s * 100 ms = 100 messages -> Small.
        assert_eq!(QueueTier::for_throughput(1000, 100), QueueTier::Small);

        // 5,000 msg/s * 100 ms = 500 messages -> Medium.
        assert_eq!(QueueTier::for_throughput(5000, 100), QueueTier::Medium);

        // 20,000 msg/s * 100 ms = 2,000 messages -> Large.
        assert_eq!(QueueTier::for_throughput(20000, 100), QueueTier::Large);

        // 100,000 msg/s * 100 ms = 10,000 messages -> ExtraLarge.
        assert_eq!(
            QueueTier::for_throughput(100000, 100),
            QueueTier::ExtraLarge
        );
    }

    #[test]
    fn test_queue_tier_upgrade_downgrade() {
        assert_eq!(QueueTier::Small.upgrade(), QueueTier::Medium);
        assert_eq!(QueueTier::Medium.upgrade(), QueueTier::Large);
        assert_eq!(QueueTier::Large.upgrade(), QueueTier::ExtraLarge);
        assert_eq!(QueueTier::ExtraLarge.upgrade(), QueueTier::ExtraLarge);

        assert_eq!(QueueTier::Small.downgrade(), QueueTier::Small);
        assert_eq!(QueueTier::Medium.downgrade(), QueueTier::Small);
        assert_eq!(QueueTier::Large.downgrade(), QueueTier::Medium);
        assert_eq!(QueueTier::ExtraLarge.downgrade(), QueueTier::Large);
    }

    #[test]
    fn test_queue_factory_creates_correct_capacity() {
        let spsc = QueueFactory::create_spsc(QueueTier::Medium);
        assert_eq!(spsc.capacity(), 1024);

        let mpsc = QueueFactory::create_mpsc(QueueTier::Large);
        assert_eq!(mpsc.capacity(), 4096);

        let bounded = QueueFactory::create_bounded(QueueTier::Small);
        assert_eq!(bounded.capacity(), 256);
    }

    #[test]
    fn test_queue_factory_throughput_based() {
        // 10,000 msg/s * 100 ms needs ~1,000 slots -> Medium tier (1,024).
        let queue = QueueFactory::create_for_throughput(10000, 100);
        assert_eq!(queue.capacity(), 1024);
    }
    #[test]
    fn test_queue_monitor_health_levels() {
        let monitor = QueueMonitor::default();
        // Requested 100 rounds up to an actual capacity of 128.
        let queue = SpscQueue::new(100);

        assert_eq!(monitor.check(&queue), QueueHealth::Healthy);

        // 76/128 is ~59%: still healthy.
        for _ in 0..76 {
            queue.try_enqueue(make_envelope()).unwrap();
        }
        assert_eq!(monitor.check(&queue), QueueHealth::Healthy);

        // 102/128 is ~80%: above the 75% warning threshold.
        for _ in 0..26 {
            queue.try_enqueue(make_envelope()).unwrap();
        }
        assert_eq!(monitor.check(&queue), QueueHealth::Warning);

        // 120/128 is ~94%: above the 90% critical threshold.
        for _ in 0..18 {
            queue.try_enqueue(make_envelope()).unwrap();
        }
        assert_eq!(monitor.check(&queue), QueueHealth::Critical);
    }

    #[test]
    fn test_queue_monitor_utilization() {
        let monitor = QueueMonitor::default();
        // Requested 100 rounds up to an actual capacity of 128.
        let queue = SpscQueue::new(100);

        assert!((monitor.utilization(&queue) - 0.0).abs() < 0.001);

        for _ in 0..64 {
            queue.try_enqueue(make_envelope()).unwrap();
        }
        assert!((monitor.utilization(&queue) - 0.5).abs() < 0.001);
    }

    #[test]
    fn test_queue_monitor_drop_detection() {
        let monitor = QueueMonitor::default();
        let queue = SpscQueue::new(4);

        for _ in 0..4 {
            queue.try_enqueue(make_envelope()).unwrap();
        }
        assert!(!monitor.has_drops(&queue));

        let _ = queue.try_enqueue(make_envelope());
        assert!(monitor.has_drops(&queue));
        assert!(monitor.drop_rate(&queue) > 0.0);
    }

    #[test]
    fn test_queue_monitor_upgrade_suggestion() {
        let monitor = QueueMonitor::default();
        let queue = SpscQueue::new(QueueTier::Small.capacity());

        assert!(monitor.suggest_upgrade(&queue, QueueTier::Small).is_none());

        // 200/256 is ~78%: above the warning threshold, so an upgrade is suggested.
        for _ in 0..200 {
            queue.try_enqueue(make_envelope()).unwrap();
        }

        let suggestion = monitor.suggest_upgrade(&queue, QueueTier::Small);
        assert_eq!(suggestion, Some(QueueTier::Medium));

        // Already at the largest tier: no upgrade even when running hot.
        let large_queue = SpscQueue::new(QueueTier::ExtraLarge.capacity());
        for _ in 0..(QueueTier::ExtraLarge.capacity() * 3 / 4) {
            large_queue.try_enqueue(make_envelope()).unwrap();
        }
        assert!(monitor
            .suggest_upgrade(&large_queue, QueueTier::ExtraLarge)
            .is_none());
    }

    #[test]
    fn test_queue_metrics_capture() {
        let queue = SpscQueue::new(QueueTier::Medium.capacity());
        let monitor = QueueMonitor::default();

        for _ in 0..100 {
            queue.try_enqueue(make_envelope()).unwrap();
        }

        let metrics = QueueMetrics::capture(&queue, &monitor, Some(QueueTier::Medium));

        assert_eq!(metrics.health, QueueHealth::Healthy);
        assert!(metrics.utilization < 0.15);
        assert_eq!(metrics.stats.enqueued, 100);
        assert_eq!(metrics.tier, Some(QueueTier::Medium));
        assert!(metrics.suggested_upgrade.is_none());
    }
    #[test]
    fn test_partitioned_queue_creation() {
        let queue = PartitionedQueue::new(4, 256);
        assert_eq!(queue.partition_count(), 4);
        assert_eq!(queue.capacity_per_partition(), 256);
        assert_eq!(queue.total_capacity(), 1024);
    }

    #[test]
    fn test_partitioned_queue_rounds_to_power_of_two() {
        let queue = PartitionedQueue::new(3, 256);
        assert_eq!(queue.partition_count(), 4);
    }

    #[test]
    fn test_partitioned_queue_routing() {
        let queue = PartitionedQueue::with_defaults();

        // The same source always maps to the same partition.
        let partition1 = queue.partition_for(12345);
        let partition2 = queue.partition_for(12345);
        assert_eq!(partition1, partition2);

        let partition_a = queue.partition_for(0);
        let partition_b = queue.partition_for(1);
        assert!(partition_a != partition_b || queue.partition_count() == 1);
    }

    #[test]
    fn test_partitioned_queue_enqueue_dequeue() {
        let queue = PartitionedQueue::new(4, 64);

        for source in 0..16u64 {
            let mut env = make_envelope();
            env.header.source_kernel = source;
            queue.try_enqueue(env).unwrap();
        }

        assert_eq!(queue.total_messages(), 16);

        for _ in 0..16 {
            let env = queue.try_dequeue_any();
            assert!(env.is_some());
        }

        assert_eq!(queue.total_messages(), 0);
        assert!(queue.try_dequeue_any().is_none());
    }

    #[test]
    fn test_partitioned_queue_stats() {
        let queue = PartitionedQueue::new(4, 64);

        for source in 0..20u64 {
            let mut env = make_envelope();
            env.header.source_kernel = source;
            queue.try_enqueue(env).unwrap();
        }

        let stats = queue.stats();
        assert_eq!(stats.total.enqueued, 20);
        assert_eq!(stats.partition_count, 4);
        assert_eq!(stats.partition_stats.len(), 4);
    }

    #[test]
    fn test_partitioned_queue_load_imbalance() {
        let queue = PartitionedQueue::new(4, 64);

        // All traffic lands in one of the four partitions, so the deepest
        // partition holds 4x the average depth.
        for _ in 0..10 {
            let mut env = make_envelope();
            env.header.source_kernel = 0;
            queue.try_enqueue(env).unwrap();
        }

        let stats = queue.stats();
        assert!((stats.load_imbalance() - 4.0).abs() < 0.001);
    }

    #[test]
    fn test_partitioned_queue_dequeue_partition() {
        let queue = PartitionedQueue::new(4, 64);

        let mut env = make_envelope();
        env.header.source_kernel = 0;
        queue.try_enqueue(env).unwrap();

        let partition = queue.partition_for(0);

        let result = queue.try_dequeue_partition(partition);
        assert!(result.is_ok());

        // Out-of-range partition index is rejected.
        let result = queue.try_dequeue_partition(100);
        assert!(result.is_err());
    }
}
#[cfg(test)]
mod proptests {
    use super::*;
    use crate::hlc::HlcTimestamp;
    use crate::message::MessageHeader;
    use proptest::prelude::*;

    fn make_envelope_with_seq(seq: u64) -> MessageEnvelope {
        MessageEnvelope {
            header: MessageHeader::new(seq, 1, 0, 8, HlcTimestamp::now(1)),
            payload: vec![1, 2, 3, 4, 5, 6, 7, 8],
            provenance: None,
            tenant_id: 0,
            audit_tag: crate::k2k::audit_tag::AuditTag::unspecified(),
        }
    }

    proptest! {
        #[test]
        fn spsc_capacity_invariant(cap in 1usize..=256) {
            let queue = SpscQueue::new(cap);
            let actual_cap = queue.capacity();
            prop_assert!(actual_cap.is_power_of_two());
            prop_assert!(actual_cap >= cap);
        }

        #[test]
        fn spsc_len_never_exceeds_capacity(n in 1usize..=128) {
            let queue = SpscQueue::new(n);
            let cap = queue.capacity();
            for i in 0..(cap + 10) {
                let _ = queue.try_enqueue(make_envelope_with_seq(i as u64));
                prop_assert!(queue.len() <= cap);
            }
        }

        #[test]
        fn spsc_fifo_ordering(count in 1usize..=64) {
            let queue = SpscQueue::new((count + 1).next_power_of_two());

            for i in 0..count {
                queue.try_enqueue(make_envelope_with_seq(i as u64)).unwrap();
            }

            for i in 0..count {
                let env = queue.try_dequeue().unwrap();
                prop_assert_eq!(env.header.message_type, i as u64);
            }

            prop_assert!(queue.is_empty());
        }

        #[test]
        fn spsc_stats_consistency(enqueue_count in 1usize..=64) {
            let queue = SpscQueue::new(64);
            let cap = queue.capacity();
            let mut expected_dropped = 0u64;

            for i in 0..enqueue_count {
                if queue.try_enqueue(make_envelope_with_seq(i as u64)).is_err() {
                    expected_dropped += 1;
                }
            }

            let stats = queue.stats();
            let successful = enqueue_count as u64 - expected_dropped;
            prop_assert_eq!(stats.enqueued, successful);
            prop_assert_eq!(stats.dropped, expected_dropped);
            prop_assert_eq!(stats.depth, successful);
            prop_assert!(stats.depth <= cap as u64);
        }

        #[test]
        fn spsc_enqueue_dequeue_roundtrip(n in 1usize..=32) {
            let queue = SpscQueue::new(64);

            for i in 0..n {
                queue.try_enqueue(make_envelope_with_seq(i as u64)).unwrap();
            }

            for _ in 0..n {
                queue.try_dequeue().unwrap();
            }

            let stats = queue.stats();
            prop_assert_eq!(stats.enqueued, n as u64);
            prop_assert_eq!(stats.dequeued, n as u64);
            prop_assert_eq!(stats.depth, 0);
            prop_assert!(queue.is_empty());
        }

        #[test]
        fn partitioned_routing_deterministic(source_id in 0u64..1000, partitions in 1usize..=8) {
            let queue = PartitionedQueue::new(partitions, 64);
            let p1 = queue.partition_for(source_id);
            let p2 = queue.partition_for(source_id);
            prop_assert_eq!(p1, p2);
            prop_assert!(p1 < queue.partition_count());
        }
    }
}