// ringkernel_core/queue.rs
1//! Lock-free message queue implementation.
2//!
3//! This module provides the core message queue abstraction used for
4//! communication between host and GPU kernels. The queue uses a ring
5//! buffer design with atomic operations for lock-free access.
6
7use std::sync::atomic::{AtomicU64, Ordering};
8
9use crate::error::{Result, RingKernelError};
10use crate::message::MessageEnvelope;
11
/// Statistics for a message queue.
///
/// Counter fields are cumulative since queue creation (or the last
/// `reset_stats` call); `depth` is a point-in-time snapshot taken when
/// the stats were read.
#[derive(Debug, Clone, Default)]
pub struct QueueStats {
    /// Total messages enqueued.
    pub enqueued: u64,
    /// Total messages dequeued.
    pub dequeued: u64,
    /// Messages dropped due to full queue.
    pub dropped: u64,
    /// Current queue depth.
    pub depth: u64,
    /// Maximum queue depth observed.
    pub max_depth: u64,
}
26
/// Trait for message queue implementations.
///
/// Message queues provide lock-free FIFO communication between
/// producers (host or other kernels) and consumers (GPU kernels).
pub trait MessageQueue: Send + Sync {
    /// Get the queue capacity.
    fn capacity(&self) -> usize;

    /// Get current queue size.
    fn len(&self) -> usize;

    /// Check if queue is empty.
    ///
    /// Default implementation: `len() == 0`.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Check if queue is full.
    ///
    /// Default implementation: `len() >= capacity()`.
    fn is_full(&self) -> bool {
        self.len() >= self.capacity()
    }

    /// Try to enqueue a message envelope.
    ///
    /// # Errors
    ///
    /// Implementations in this module return `RingKernelError::QueueFull`
    /// when the queue is at capacity.
    fn try_enqueue(&self, envelope: MessageEnvelope) -> Result<()>;

    /// Try to dequeue a message envelope.
    ///
    /// # Errors
    ///
    /// Implementations in this module return `RingKernelError::QueueEmpty`
    /// when there is nothing to dequeue.
    fn try_dequeue(&self) -> Result<MessageEnvelope>;

    /// Get queue statistics.
    fn stats(&self) -> QueueStats;

    /// Reset queue statistics.
    fn reset_stats(&self);
}
60
/// Single-producer single-consumer lock-free ring buffer.
///
/// This implementation is optimized for the common case of one
/// producer (host) and one consumer (GPU kernel).
///
/// NOTE(review): index management is lock-free atomics, but each slot is
/// guarded by a `parking_lot::Mutex`, so slot access itself briefly locks.
pub struct SpscQueue {
    /// Ring buffer storage; one mutex-guarded slot per ring position.
    buffer: Vec<parking_lot::Mutex<Option<MessageEnvelope>>>,
    /// Capacity (power of 2).
    capacity: usize,
    /// Mask for index wrapping (`capacity - 1`).
    mask: usize,
    /// Head pointer (producer writes here). Monotonically increasing
    /// (wrapping at u64); mapped into the buffer via `mask`.
    head: AtomicU64,
    /// Tail pointer (consumer reads from here); same encoding as `head`.
    tail: AtomicU64,
    /// Statistics.
    stats: QueueStatsInner,
}
79
/// Internal statistics with atomics.
///
/// All counters are updated with `Ordering::Relaxed`; readers obtain
/// individually consistent values rather than one atomic snapshot.
struct QueueStatsInner {
    /// Total successful enqueues.
    enqueued: AtomicU64,
    /// Total successful dequeues.
    dequeued: AtomicU64,
    /// Enqueue attempts rejected because the queue was full.
    dropped: AtomicU64,
    /// High-water mark of observed queue depth.
    max_depth: AtomicU64,
}
87
88impl SpscQueue {
89    /// Create a new queue with the given capacity.
90    ///
91    /// Capacity will be rounded up to the next power of 2.
92    pub fn new(capacity: usize) -> Self {
93        let capacity = capacity.next_power_of_two();
94        let mask = capacity - 1;
95
96        let mut buffer = Vec::with_capacity(capacity);
97        for _ in 0..capacity {
98            buffer.push(parking_lot::Mutex::new(None));
99        }
100
101        Self {
102            buffer,
103            capacity,
104            mask,
105            head: AtomicU64::new(0),
106            tail: AtomicU64::new(0),
107            stats: QueueStatsInner {
108                enqueued: AtomicU64::new(0),
109                dequeued: AtomicU64::new(0),
110                dropped: AtomicU64::new(0),
111                max_depth: AtomicU64::new(0),
112            },
113        }
114    }
115
116    /// Get current depth.
117    fn depth(&self) -> u64 {
118        let head = self.head.load(Ordering::Acquire);
119        let tail = self.tail.load(Ordering::Acquire);
120        head.wrapping_sub(tail)
121    }
122
123    /// Update max depth statistic.
124    fn update_max_depth(&self) {
125        let depth = self.depth();
126        let mut max = self.stats.max_depth.load(Ordering::Relaxed);
127        while depth > max {
128            match self.stats.max_depth.compare_exchange_weak(
129                max,
130                depth,
131                Ordering::Relaxed,
132                Ordering::Relaxed,
133            ) {
134                Ok(_) => break,
135                Err(current) => max = current,
136            }
137        }
138    }
139}
140
impl MessageQueue for SpscQueue {
    fn capacity(&self) -> usize {
        self.capacity
    }

    fn len(&self) -> usize {
        self.depth() as usize
    }

