ringkernel_core/queue.rs

//! Message queue implementation.
//!
//! This module provides the core message queue abstraction used for
//! communication between host and GPU kernels. The queues use a ring
//! buffer design with atomic head/tail indices for coordination and
//! lightweight per-slot locks for element storage.
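//!
//! # Example
//!
//! A minimal usage sketch (marked `ignore`; `envelope` stands in for a
//! `MessageEnvelope` built elsewhere in the crate):
//!
//! ```ignore
//! use ringkernel_core::queue::{MessageQueue, SpscQueue};
//!
//! let queue = SpscQueue::new(1024);
//! queue.try_enqueue(envelope).unwrap();        // producer side
//! let received = queue.try_dequeue().unwrap(); // consumer side
//! ```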

use std::sync::atomic::{AtomicU64, Ordering};

use crate::error::{Result, RingKernelError};
use crate::message::MessageEnvelope;

/// Statistics for a message queue.
#[derive(Debug, Clone, Default)]
pub struct QueueStats {
    /// Total messages enqueued.
    pub enqueued: u64,
    /// Total messages dequeued.
    pub dequeued: u64,
    /// Messages dropped due to full queue.
    pub dropped: u64,
    /// Current queue depth.
    pub depth: u64,
    /// Maximum queue depth observed.
    pub max_depth: u64,
}

/// Trait for message queue implementations.
///
/// Message queues provide bounded FIFO communication between
/// producers (host or other kernels) and consumers (GPU kernels).
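///
/// # Example
///
/// A sketch of draining a queue through the trait-object interface
/// (marked `ignore`; error handling follows this crate's `RingKernelError`):
///
/// ```ignore
/// use ringkernel_core::queue::MessageQueue;
///
/// fn drain(queue: &dyn MessageQueue) -> usize {
///     let mut count = 0;
///     while let Ok(_envelope) = queue.try_dequeue() {
///         count += 1;
///     }
///     count
/// }
/// ```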
pub trait MessageQueue: Send + Sync {
    /// Get the queue capacity.
    fn capacity(&self) -> usize;

    /// Get current queue size.
    fn len(&self) -> usize;

    /// Check if queue is empty.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Check if queue is full.
    fn is_full(&self) -> bool {
        self.len() >= self.capacity()
    }

    /// Try to enqueue a message envelope.
    fn try_enqueue(&self, envelope: MessageEnvelope) -> Result<()>;

    /// Try to dequeue a message envelope.
    fn try_dequeue(&self) -> Result<MessageEnvelope>;

    /// Get queue statistics.
    fn stats(&self) -> QueueStats;

    /// Reset queue statistics.
    fn reset_stats(&self);
}

/// Single-producer single-consumer ring buffer.
///
/// This implementation is optimized for the common case of one
/// producer (host) and one consumer (GPU kernel). Head and tail
/// indices are updated with atomics; each slot is guarded by a
/// lightweight lock that is uncontended in single-producer,
/// single-consumer use.
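///
/// # Example
///
/// A usage sketch (marked `ignore`; `envelope` stands in for a
/// `MessageEnvelope` built elsewhere):
///
/// ```ignore
/// use ringkernel_core::queue::{MessageQueue, SpscQueue};
///
/// let queue = SpscQueue::new(1024);
/// queue.try_enqueue(envelope).unwrap();
/// assert_eq!(queue.len(), 1);
/// assert_eq!(queue.stats().enqueued, 1);
/// ```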
pub struct SpscQueue {
    /// Ring buffer storage.
    buffer: Vec<parking_lot::Mutex<Option<MessageEnvelope>>>,
    /// Capacity (power of 2).
    capacity: usize,
    /// Mask for index wrapping.
    mask: usize,
    /// Head pointer (producer writes here).
    head: AtomicU64,
    /// Tail pointer (consumer reads from here).
    tail: AtomicU64,
    /// Statistics.
    stats: QueueStatsInner,
}

/// Internal statistics with atomics.
struct QueueStatsInner {
    enqueued: AtomicU64,
    dequeued: AtomicU64,
    dropped: AtomicU64,
    max_depth: AtomicU64,
}

impl SpscQueue {
    /// Create a new queue with the given capacity.
    ///
    /// Capacity will be rounded up to the next power of 2.
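    ///
    /// # Example
    ///
    /// A small sketch of the rounding behaviour (marked `ignore`):
    ///
    /// ```ignore
    /// use ringkernel_core::queue::{MessageQueue, SpscQueue};
    ///
    /// let queue = SpscQueue::new(100);
    /// assert_eq!(queue.capacity(), 128); // rounded up to a power of two
    /// ```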
    pub fn new(capacity: usize) -> Self {
        let capacity = capacity.next_power_of_two();
        let mask = capacity - 1;

        let mut buffer = Vec::with_capacity(capacity);
        for _ in 0..capacity {
            buffer.push(parking_lot::Mutex::new(None));
        }

        Self {
            buffer,
            capacity,
            mask,
            head: AtomicU64::new(0),
            tail: AtomicU64::new(0),
            stats: QueueStatsInner {
                enqueued: AtomicU64::new(0),
                dequeued: AtomicU64::new(0),
                dropped: AtomicU64::new(0),
                max_depth: AtomicU64::new(0),
            },
        }
    }

    /// Get current depth.
    fn depth(&self) -> u64 {
        let head = self.head.load(Ordering::Acquire);
        let tail = self.tail.load(Ordering::Acquire);
        head.wrapping_sub(tail)
    }

    /// Update max depth statistic.
    fn update_max_depth(&self) {
        let depth = self.depth();
        let mut max = self.stats.max_depth.load(Ordering::Relaxed);
        while depth > max {
            match self.stats.max_depth.compare_exchange_weak(
                max,
                depth,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                Ok(_) => break,
                Err(current) => max = current,
            }
        }
    }
}

impl MessageQueue for SpscQueue {
    fn capacity(&self) -> usize {
        self.capacity
    }

    fn len(&self) -> usize {
        self.depth() as usize
    }

    fn try_enqueue(&self, envelope: MessageEnvelope) -> Result<()> {
        let head = self.head.load(Ordering::Acquire);
        let tail = self.tail.load(Ordering::Acquire);

        // Check if full
        if head.wrapping_sub(tail) >= self.capacity as u64 {
            self.stats.dropped.fetch_add(1, Ordering::Relaxed);
            return Err(RingKernelError::QueueFull {
                capacity: self.capacity,
            });
        }

        // Get slot
        let index = (head as usize) & self.mask;
        let mut slot = self.buffer[index].lock();
        *slot = Some(envelope);
        drop(slot);

        // Advance head
        self.head.store(head.wrapping_add(1), Ordering::Release);

        // Update stats
        self.stats.enqueued.fetch_add(1, Ordering::Relaxed);
        self.update_max_depth();

        Ok(())
    }

    fn try_dequeue(&self) -> Result<MessageEnvelope> {
        let tail = self.tail.load(Ordering::Acquire);
        let head = self.head.load(Ordering::Acquire);

        // Check if empty
        if head == tail {
            return Err(RingKernelError::QueueEmpty);
        }

        // Get slot
        let index = (tail as usize) & self.mask;
        let mut slot = self.buffer[index].lock();
        let envelope = slot.take().ok_or(RingKernelError::QueueEmpty)?;
        drop(slot);

        // Advance tail
        self.tail.store(tail.wrapping_add(1), Ordering::Release);

        // Update stats
        self.stats.dequeued.fetch_add(1, Ordering::Relaxed);

        Ok(envelope)
    }

    fn stats(&self) -> QueueStats {
        QueueStats {
            enqueued: self.stats.enqueued.load(Ordering::Relaxed),
            dequeued: self.stats.dequeued.load(Ordering::Relaxed),
            dropped: self.stats.dropped.load(Ordering::Relaxed),
            depth: self.depth(),
            max_depth: self.stats.max_depth.load(Ordering::Relaxed),
        }
    }

    fn reset_stats(&self) {
        self.stats.enqueued.store(0, Ordering::Relaxed);
        self.stats.dequeued.store(0, Ordering::Relaxed);
        self.stats.dropped.store(0, Ordering::Relaxed);
        self.stats.max_depth.store(0, Ordering::Relaxed);
    }
}

/// Multi-producer single-consumer queue.
///
/// This variant allows multiple producers (e.g., multiple host threads
/// or kernel-to-kernel messaging) to enqueue messages concurrently.
/// Producers are serialized with a lightweight lock; the consumer path
/// is the same as [`SpscQueue`].
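///
/// # Example
///
/// A sketch of concurrent producers (marked `ignore`; `make_envelope` is a
/// helper like the one in this module's tests that builds a `MessageEnvelope`):
///
/// ```ignore
/// use std::sync::Arc;
/// use std::thread;
/// use ringkernel_core::queue::{MessageQueue, MpscQueue};
///
/// let queue = Arc::new(MpscQueue::new(1024));
/// let handles: Vec<_> = (0..4)
///     .map(|_| {
///         let q = Arc::clone(&queue);
///         thread::spawn(move || q.try_enqueue(make_envelope()).unwrap())
///     })
///     .collect();
/// for h in handles {
///     h.join().unwrap();
/// }
/// assert_eq!(queue.stats().enqueued, 4);
/// ```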
pub struct MpscQueue {
    /// Inner SPSC queue.
    inner: SpscQueue,
    /// Lock for producers.
    producer_lock: parking_lot::Mutex<()>,
}

impl MpscQueue {
    /// Create a new MPSC queue.
    pub fn new(capacity: usize) -> Self {
        Self {
            inner: SpscQueue::new(capacity),
            producer_lock: parking_lot::Mutex::new(()),
        }
    }
}

impl MessageQueue for MpscQueue {
    fn capacity(&self) -> usize {
        self.inner.capacity()
    }

    fn len(&self) -> usize {
        self.inner.len()
    }

    fn try_enqueue(&self, envelope: MessageEnvelope) -> Result<()> {
        let _guard = self.producer_lock.lock();
        self.inner.try_enqueue(envelope)
    }

    fn try_dequeue(&self) -> Result<MessageEnvelope> {
        self.inner.try_dequeue()
    }

    fn stats(&self) -> QueueStats {
        self.inner.stats()
    }

    fn reset_stats(&self) {
        self.inner.reset_stats()
    }
}

/// Bounded queue with blocking operations.
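///
/// # Example
///
/// A sketch of the blocking API (marked `ignore`; `envelope` stands in
/// for a `MessageEnvelope` built elsewhere):
///
/// ```ignore
/// use std::time::Duration;
/// use ringkernel_core::queue::BoundedQueue;
///
/// let queue = BoundedQueue::new(256);
/// queue.enqueue_timeout(envelope, Duration::from_millis(50)).unwrap();
/// let received = queue.dequeue_timeout(Duration::from_millis(50)).unwrap();
/// ```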
pub struct BoundedQueue {
    /// Inner MPSC queue.
    inner: MpscQueue,
    /// Condvar for waiting on space.
    not_full: parking_lot::Condvar,
    /// Condvar for waiting on data.
    not_empty: parking_lot::Condvar,
    /// Mutex for condvar coordination.
    mutex: parking_lot::Mutex<()>,
}

impl BoundedQueue {
    /// Create a new bounded queue.
    pub fn new(capacity: usize) -> Self {
        Self {
            inner: MpscQueue::new(capacity),
            not_full: parking_lot::Condvar::new(),
            not_empty: parking_lot::Condvar::new(),
            mutex: parking_lot::Mutex::new(()),
        }
    }

    /// Blocking enqueue with timeout.
    pub fn enqueue_timeout(
        &self,
        envelope: MessageEnvelope,
        timeout: std::time::Duration,
    ) -> Result<()> {
        let deadline = std::time::Instant::now() + timeout;

        loop {
            match self.inner.try_enqueue(envelope.clone()) {
                Ok(()) => {
                    self.not_empty.notify_one();
                    return Ok(());
                }
                Err(RingKernelError::QueueFull { .. }) => {
                    let remaining = deadline.saturating_duration_since(std::time::Instant::now());
                    if remaining.is_zero() {
                        return Err(RingKernelError::Timeout(timeout));
                    }
                    let mut guard = self.mutex.lock();
                    let _ = self.not_full.wait_for(&mut guard, remaining);
                }
                Err(e) => return Err(e),
            }
        }
    }

    /// Blocking dequeue with timeout.
    pub fn dequeue_timeout(&self, timeout: std::time::Duration) -> Result<MessageEnvelope> {
        let deadline = std::time::Instant::now() + timeout;

        loop {
            match self.inner.try_dequeue() {
                Ok(envelope) => {
                    self.not_full.notify_one();
                    return Ok(envelope);
                }
                Err(RingKernelError::QueueEmpty) => {
                    let remaining = deadline.saturating_duration_since(std::time::Instant::now());
                    if remaining.is_zero() {
                        return Err(RingKernelError::Timeout(timeout));
                    }
                    let mut guard = self.mutex.lock();
                    let _ = self.not_empty.wait_for(&mut guard, remaining);
                }
                Err(e) => return Err(e),
            }
        }
    }
}

impl MessageQueue for BoundedQueue {
    fn capacity(&self) -> usize {
        self.inner.capacity()
    }

    fn len(&self) -> usize {
        self.inner.len()
    }

    fn try_enqueue(&self, envelope: MessageEnvelope) -> Result<()> {
        let result = self.inner.try_enqueue(envelope);
        if result.is_ok() {
            self.not_empty.notify_one();
        }
        result
    }

    fn try_dequeue(&self) -> Result<MessageEnvelope> {
        let result = self.inner.try_dequeue();
        if result.is_ok() {
            self.not_full.notify_one();
        }
        result
    }

    fn stats(&self) -> QueueStats {
        self.inner.stats()
    }

    fn reset_stats(&self) {
        self.inner.reset_stats()
    }
}

// ============================================================================
// Queue Tiering Support
// ============================================================================

/// Queue capacity tiers for dynamic queue allocation.
///
/// Instead of dynamic resizing (which is complex for GPU memory),
/// we provide predefined tiers that can be selected based on expected load.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum QueueTier {
    /// Small queues (256 messages) - low traffic, minimal memory.
    Small,
    /// Medium queues (1024 messages) - moderate traffic.
    #[default]
    Medium,
    /// Large queues (4096 messages) - high traffic.
    Large,
    /// Extra large queues (16384 messages) - very high traffic.
    ExtraLarge,
}

impl QueueTier {
    /// Get the capacity for this tier.
    pub fn capacity(&self) -> usize {
        match self {
            Self::Small => 256,
            Self::Medium => 1024,
            Self::Large => 4096,
            Self::ExtraLarge => 16384,
        }
    }

    /// Suggest a tier based on expected message rate.
    ///
    /// # Arguments
    ///
    /// * `messages_per_second` - Expected message throughput
    /// * `target_headroom_ms` - Desired buffer time in milliseconds
    ///
    /// # Returns
    ///
    /// Recommended tier based on traffic patterns.
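    ///
    /// # Example
    ///
    /// A sketch of the sizing rule, `capacity >= rate * headroom / 1000`
    /// (marked `ignore`):
    ///
    /// ```ignore
    /// use ringkernel_core::queue::QueueTier;
    ///
    /// // 10_000 msg/s with 100 ms of headroom needs ~1000 slots -> Medium.
    /// assert_eq!(QueueTier::for_throughput(10_000, 100), QueueTier::Medium);
    /// ```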
    pub fn for_throughput(messages_per_second: u64, target_headroom_ms: u64) -> Self {
        let needed_capacity = (messages_per_second * target_headroom_ms) / 1000;

        if needed_capacity <= 256 {
            Self::Small
        } else if needed_capacity <= 1024 {
            Self::Medium
        } else if needed_capacity <= 4096 {
            Self::Large
        } else {
            Self::ExtraLarge
        }
    }

    /// Get the next tier up (for capacity planning).
    pub fn upgrade(&self) -> Self {
        match self {
            Self::Small => Self::Medium,
            Self::Medium => Self::Large,
            Self::Large => Self::ExtraLarge,
            Self::ExtraLarge => Self::ExtraLarge, // Already at max
        }
    }

    /// Get the tier below (for memory optimization).
    pub fn downgrade(&self) -> Self {
        match self {
            Self::Small => Self::Small, // Already at min
            Self::Medium => Self::Small,
            Self::Large => Self::Medium,
            Self::ExtraLarge => Self::Large,
        }
    }
}

/// Factory for creating appropriately-sized message queues.
///
/// # Example
///
/// ```ignore
/// use ringkernel_core::queue::{QueueFactory, QueueTier};
///
/// // Create a medium-sized MPSC queue
/// let queue = QueueFactory::create_mpsc(QueueTier::Medium);
///
/// // Create based on expected throughput
/// let queue = QueueFactory::create_for_throughput(10000, 100); // 10k msg/s, 100ms buffer
/// ```
pub struct QueueFactory;

impl QueueFactory {
    /// Create an SPSC queue with the specified tier.
    pub fn create_spsc(tier: QueueTier) -> SpscQueue {
        SpscQueue::new(tier.capacity())
    }

    /// Create an MPSC queue with the specified tier.
    pub fn create_mpsc(tier: QueueTier) -> MpscQueue {
        MpscQueue::new(tier.capacity())
    }

    /// Create a bounded queue with the specified tier.
    pub fn create_bounded(tier: QueueTier) -> BoundedQueue {
        BoundedQueue::new(tier.capacity())
    }

    /// Create a queue based on expected throughput.
    ///
    /// # Arguments
    ///
    /// * `messages_per_second` - Expected message throughput
    /// * `headroom_ms` - Desired buffer time in milliseconds
    pub fn create_for_throughput(
        messages_per_second: u64,
        headroom_ms: u64,
    ) -> Box<dyn MessageQueue> {
        let tier = QueueTier::for_throughput(messages_per_second, headroom_ms);
        Box::new(Self::create_mpsc(tier))
    }
}

/// Queue health status from monitoring.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueueHealth {
    /// Queue utilization is healthy (< warning threshold).
    Healthy,
    /// Queue is approaching capacity (>= warning, < critical threshold).
    Warning,
    /// Queue is near capacity (>= critical threshold).
    Critical,
}

/// Monitor for queue health and utilization.
///
/// Provides real-time health checking for message queues without
/// dynamic resizing, allowing applications to take action when
/// queues approach capacity.
///
/// # Example
///
/// ```ignore
/// use ringkernel_core::queue::{QueueMonitor, SpscQueue, QueueHealth};
///
/// let queue = SpscQueue::new(1024);
/// let monitor = QueueMonitor::default();
///
/// // Check health periodically
/// match monitor.check(&queue) {
///     QueueHealth::Healthy => { /* normal operation */ }
///     QueueHealth::Warning => { /* consider throttling producers */ }
///     QueueHealth::Critical => { /* alert! take immediate action */ }
/// }
/// ```
#[derive(Debug, Clone)]
pub struct QueueMonitor {
    /// Utilization threshold for warning (0.0 - 1.0).
    pub warning_threshold: f64,
    /// Utilization threshold for critical (0.0 - 1.0).
    pub critical_threshold: f64,
}

impl Default for QueueMonitor {
    fn default() -> Self {
        Self {
            warning_threshold: 0.75,  // 75%
            critical_threshold: 0.90, // 90%
        }
    }
}

impl QueueMonitor {
    /// Create a new queue monitor with custom thresholds.
    pub fn new(warning_threshold: f64, critical_threshold: f64) -> Self {
        Self {
            warning_threshold,
            critical_threshold,
        }
    }

    /// Check the health of a queue.
    pub fn check(&self, queue: &dyn MessageQueue) -> QueueHealth {
        let utilization = self.utilization(queue);

        if utilization >= self.critical_threshold {
            QueueHealth::Critical
        } else if utilization >= self.warning_threshold {
            QueueHealth::Warning
        } else {
            QueueHealth::Healthy
        }
    }

    /// Get the current utilization (0.0 - 1.0).
    pub fn utilization(&self, queue: &dyn MessageQueue) -> f64 {
        let capacity = queue.capacity();
        if capacity == 0 {
            return 0.0;
        }
        queue.len() as f64 / capacity as f64
    }

    /// Get current utilization percentage.
    pub fn utilization_percent(&self, queue: &dyn MessageQueue) -> f64 {
        self.utilization(queue) * 100.0
    }

    /// Suggest whether to upgrade the queue tier based on observed utilization.
    ///
    /// Returns `Some(QueueTier)` if upgrade is recommended, `None` otherwise.
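    ///
    /// # Example
    ///
    /// A sketch of acting on a suggestion by recreating the queue at the
    /// larger tier (marked `ignore`):
    ///
    /// ```ignore
    /// use ringkernel_core::queue::{MessageQueue, QueueMonitor, QueueTier, SpscQueue};
    ///
    /// let queue = SpscQueue::new(QueueTier::Small.capacity());
    /// let monitor = QueueMonitor::default();
    /// if let Some(next) = monitor.suggest_upgrade(&queue, QueueTier::Small) {
    ///     // Recreate the queue at the larger tier.
    ///     let upgraded = SpscQueue::new(next.capacity());
    /// }
    /// ```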
    pub fn suggest_upgrade(
        &self,
        queue: &dyn MessageQueue,
        current_tier: QueueTier,
    ) -> Option<QueueTier> {
        let stats = queue.stats();
        let utilization = self.utilization(queue);

        // Upgrade if:
        // - Current utilization is at warning level
        // - Max observed depth is above critical threshold
        let max_util = if queue.capacity() > 0 {
            stats.max_depth as f64 / queue.capacity() as f64
        } else {
            0.0
        };

        if utilization >= self.warning_threshold || max_util >= self.critical_threshold {
            let upgraded = current_tier.upgrade();
            if upgraded != current_tier {
                return Some(upgraded);
            }
        }

        None
    }

    /// Check if queue has experienced drops.
    pub fn has_drops(&self, queue: &dyn MessageQueue) -> bool {
        queue.stats().dropped > 0
    }

    /// Get the drop rate (drops / total attempted enqueues).
    pub fn drop_rate(&self, queue: &dyn MessageQueue) -> f64 {
        let stats = queue.stats();
        let total_attempted = stats.enqueued + stats.dropped;
        if total_attempted == 0 {
            return 0.0;
        }
        stats.dropped as f64 / total_attempted as f64
    }
}

/// Comprehensive queue metrics snapshot.
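///
/// # Example
///
/// A sketch of capturing a snapshot (marked `ignore`):
///
/// ```ignore
/// use ringkernel_core::queue::{QueueMetrics, QueueMonitor, QueueTier, SpscQueue};
///
/// let queue = SpscQueue::new(QueueTier::Medium.capacity());
/// let monitor = QueueMonitor::default();
/// let metrics = QueueMetrics::capture(&queue, &monitor, Some(QueueTier::Medium));
/// println!("health = {:?}, utilization = {:.0}%", metrics.health, metrics.utilization * 100.0);
/// ```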
#[derive(Debug, Clone)]
pub struct QueueMetrics {
    /// Queue health status.
    pub health: QueueHealth,
    /// Current utilization (0.0 - 1.0).
    pub utilization: f64,
    /// Queue statistics.
    pub stats: QueueStats,
    /// Current tier (if known).
    pub tier: Option<QueueTier>,
    /// Suggested tier upgrade (if recommended).
    pub suggested_upgrade: Option<QueueTier>,
}

impl QueueMetrics {
    /// Capture metrics from a queue.
    pub fn capture(
        queue: &dyn MessageQueue,
        monitor: &QueueMonitor,
        current_tier: Option<QueueTier>,
    ) -> Self {
        let health = monitor.check(queue);
        let utilization = monitor.utilization(queue);
        let stats = queue.stats();
        let suggested_upgrade = current_tier.and_then(|tier| monitor.suggest_upgrade(queue, tier));

        Self {
            health,
            utilization,
            stats,
            tier: current_tier,
            suggested_upgrade,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::hlc::HlcTimestamp;
    use crate::message::MessageHeader;

    fn make_envelope() -> MessageEnvelope {
        MessageEnvelope {
            header: MessageHeader::new(1, 0, 1, 8, HlcTimestamp::now(1)),
            payload: vec![1, 2, 3, 4, 5, 6, 7, 8],
        }
    }

    #[test]
    fn test_spsc_basic() {
        let queue = SpscQueue::new(16);

        assert!(queue.is_empty());
        assert!(!queue.is_full());

        let env = make_envelope();
        queue.try_enqueue(env).unwrap();

        assert_eq!(queue.len(), 1);
        assert!(!queue.is_empty());

        let _ = queue.try_dequeue().unwrap();
        assert!(queue.is_empty());
    }

    #[test]
    fn test_spsc_full() {
        let queue = SpscQueue::new(4);

        for _ in 0..4 {
            queue.try_enqueue(make_envelope()).unwrap();
        }

        assert!(queue.is_full());
        assert!(matches!(
            queue.try_enqueue(make_envelope()),
            Err(RingKernelError::QueueFull { .. })
        ));
    }

    #[test]
    fn test_spsc_stats() {
        let queue = SpscQueue::new(16);

        for _ in 0..10 {
            queue.try_enqueue(make_envelope()).unwrap();
        }

        for _ in 0..5 {
            let _ = queue.try_dequeue().unwrap();
        }

        let stats = queue.stats();
        assert_eq!(stats.enqueued, 10);
        assert_eq!(stats.dequeued, 5);
        assert_eq!(stats.depth, 5);
    }

    #[test]
    fn test_mpsc_concurrent() {
        use std::sync::Arc;
        use std::thread;

        let queue = Arc::new(MpscQueue::new(1024));
        let mut handles = vec![];

        // Spawn multiple producers
        for _ in 0..4 {
            let q = Arc::clone(&queue);
            handles.push(thread::spawn(move || {
                for _ in 0..100 {
                    q.try_enqueue(make_envelope()).unwrap();
                }
            }));
        }

        // Wait for producers
        for h in handles {
            h.join().unwrap();
        }

        let stats = queue.stats();
        assert_eq!(stats.enqueued, 400);
    }

    #[test]
    fn test_bounded_timeout() {
        let queue = BoundedQueue::new(2);

        // Fill queue
        queue.try_enqueue(make_envelope()).unwrap();
        queue.try_enqueue(make_envelope()).unwrap();

        // Should timeout
        let result = queue.enqueue_timeout(make_envelope(), std::time::Duration::from_millis(10));
        assert!(matches!(result, Err(RingKernelError::Timeout(_))));
    }

    // ========================================================================
    // Queue Tiering Tests
    // ========================================================================

    #[test]
    fn test_queue_tier_capacities() {
        assert_eq!(QueueTier::Small.capacity(), 256);
        assert_eq!(QueueTier::Medium.capacity(), 1024);
        assert_eq!(QueueTier::Large.capacity(), 4096);
        assert_eq!(QueueTier::ExtraLarge.capacity(), 16384);
    }

    #[test]
    fn test_queue_tier_for_throughput() {
        // Low traffic - 1000 msg/s with 100ms buffer = 100 msgs needed
        assert_eq!(QueueTier::for_throughput(1000, 100), QueueTier::Small);

        // Medium traffic - 5000 msg/s with 100ms buffer = 500 msgs needed
        assert_eq!(QueueTier::for_throughput(5000, 100), QueueTier::Medium);

        // High traffic - 20000 msg/s with 100ms buffer = 2000 msgs needed
        assert_eq!(QueueTier::for_throughput(20000, 100), QueueTier::Large);

        // Very high traffic - 100000 msg/s with 100ms buffer = 10000 msgs needed
        assert_eq!(
            QueueTier::for_throughput(100000, 100),
            QueueTier::ExtraLarge
        );
    }

    #[test]
    fn test_queue_tier_upgrade_downgrade() {
        assert_eq!(QueueTier::Small.upgrade(), QueueTier::Medium);
        assert_eq!(QueueTier::Medium.upgrade(), QueueTier::Large);
        assert_eq!(QueueTier::Large.upgrade(), QueueTier::ExtraLarge);
        assert_eq!(QueueTier::ExtraLarge.upgrade(), QueueTier::ExtraLarge); // Max

        assert_eq!(QueueTier::Small.downgrade(), QueueTier::Small); // Min
        assert_eq!(QueueTier::Medium.downgrade(), QueueTier::Small);
        assert_eq!(QueueTier::Large.downgrade(), QueueTier::Medium);
        assert_eq!(QueueTier::ExtraLarge.downgrade(), QueueTier::Large);
    }

    #[test]
    fn test_queue_factory_creates_correct_capacity() {
        let spsc = QueueFactory::create_spsc(QueueTier::Medium);
        assert_eq!(spsc.capacity(), 1024);

        let mpsc = QueueFactory::create_mpsc(QueueTier::Large);
        assert_eq!(mpsc.capacity(), 4096);

        let bounded = QueueFactory::create_bounded(QueueTier::Small);
        assert_eq!(bounded.capacity(), 256);
    }

    #[test]
    fn test_queue_factory_throughput_based() {
        let queue = QueueFactory::create_for_throughput(10000, 100);
        // 10000 msg/s * 100ms = 1000 msgs, needs Medium tier = 1024
        assert_eq!(queue.capacity(), 1024);
    }

    #[test]
    fn test_queue_monitor_health_levels() {
        let monitor = QueueMonitor::default();
        let queue = SpscQueue::new(100); // Will round to 128

        // Empty - healthy
        assert_eq!(monitor.check(&queue), QueueHealth::Healthy);

        // Fill to 60% - still healthy
        for _ in 0..76 {
            queue.try_enqueue(make_envelope()).unwrap();
        }
        assert_eq!(monitor.check(&queue), QueueHealth::Healthy);

        // Fill to 80% - warning
        for _ in 0..26 {
            queue.try_enqueue(make_envelope()).unwrap();
        }
        assert_eq!(monitor.check(&queue), QueueHealth::Warning);

        // Fill to 95% - critical
        for _ in 0..18 {
            queue.try_enqueue(make_envelope()).unwrap();
        }
        assert_eq!(monitor.check(&queue), QueueHealth::Critical);
    }

    #[test]
    fn test_queue_monitor_utilization() {
        let monitor = QueueMonitor::default();
        let queue = SpscQueue::new(100); // Will round to 128

        assert!((monitor.utilization(&queue) - 0.0).abs() < 0.001);

        for _ in 0..64 {
            queue.try_enqueue(make_envelope()).unwrap();
        }
        assert!((monitor.utilization(&queue) - 0.5).abs() < 0.001);
    }

    #[test]
    fn test_queue_monitor_drop_detection() {
        let monitor = QueueMonitor::default();
        let queue = SpscQueue::new(4);

        // Fill queue completely
        for _ in 0..4 {
            queue.try_enqueue(make_envelope()).unwrap();
        }
        assert!(!monitor.has_drops(&queue));

        // Attempt another enqueue (should fail and record drop)
        let _ = queue.try_enqueue(make_envelope());
        assert!(monitor.has_drops(&queue));
        assert!(monitor.drop_rate(&queue) > 0.0);
    }

    #[test]
    fn test_queue_monitor_upgrade_suggestion() {
        let monitor = QueueMonitor::default();
        let queue = SpscQueue::new(QueueTier::Small.capacity());

        // Empty queue - no upgrade needed
        assert!(monitor.suggest_upgrade(&queue, QueueTier::Small).is_none());

        // Fill to warning level (>= 75%)
        for _ in 0..200 {
            queue.try_enqueue(make_envelope()).unwrap();
        }

        // Should suggest upgrade
        let suggestion = monitor.suggest_upgrade(&queue, QueueTier::Small);
        assert_eq!(suggestion, Some(QueueTier::Medium));

        // Already at max tier - no upgrade possible
        let large_queue = SpscQueue::new(QueueTier::ExtraLarge.capacity());
        for _ in 0..(QueueTier::ExtraLarge.capacity() * 3 / 4) {
            large_queue.try_enqueue(make_envelope()).unwrap();
        }
        assert!(monitor
            .suggest_upgrade(&large_queue, QueueTier::ExtraLarge)
            .is_none());
    }

    #[test]
    fn test_queue_metrics_capture() {
        let queue = SpscQueue::new(QueueTier::Medium.capacity());
        let monitor = QueueMonitor::default();

        // Add some messages
        for _ in 0..100 {
            queue.try_enqueue(make_envelope()).unwrap();
        }

        let metrics = QueueMetrics::capture(&queue, &monitor, Some(QueueTier::Medium));

        assert_eq!(metrics.health, QueueHealth::Healthy);
        assert!(metrics.utilization < 0.15);
        assert_eq!(metrics.stats.enqueued, 100);
        assert_eq!(metrics.tier, Some(QueueTier::Medium));
        assert!(metrics.suggested_upgrade.is_none());
    }
}