    // Single-producer only: the full-check / slot-write / head-advance
    // sequence below is not safe with concurrent producers (see MpscQueue
    // for the serialized multi-producer variant).
    fn try_enqueue(&self, envelope: MessageEnvelope) -> Result<()> {
        let head = self.head.load(Ordering::Acquire);
        let tail = self.tail.load(Ordering::Acquire);

        // Check if full: indices increase monotonically, so the wrapping
        // difference is the number of occupied slots.
        if head.wrapping_sub(tail) >= self.capacity as u64 {
            self.stats.dropped.fetch_add(1, Ordering::Relaxed);
            return Err(RingKernelError::QueueFull {
                capacity: self.capacity,
            });
        }

        // Get slot: mask wraps the monotonically increasing index into the
        // power-of-two buffer.
        let index = (head as usize) & self.mask;
        let mut slot = self.buffer[index].lock();
        *slot = Some(envelope);
        drop(slot);

        // Advance head. The Release store publishes the slot write; it
        // pairs with the consumer's Acquire load of `head`.
        self.head.store(head.wrapping_add(1), Ordering::Release);

        // Update stats. NOTE(review): counters are bumped after the head
        // advance, so a concurrent stats() read may briefly observe
        // `dequeued > enqueued`.
        self.stats.enqueued.fetch_add(1, Ordering::Relaxed);
        self.update_max_depth();

        Ok(())
    }

    // Single-consumer only; mirrors try_enqueue.
    fn try_dequeue(&self) -> Result<MessageEnvelope> {
        let tail = self.tail.load(Ordering::Acquire);
        let head = self.head.load(Ordering::Acquire);

        // Check if empty
        if head == tail {
            return Err(RingKernelError::QueueEmpty);
        }

        // Get slot. With a single consumer the slot should always hold a
        // message here; QueueEmpty on a None slot is a defensive fallback.
        let index = (tail as usize) & self.mask;
        let mut slot = self.buffer[index].lock();
        let envelope = slot.take().ok_or(RingKernelError::QueueEmpty)?;
        drop(slot);

        // Advance tail. The Release store makes the freed slot visible to
        // the producer's Acquire load of `tail`.
        self.tail.store(tail.wrapping_add(1), Ordering::Release);

        // Update stats
        self.stats.dequeued.fetch_add(1, Ordering::Relaxed);

        Ok(envelope)
    }

    fn stats(&self) -> QueueStats {
        QueueStats {
            enqueued: self.stats.enqueued.load(Ordering::Relaxed),
            dequeued: self.stats.dequeued.load(Ordering::Relaxed),
            dropped: self.stats.dropped.load(Ordering::Relaxed),
            depth: self.depth(),
            max_depth: self.stats.max_depth.load(Ordering::Relaxed),
        }
    }

    fn reset_stats(&self) {
        self.stats.enqueued.store(0, Ordering::Relaxed);
        self.stats.dequeued.store(0, Ordering::Relaxed);
        self.stats.dropped.store(0, Ordering::Relaxed);
        self.stats.max_depth.store(0, Ordering::Relaxed);
    }
}
219
/// Multi-producer single-consumer lock-free queue.
///
/// This variant allows multiple producers (e.g., multiple host threads
/// or kernel-to-kernel messaging) to enqueue messages concurrently.
/// Producers are serialized with a mutex around the SPSC enqueue path;
/// the consumer side is unchanged from `SpscQueue`.
pub struct MpscQueue {
    /// Inner SPSC queue.
    inner: SpscQueue,
    /// Lock serializing concurrent producers (the consumer never takes it).
    producer_lock: parking_lot::Mutex<()>,
}
230
231impl MpscQueue {
232    /// Create a new MPSC queue.
233    pub fn new(capacity: usize) -> Self {
234        Self {
235            inner: SpscQueue::new(capacity),
236            producer_lock: parking_lot::Mutex::new(()),
237        }
238    }
239}
240
241impl MessageQueue for MpscQueue {
242    fn capacity(&self) -> usize {
243        self.inner.capacity()
244    }
245
246    fn len(&self) -> usize {
247        self.inner.len()
248    }
249
250    fn try_enqueue(&self, envelope: MessageEnvelope) -> Result<()> {
251        let _guard = self.producer_lock.lock();
252        self.inner.try_enqueue(envelope)
253    }
254
255    fn try_dequeue(&self) -> Result<MessageEnvelope> {
256        self.inner.try_dequeue()
257    }
258
259    fn stats(&self) -> QueueStats {
260        self.inner.stats()
261    }
262
263    fn reset_stats(&self) {
264        self.inner.reset_stats()
265    }
266}
267
/// Bounded queue with blocking operations.
///
/// Wraps an `MpscQueue` and layers condvar-based blocking enqueue and
/// dequeue (with timeouts) on top of the non-blocking trait methods.
pub struct BoundedQueue {
    /// Inner MPSC queue.
    inner: MpscQueue,
    /// Condvar signaled when space may be available (after a dequeue).
    not_full: parking_lot::Condvar,
    /// Condvar signaled when data may be available (after an enqueue).
    not_empty: parking_lot::Condvar,
    /// Mutex for condvar coordination; it does NOT guard the queue state
    /// itself (see the race note on the blocking methods).
    mutex: parking_lot::Mutex<()>,
}
279
impl BoundedQueue {
    /// Create a new bounded queue.
    ///
    /// Capacity is subject to the inner queue's power-of-two rounding.
    pub fn new(capacity: usize) -> Self {
        Self {
            inner: MpscQueue::new(capacity),
            not_full: parking_lot::Condvar::new(),
            not_empty: parking_lot::Condvar::new(),
            mutex: parking_lot::Mutex::new(()),
        }
    }

    /// Blocking enqueue with timeout.
    ///
    /// Retries `try_enqueue` until it succeeds or `timeout` elapses,
    /// sleeping on the `not_full` condvar between attempts.
    ///
    /// NOTE(review): queue state is not protected by `mutex`, so a
    /// consumer can dequeue and notify between a failed attempt here and
    /// the `wait_for` below. Such a missed wakeup only delays the next
    /// retry until the bounded wait expires; the deadline is still honored.
    ///
    /// # Errors
    ///
    /// Returns `RingKernelError::Timeout` if no space became available
    /// before the deadline; other enqueue errors are propagated as-is.
    pub fn enqueue_timeout(
        &self,
        envelope: MessageEnvelope,
        timeout: std::time::Duration,
    ) -> Result<()> {
        let deadline = std::time::Instant::now() + timeout;

        loop {
            // Clone per attempt: try_enqueue consumes the envelope even on
            // failure, so each retry needs its own copy.
            match self.inner.try_enqueue(envelope.clone()) {
                Ok(()) => {
                    self.not_empty.notify_one();
                    return Ok(());
                }
                Err(RingKernelError::QueueFull { .. }) => {
                    let remaining = deadline.saturating_duration_since(std::time::Instant::now());
                    if remaining.is_zero() {
                        return Err(RingKernelError::Timeout(timeout));
                    }
                    let mut guard = self.mutex.lock();
                    // Timed-out or spurious wakeups are fine: the loop
                    // re-checks the queue and the deadline.
                    let _ = self.not_full.wait_for(&mut guard, remaining);
                }
                Err(e) => return Err(e),
            }
        }
    }

    /// Blocking dequeue with timeout.
    ///
    /// Mirrors `enqueue_timeout`: retries `try_dequeue`, waiting on
    /// `not_empty` between attempts until a message arrives or the
    /// deadline passes. The same missed-wakeup caveat applies — wakeups
    /// are bounded by the remaining wait time, so the deadline holds.
    ///
    /// # Errors
    ///
    /// Returns `RingKernelError::Timeout` if the queue stayed empty past
    /// the deadline; other dequeue errors are propagated as-is.
    pub fn dequeue_timeout(&self, timeout: std::time::Duration) -> Result<MessageEnvelope> {
        let deadline = std::time::Instant::now() + timeout;

        loop {
            match self.inner.try_dequeue() {
                Ok(envelope) => {
                    self.not_full.notify_one();
                    return Ok(envelope);
                }
                Err(RingKernelError::QueueEmpty) => {
                    let remaining = deadline.saturating_duration_since(std::time::Instant::now());
                    if remaining.is_zero() {
                        return Err(RingKernelError::Timeout(timeout));
                    }
                    let mut guard = self.mutex.lock();
                    let _ = self.not_empty.wait_for(&mut guard, remaining);
                }
                Err(e) => return Err(e),
            }
        }
    }
}
341
342impl MessageQueue for BoundedQueue {
343    fn capacity(&self) -> usize {
344        self.inner.capacity()
345    }
346
347    fn len(&self) -> usize {
348        self.inner.len()
349    }
350
351    fn try_enqueue(&self, envelope: MessageEnvelope) -> Result<()> {
352        let result = self.inner.try_enqueue(envelope);
353        if result.is_ok() {
354            self.not_empty.notify_one();
355        }
356        result
357    }
358
359    fn try_dequeue(&self) -> Result<MessageEnvelope> {
360        let result = self.inner.try_dequeue();
361        if result.is_ok() {
362            self.not_full.notify_one();
363        }
364        result
365    }
366
367    fn stats(&self) -> QueueStats {
368        self.inner.stats()
369    }
370
371    fn reset_stats(&self) {
372        self.inner.reset_stats()
373    }
374}
375
376// ============================================================================
377// Queue Tiering Support
378// ============================================================================
379
/// Queue capacity tiers for dynamic queue allocation.
///
/// Instead of dynamic resizing (which is complex for GPU memory),
/// we provide predefined tiers that can be selected based on expected load.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum QueueTier {
    /// Small queues (256 messages) - low traffic, minimal memory.
    Small,
    /// Medium queues (1024 messages) - moderate traffic.
    #[default]
    Medium,
    /// Large queues (4096 messages) - high traffic.
    Large,
    /// Extra large queues (16384 messages) - very high traffic.
    ExtraLarge,
}

impl QueueTier {
    /// Get the capacity for this tier.
    pub fn capacity(&self) -> usize {
        match self {
            Self::Small => 256,
            Self::Medium => 1024,
            Self::Large => 4096,
            Self::ExtraLarge => 16384,
        }
    }

    /// Suggest a tier based on expected message rate.
    ///
    /// # Arguments
    ///
    /// * `messages_per_second` - Expected message throughput
    /// * `target_headroom_ms` - Desired buffer time in milliseconds
    ///
    /// # Returns
    ///
    /// Recommended tier based on traffic patterns. The rate/headroom
    /// product is computed with saturating arithmetic so that very large
    /// inputs select `ExtraLarge` instead of overflowing u64 (which would
    /// panic in debug builds and silently wrap in release builds).
    pub fn for_throughput(messages_per_second: u64, target_headroom_ms: u64) -> Self {
        let needed_capacity = messages_per_second.saturating_mul(target_headroom_ms) / 1000;

        if needed_capacity <= 256 {
            Self::Small
        } else if needed_capacity <= 1024 {
            Self::Medium
        } else if needed_capacity <= 4096 {
            Self::Large
        } else {
            Self::ExtraLarge
        }
    }

    /// Get the next tier up (for capacity planning); saturates at `ExtraLarge`.
    pub fn upgrade(&self) -> Self {
        match self {
            Self::Small => Self::Medium,
            Self::Medium => Self::Large,
            Self::Large => Self::ExtraLarge,
            Self::ExtraLarge => Self::ExtraLarge, // Already at max
        }
    }

    /// Get the tier below (for memory optimization); saturates at `Small`.
    pub fn downgrade(&self) -> Self {
        match self {
            Self::Small => Self::Small, // Already at min
            Self::Medium => Self::Small,
            Self::Large => Self::Medium,
            Self::ExtraLarge => Self::Large,
        }
    }
}
452
/// Factory for creating appropriately-sized message queues.
///
/// All constructors take a [`QueueTier`], whose capacity is passed to the
/// underlying queue constructor (and therefore subject to its
/// power-of-two rounding).
///
/// # Example
///
/// ```ignore
/// use ringkernel_core::queue::{QueueFactory, QueueTier};
///
/// // Create a medium-sized MPSC queue
/// let queue = QueueFactory::create_mpsc(QueueTier::Medium);
///
/// // Create based on expected throughput
/// let queue = QueueFactory::create_for_throughput(10000, 100); // 10k msg/s, 100ms buffer
/// ```
pub struct QueueFactory;

impl QueueFactory {
    /// Create an SPSC queue with the specified tier.
    pub fn create_spsc(tier: QueueTier) -> SpscQueue {
        SpscQueue::new(tier.capacity())
    }

    /// Create an MPSC queue with the specified tier.
    pub fn create_mpsc(tier: QueueTier) -> MpscQueue {
        MpscQueue::new(tier.capacity())
    }

    /// Create a bounded queue with the specified tier.
    pub fn create_bounded(tier: QueueTier) -> BoundedQueue {
        BoundedQueue::new(tier.capacity())
    }

    /// Create a queue based on expected throughput.
    ///
    /// Selects a tier via `QueueTier::for_throughput` and returns a boxed
    /// MPSC queue.
    ///
    /// # Arguments
    ///
    /// * `messages_per_second` - Expected message throughput
    /// * `headroom_ms` - Desired buffer time in milliseconds
    pub fn create_for_throughput(
        messages_per_second: u64,
        headroom_ms: u64,
    ) -> Box<dyn MessageQueue> {
        let tier = QueueTier::for_throughput(messages_per_second, headroom_ms);
        Box::new(Self::create_mpsc(tier))
    }
}
498
/// Queue health status from monitoring.
///
/// Variants are listed from least to most severe; thresholds are defined
/// by [`QueueMonitor`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueueHealth {
    /// Queue utilization is healthy (< warning threshold).
    Healthy,
    /// Queue is approaching capacity (>= warning, < critical threshold).
    Warning,
    /// Queue is near capacity (>= critical threshold).
    Critical,
}
509
/// Monitor for queue health and utilization.
///
/// Provides real-time health checking for message queues without
/// dynamic resizing, allowing applications to take action when
/// queues approach capacity.
///
/// Thresholds are fractions of capacity in `0.0..=1.0`. They are not
/// validated on construction; `warning_threshold` is assumed to be below
/// `critical_threshold`.
///
/// # Example
///
/// ```ignore
/// use ringkernel_core::queue::{QueueMonitor, SpscQueue, QueueHealth};
///
/// let queue = SpscQueue::new(1024);
/// let monitor = QueueMonitor::default();
///
/// // Check health periodically
/// match monitor.check(&queue) {
///     QueueHealth::Healthy => { /* normal operation */ }
///     QueueHealth::Warning => { /* consider throttling producers */ }
///     QueueHealth::Critical => { /* alert! take immediate action */ }
/// }
/// ```
#[derive(Debug, Clone)]
pub struct QueueMonitor {
    /// Utilization threshold for warning (0.0 - 1.0).
    pub warning_threshold: f64,
    /// Utilization threshold for critical (0.0 - 1.0).
    pub critical_threshold: f64,
}
538
539impl Default for QueueMonitor {
540    fn default() -> Self {
541        Self {
542            warning_threshold: 0.75,  // 75%
543            critical_threshold: 0.90, // 90%
544        }
545    }
546}
547
548impl QueueMonitor {
549    /// Create a new queue monitor with custom thresholds.
550    pub fn new(warning_threshold: f64, critical_threshold: f64) -> Self {
551        Self {
552            warning_threshold,
553            critical_threshold,
554        }
555    }
556
557    /// Check the health of a queue.
558    pub fn check(&self, queue: &dyn MessageQueue) -> QueueHealth {
559        let utilization = self.utilization(queue);
560
561        if utilization >= self.critical_threshold {
562            QueueHealth::Critical
563        } else if utilization >= self.warning_threshold {
564            QueueHealth::Warning
565        } else {
566            QueueHealth::Healthy
567        }
568    }
569
570    /// Get the current utilization (0.0 - 1.0).
571    pub fn utilization(&self, queue: &dyn MessageQueue) -> f64 {
572        let capacity = queue.capacity();
573        if capacity == 0 {
574            return 0.0;
575        }
576        queue.len() as f64 / capacity as f64
577    }
578
579    /// Get current utilization percentage.
580    pub fn utilization_percent(&self, queue: &dyn MessageQueue) -> f64 {
581        self.utilization(queue) * 100.0
582    }
583
584    /// Suggest whether to upgrade the queue tier based on observed utilization.
585    ///
586    /// Returns `Some(QueueTier)` if upgrade is recommended, `None` otherwise.
587    pub fn suggest_upgrade(
588        &self,
589        queue: &dyn MessageQueue,
590        current_tier: QueueTier,
591    ) -> Option<QueueTier> {
592        let stats = queue.stats();
593        let utilization = self.utilization(queue);
594
595        // Upgrade if:
596        // - Current utilization is at warning level
597        // - Max observed depth is above critical threshold
598        let max_util = if queue.capacity() > 0 {
599            stats.max_depth as f64 / queue.capacity() as f64
600        } else {
601            0.0
602        };
603
604        if utilization >= self.warning_threshold || max_util >= self.critical_threshold {
605            let upgraded = current_tier.upgrade();
606            if upgraded != current_tier {
607                return Some(upgraded);
608            }
609        }
610
611        None
612    }
613
614    /// Check if queue has experienced drops.
615    pub fn has_drops(&self, queue: &dyn MessageQueue) -> bool {
616        queue.stats().dropped > 0
617    }
618
619    /// Get the drop rate (drops / total attempted enqueues).
620    pub fn drop_rate(&self, queue: &dyn MessageQueue) -> f64 {
621        let stats = queue.stats();
622        let total_attempted = stats.enqueued + stats.dropped;
623        if total_attempted == 0 {
624            return 0.0;
625        }
626        stats.dropped as f64 / total_attempted as f64
627    }
628}
629
/// Comprehensive queue metrics snapshot.
///
/// Produced by [`QueueMetrics::capture`]. Fields reflect separate reads
/// of a live queue, so they may be mutually slightly stale under
/// concurrent traffic.
#[derive(Debug, Clone)]
pub struct QueueMetrics {
    /// Queue health status.
    pub health: QueueHealth,
    /// Current utilization (0.0 - 1.0).
    pub utilization: f64,
    /// Queue statistics.
    pub stats: QueueStats,
    /// Current tier (if known).
    pub tier: Option<QueueTier>,
    /// Suggested tier upgrade (if recommended).
    pub suggested_upgrade: Option<QueueTier>,
}
644
645impl QueueMetrics {
646    /// Capture metrics from a queue.
647    pub fn capture(
648        queue: &dyn MessageQueue,
649        monitor: &QueueMonitor,
650        current_tier: Option<QueueTier>,
651    ) -> Self {
652        let health = monitor.check(queue);
653        let utilization = monitor.utilization(queue);
654        let stats = queue.stats();
655        let suggested_upgrade = current_tier.and_then(|tier| monitor.suggest_upgrade(queue, tier));
656
657        Self {
658            health,
659            utilization,
660            stats,
661            tier: current_tier,
662            suggested_upgrade,
663        }
664    }
665}
666
667// ============================================================================
668// Partitioned Queue
669// ============================================================================
670
/// A partitioned queue for reduced contention with multiple producers.
///
/// Instead of a single queue with a lock, this uses multiple independent
/// partitions (SPSC queues) to reduce contention when many producers
/// are sending messages concurrently.
///
/// Producers are routed to partitions based on their source ID, ensuring
/// messages from the same source go to the same partition (preserving order).
///
/// # Example
///
/// ```ignore
/// use ringkernel_core::queue::{PartitionedQueue, QueueTier};
///
/// // Create 4 partitions with Medium tier capacity each
/// let queue = PartitionedQueue::new(4, QueueTier::Medium.capacity());
///
/// // Enqueue with source-based routing
/// queue.try_enqueue_from(source_id, envelope)?;
///
/// // Dequeue from any partition that has messages
/// if let Some(envelope) = queue.try_dequeue_any() {
///     // process message
/// }
/// ```
pub struct PartitionedQueue {
    /// Individual partition queues.
    partitions: Vec<SpscQueue>,
    /// Number of partitions (always a power of 2, enabling mask routing).
    partition_count: usize,
    /// Round-robin dequeue index; only ever incremented, wrapped via mask.
    dequeue_index: AtomicU64,
}
704
705impl PartitionedQueue {
706    /// Creates a new partitioned queue.
707    ///
708    /// # Arguments
709    ///
710    /// * `partition_count` - Number of partitions (should be power of 2 for efficiency)
711    /// * `capacity_per_partition` - Capacity of each partition
712    pub fn new(partition_count: usize, capacity_per_partition: usize) -> Self {
713        let partition_count = partition_count.max(1).next_power_of_two();
714        let partitions = (0..partition_count)
715            .map(|_| SpscQueue::new(capacity_per_partition))
716            .collect();
717
718        Self {
719            partitions,
720            partition_count,
721            dequeue_index: AtomicU64::new(0),
722        }
723    }
724
725    /// Creates a partitioned queue with default settings.
726    ///
727    /// Uses 4 partitions with Medium tier capacity.
728    pub fn with_defaults() -> Self {
729        Self::new(4, QueueTier::Medium.capacity())
730    }
731
732    /// Creates a partitioned queue sized for high contention.
733    ///
734    /// Uses 8 partitions with Large tier capacity.
735    pub fn for_high_contention() -> Self {
736        Self::new(8, QueueTier::Large.capacity())
737    }
738
739    /// Returns the partition index for a given source ID.
740    #[inline]
741    pub fn partition_for(&self, source_id: u64) -> usize {
742        (source_id as usize) & (self.partition_count - 1)
743    }
744
745    /// Returns the number of partitions.
746    pub fn partition_count(&self) -> usize {
747        self.partition_count
748    }
749
750    /// Returns the capacity per partition.
751    pub fn capacity_per_partition(&self) -> usize {
752        self.partitions.first().map_or(0, |p| p.capacity())
753    }
754
755    /// Total capacity across all partitions.
756    pub fn total_capacity(&self) -> usize {
757        self.capacity_per_partition() * self.partition_count
758    }
759
760    /// Total messages across all partitions.
761    pub fn total_messages(&self) -> usize {
762        self.partitions.iter().map(|p| p.len()).sum()
763    }
764
765    /// Enqueues a message to a partition based on source ID.
766    ///
767    /// Messages from the same source always go to the same partition,
768    /// preserving ordering for that source.
769    pub fn try_enqueue_from(&self, source_id: u64, envelope: MessageEnvelope) -> Result<()> {
770        let partition = self.partition_for(source_id);
771        self.partitions[partition].try_enqueue(envelope)
772    }
773
774    /// Enqueues a message using the envelope's source kernel ID.
775    pub fn try_enqueue(&self, envelope: MessageEnvelope) -> Result<()> {
776        let source_id = envelope.header.source_kernel;
777        self.try_enqueue_from(source_id, envelope)
778    }
779
780    /// Tries to dequeue from a specific partition.
781    pub fn try_dequeue_partition(&self, partition: usize) -> Result<MessageEnvelope> {
782        if partition >= self.partition_count {
783            return Err(RingKernelError::InvalidConfig(format!(
784                "Invalid partition index: {} (max: {})",
785                partition,
786                self.partition_count - 1
787            )));
788        }
789        self.partitions[partition].try_dequeue()
790    }
791
792    /// Tries to dequeue from any partition that has messages.
793    ///
794    /// Uses round-robin to fairly distribute dequeues across partitions.
795    pub fn try_dequeue_any(&self) -> Option<MessageEnvelope> {
796        let start_index = self.dequeue_index.fetch_add(1, Ordering::Relaxed) as usize;
797
798        for i in 0..self.partition_count {
799            let partition = (start_index + i) & (self.partition_count - 1);
800            if let Ok(envelope) = self.partitions[partition].try_dequeue() {
801                return Some(envelope);
802            }
803        }
804
805        None
806    }
807
808    /// Returns statistics for a specific partition.
809    pub fn partition_stats(&self, partition: usize) -> Option<QueueStats> {
810        self.partitions.get(partition).map(|p| p.stats())
811    }
812
813    /// Returns aggregated statistics across all partitions.
814    pub fn stats(&self) -> PartitionedQueueStats {
815        let mut total = QueueStats::default();
816        let mut partition_stats = Vec::with_capacity(self.partition_count);
817
818        for partition in &self.partitions {
819            let stats = partition.stats();
820            total.enqueued += stats.enqueued;
821            total.dequeued += stats.dequeued;
822            total.dropped += stats.dropped;
823            total.depth += stats.depth;
824            if stats.max_depth > total.max_depth {
825                total.max_depth = stats.max_depth;
826            }
827            partition_stats.push(stats);
828        }
829
830        PartitionedQueueStats {
831            total,
832            partition_stats,
833            partition_count: self.partition_count,
834        }
835    }
836
837    /// Resets statistics for all partitions.
838    pub fn reset_stats(&self) {
839        for partition in &self.partitions {
840            partition.reset_stats();
841        }
842    }
843}
844
/// Statistics for a partitioned queue.
///
/// `total` sums the counter fields across partitions, except `max_depth`,
/// which is the largest per-partition high-water mark.
#[derive(Debug, Clone)]
pub struct PartitionedQueueStats {
    /// Aggregated statistics.
    pub total: QueueStats,
    /// Per-partition statistics, indexed by partition.
    pub partition_stats: Vec<QueueStats>,
    /// Number of partitions.
    pub partition_count: usize,
}
855
856impl PartitionedQueueStats {
857    /// Returns the load imbalance factor (max/avg).
858    ///
859    /// A value of 1.0 indicates perfect balance.
860    /// Higher values indicate imbalance (some partitions have more messages).
861    pub fn load_imbalance(&self) -> f64 {
862        if self.partition_count == 0 {
863            return 1.0;
864        }
865
866        let avg = self.total.depth as f64 / self.partition_count as f64;
867        if avg == 0.0 {
868            return 1.0;
869        }
870
871        let max = self
872            .partition_stats
873            .iter()
874            .map(|s| s.depth)
875            .max()
876            .unwrap_or(0);
877        max as f64 / avg
878    }
879
880    /// Returns the utilization of the most loaded partition.
881    pub fn max_partition_utilization(&self, capacity_per_partition: usize) -> f64 {
882        if capacity_per_partition == 0 {
883            return 0.0;
884        }
885
886        let max = self
887            .partition_stats
888            .iter()
889            .map(|s| s.depth)
890            .max()
891            .unwrap_or(0);
892        max as f64 / capacity_per_partition as f64
893    }
894}
895
896#[cfg(test)]
897mod tests {
898    use super::*;
899    use crate::hlc::HlcTimestamp;
900    use crate::message::MessageHeader;
901
902    fn make_envelope() -> MessageEnvelope {
903        MessageEnvelope {
904            header: MessageHeader::new(1, 0, 1, 8, HlcTimestamp::now(1)),
905            payload: vec![1, 2, 3, 4, 5, 6, 7, 8],
906        }
907    }
908
909    #[test]
910    fn test_spsc_basic() {
911        let queue = SpscQueue::new(16);
912
913        assert!(queue.is_empty());
914        assert!(!queue.is_full());
915
916        let env = make_envelope();
917        queue.try_enqueue(env).unwrap();
918
919        assert_eq!(queue.len(), 1);
920        assert!(!queue.is_empty());
921
922        let _ = queue.try_dequeue().unwrap();
923        assert!(queue.is_empty());
924    }
925
926    #[test]
927    fn test_spsc_full() {
928        let queue = SpscQueue::new(4);
929
930        for _ in 0..4 {
931            queue.try_enqueue(make_envelope()).unwrap();
932        }
933
934        assert!(queue.is_full());
935        assert!(matches!(
936            queue.try_enqueue(make_envelope()),
937            Err(RingKernelError::QueueFull { .. })
938        ));
939    }
940
941    #[test]
942    fn test_spsc_stats() {
943        let queue = SpscQueue::new(16);
944
945        for _ in 0..10 {
946            queue.try_enqueue(make_envelope()).unwrap();
947        }
948
949        for _ in 0..5 {
950            let _ = queue.try_dequeue().unwrap();
951        }
952
953        let stats = queue.stats();
954        assert_eq!(stats.enqueued, 10);
955        assert_eq!(stats.dequeued, 5);
956        assert_eq!(stats.depth, 5);
957    }
958
959    #[test]
960    fn test_mpsc_concurrent() {
961        use std::sync::Arc;
962        use std::thread;
963
964        let queue = Arc::new(MpscQueue::new(1024));
965        let mut handles = vec![];
966
967        // Spawn multiple producers
968        for _ in 0..4 {
969            let q = Arc::clone(&queue);
970            handles.push(thread::spawn(move || {
971                for _ in 0..100 {
972                    q.try_enqueue(make_envelope()).unwrap();
973                }
974            }));
975        }
976
977        // Wait for producers
978        for h in handles {
979            h.join().unwrap();
980        }
981
982        let stats = queue.stats();
983        assert_eq!(stats.enqueued, 400);
984    }
985
986    #[test]
987    fn test_bounded_timeout() {
988        let queue = BoundedQueue::new(2);
989
990        // Fill queue
991        queue.try_enqueue(make_envelope()).unwrap();
992        queue.try_enqueue(make_envelope()).unwrap();
993
994        // Should timeout
995        let result = queue.enqueue_timeout(make_envelope(), std::time::Duration::from_millis(10));
996        assert!(matches!(result, Err(RingKernelError::Timeout(_))));
997    }
998
999    // ========================================================================
1000    // Queue Tiering Tests
1001    // ========================================================================
1002
1003    #[test]
1004    fn test_queue_tier_capacities() {
1005        assert_eq!(QueueTier::Small.capacity(), 256);
1006        assert_eq!(QueueTier::Medium.capacity(), 1024);
1007        assert_eq!(QueueTier::Large.capacity(), 4096);
1008        assert_eq!(QueueTier::ExtraLarge.capacity(), 16384);
1009    }
1010
1011    #[test]
1012    fn test_queue_tier_for_throughput() {
1013        // Low traffic - 1000 msg/s with 100ms buffer = 100 msgs needed
1014        assert_eq!(QueueTier::for_throughput(1000, 100), QueueTier::Small);
1015
1016        // Medium traffic - 5000 msg/s with 100ms buffer = 500 msgs needed
1017        assert_eq!(QueueTier::for_throughput(5000, 100), QueueTier::Medium);
1018
1019        // High traffic - 20000 msg/s with 100ms buffer = 2000 msgs needed
1020        assert_eq!(QueueTier::for_throughput(20000, 100), QueueTier::Large);
1021
1022        // Very high traffic - 100000 msg/s with 100ms buffer = 10000 msgs needed
1023        assert_eq!(
1024            QueueTier::for_throughput(100000, 100),
1025            QueueTier::ExtraLarge
1026        );
1027    }
1028
1029    #[test]
1030    fn test_queue_tier_upgrade_downgrade() {
1031        assert_eq!(QueueTier::Small.upgrade(), QueueTier::Medium);
1032        assert_eq!(QueueTier::Medium.upgrade(), QueueTier::Large);
1033        assert_eq!(QueueTier::Large.upgrade(), QueueTier::ExtraLarge);
1034        assert_eq!(QueueTier::ExtraLarge.upgrade(), QueueTier::ExtraLarge); // Max
1035
1036        assert_eq!(QueueTier::Small.downgrade(), QueueTier::Small); // Min
1037        assert_eq!(QueueTier::Medium.downgrade(), QueueTier::Small);
1038        assert_eq!(QueueTier::Large.downgrade(), QueueTier::Medium);
1039        assert_eq!(QueueTier::ExtraLarge.downgrade(), QueueTier::Large);
1040    }
1041
1042    #[test]
1043    fn test_queue_factory_creates_correct_capacity() {
1044        let spsc = QueueFactory::create_spsc(QueueTier::Medium);
1045        assert_eq!(spsc.capacity(), 1024);
1046
1047        let mpsc = QueueFactory::create_mpsc(QueueTier::Large);
1048        assert_eq!(mpsc.capacity(), 4096);
1049
1050        let bounded = QueueFactory::create_bounded(QueueTier::Small);
1051        assert_eq!(bounded.capacity(), 256);
1052    }
1053
1054    #[test]
1055    fn test_queue_factory_throughput_based() {
1056        let queue = QueueFactory::create_for_throughput(10000, 100);
1057        // 10000 msg/s * 100ms = 1000 msgs, needs Medium tier = 1024
1058        assert_eq!(queue.capacity(), 1024);
1059    }
1060
1061    #[test]
1062    fn test_queue_monitor_health_levels() {
1063        let monitor = QueueMonitor::default();
1064        let queue = SpscQueue::new(100); // Will round to 128
1065
1066        // Empty - healthy
1067        assert_eq!(monitor.check(&queue), QueueHealth::Healthy);
1068
1069        // Fill to 60% - still healthy
1070        for _ in 0..76 {
1071            queue.try_enqueue(make_envelope()).unwrap();
1072        }
1073        assert_eq!(monitor.check(&queue), QueueHealth::Healthy);
1074
1075        // Fill to 80% - warning
1076        for _ in 0..26 {
1077            queue.try_enqueue(make_envelope()).unwrap();
1078        }
1079        assert_eq!(monitor.check(&queue), QueueHealth::Warning);
1080
1081        // Fill to 95% - critical
1082        for _ in 0..18 {
1083            queue.try_enqueue(make_envelope()).unwrap();
1084        }
1085        assert_eq!(monitor.check(&queue), QueueHealth::Critical);
1086    }
1087
1088    #[test]
1089    fn test_queue_monitor_utilization() {
1090        let monitor = QueueMonitor::default();
1091        let queue = SpscQueue::new(100); // Will round to 128
1092
1093        assert!((monitor.utilization(&queue) - 0.0).abs() < 0.001);
1094
1095        for _ in 0..64 {
1096            queue.try_enqueue(make_envelope()).unwrap();
1097        }
1098        assert!((monitor.utilization(&queue) - 0.5).abs() < 0.001);
1099    }
1100
1101    #[test]
1102    fn test_queue_monitor_drop_detection() {
1103        let monitor = QueueMonitor::default();
1104        let queue = SpscQueue::new(4);
1105
1106        // Fill queue completely
1107        for _ in 0..4 {
1108            queue.try_enqueue(make_envelope()).unwrap();
1109        }
1110        assert!(!monitor.has_drops(&queue));
1111
1112        // Attempt another enqueue (should fail and record drop)
1113        let _ = queue.try_enqueue(make_envelope());
1114        assert!(monitor.has_drops(&queue));
1115        assert!(monitor.drop_rate(&queue) > 0.0);
1116    }
1117
1118    #[test]
1119    fn test_queue_monitor_upgrade_suggestion() {
1120        let monitor = QueueMonitor::default();
1121        let queue = SpscQueue::new(QueueTier::Small.capacity());
1122
1123        // Empty queue - no upgrade needed
1124        assert!(monitor.suggest_upgrade(&queue, QueueTier::Small).is_none());
1125
1126        // Fill to warning level (>= 75%)
1127        for _ in 0..200 {
1128            queue.try_enqueue(make_envelope()).unwrap();
1129        }
1130
1131        // Should suggest upgrade
1132        let suggestion = monitor.suggest_upgrade(&queue, QueueTier::Small);
1133        assert_eq!(suggestion, Some(QueueTier::Medium));
1134
1135        // Already at max tier - no upgrade possible
1136        let large_queue = SpscQueue::new(QueueTier::ExtraLarge.capacity());
1137        for _ in 0..(QueueTier::ExtraLarge.capacity() * 3 / 4) {
1138            large_queue.try_enqueue(make_envelope()).unwrap();
1139        }
1140        assert!(monitor
1141            .suggest_upgrade(&large_queue, QueueTier::ExtraLarge)
1142            .is_none());
1143    }
1144
1145    #[test]
1146    fn test_queue_metrics_capture() {
1147        let queue = SpscQueue::new(QueueTier::Medium.capacity());
1148        let monitor = QueueMonitor::default();
1149
1150        // Add some messages
1151        for _ in 0..100 {
1152            queue.try_enqueue(make_envelope()).unwrap();
1153        }
1154
1155        let metrics = QueueMetrics::capture(&queue, &monitor, Some(QueueTier::Medium));
1156
1157        assert_eq!(metrics.health, QueueHealth::Healthy);
1158        assert!(metrics.utilization < 0.15);
1159        assert_eq!(metrics.stats.enqueued, 100);
1160        assert_eq!(metrics.tier, Some(QueueTier::Medium));
1161        assert!(metrics.suggested_upgrade.is_none());
1162    }
1163
1164    // ========================================================================
1165    // Partitioned Queue Tests
1166    // ========================================================================
1167
1168    #[test]
1169    fn test_partitioned_queue_creation() {
1170        let queue = PartitionedQueue::new(4, 256);
1171        assert_eq!(queue.partition_count(), 4);
1172        assert_eq!(queue.capacity_per_partition(), 256);
1173        assert_eq!(queue.total_capacity(), 1024);
1174    }
1175
1176    #[test]
1177    fn test_partitioned_queue_rounds_to_power_of_two() {
1178        let queue = PartitionedQueue::new(3, 256);
1179        assert_eq!(queue.partition_count(), 4); // Rounded up to 4
1180    }
1181
1182    #[test]
1183    fn test_partitioned_queue_routing() {
1184        let queue = PartitionedQueue::with_defaults();
1185
1186        // Same source ID should always go to same partition
1187        let partition1 = queue.partition_for(12345);
1188        let partition2 = queue.partition_for(12345);
1189        assert_eq!(partition1, partition2);
1190
1191        // Different source IDs may go to different partitions
1192        let partition_a = queue.partition_for(0);
1193        let partition_b = queue.partition_for(1);
1194        assert!(partition_a != partition_b || queue.partition_count() == 1);
1195    }
1196
1197    #[test]
1198    fn test_partitioned_queue_enqueue_dequeue() {
1199        let queue = PartitionedQueue::new(4, 64);
1200
1201        // Enqueue from different sources
1202        for source in 0..16u64 {
1203            let mut env = make_envelope();
1204            env.header.source_kernel = source;
1205            queue.try_enqueue(env).unwrap();
1206        }
1207
1208        assert_eq!(queue.total_messages(), 16);
1209
1210        // Dequeue all
1211        for _ in 0..16 {
1212            let env = queue.try_dequeue_any();
1213            assert!(env.is_some());
1214        }
1215
1216        assert_eq!(queue.total_messages(), 0);
1217        assert!(queue.try_dequeue_any().is_none());
1218    }
1219
1220    #[test]
1221    fn test_partitioned_queue_stats() {
1222        let queue = PartitionedQueue::new(4, 64);
1223
1224        // Enqueue to different partitions
1225        for source in 0..20u64 {
1226            let mut env = make_envelope();
1227            env.header.source_kernel = source;
1228            queue.try_enqueue(env).unwrap();
1229        }
1230
1231        let stats = queue.stats();
1232        assert_eq!(stats.total.enqueued, 20);
1233        assert_eq!(stats.partition_count, 4);
1234        assert_eq!(stats.partition_stats.len(), 4);
1235    }
1236
1237    #[test]
1238    fn test_partitioned_queue_load_imbalance() {
1239        let queue = PartitionedQueue::new(4, 64);
1240
1241        // All messages go to same partition (source 0 maps to partition 0)
1242        for _ in 0..10 {
1243            let mut env = make_envelope();
1244            env.header.source_kernel = 0;
1245            queue.try_enqueue(env).unwrap();
1246        }
1247
1248        let stats = queue.stats();
1249        // All 10 messages in one partition, avg = 2.5, max = 10
1250        // Imbalance = 10 / 2.5 = 4.0
1251        assert!((stats.load_imbalance() - 4.0).abs() < 0.001);
1252    }
1253
1254    #[test]
1255    fn test_partitioned_queue_dequeue_partition() {
1256        let queue = PartitionedQueue::new(4, 64);
1257
1258        // Enqueue to a specific partition (source 0)
1259        let mut env = make_envelope();
1260        env.header.source_kernel = 0;
1261        queue.try_enqueue(env).unwrap();
1262
1263        let partition = queue.partition_for(0);
1264
1265        // Dequeue from that specific partition
1266        let result = queue.try_dequeue_partition(partition);
1267        assert!(result.is_ok());
1268
1269        // Invalid partition should error
1270        let result = queue.try_dequeue_partition(100);
1271        assert!(result.is_err());
1272    }
1273}