ringkernel_core/queue.rs

//! Ring-buffer message queue implementation.
//!
//! This module provides the core message queue abstraction used for
//! communication between host and GPU kernels. The queue uses a ring
//! buffer design: head and tail indices are advanced with atomic
//! operations, while each slot is guarded by a short-lived mutex for
//! safe envelope transfer.
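//!
//! # Example
//!
//! A minimal round-trip sketch; `envelope` is assumed to be a
//! `MessageEnvelope` built via `crate::message`:
//!
//! ```ignore
//! use ringkernel_core::queue::{MessageQueue, SpscQueue};
//!
//! let queue = SpscQueue::new(16);
//! queue.try_enqueue(envelope)?;        // producer side
//! let envelope = queue.try_dequeue()?; // consumer side
//! ```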

use std::sync::atomic::{AtomicU64, Ordering};

use crate::error::{Result, RingKernelError};
use crate::message::MessageEnvelope;

/// Statistics for a message queue.
#[derive(Debug, Clone, Default)]
pub struct QueueStats {
    /// Total messages enqueued.
    pub enqueued: u64,
    /// Total messages dequeued.
    pub dequeued: u64,
    /// Messages dropped due to full queue.
    pub dropped: u64,
    /// Current queue depth.
    pub depth: u64,
    /// Maximum queue depth observed.
    pub max_depth: u64,
}

/// Trait for message queue implementations.
///
/// Message queues provide bounded, non-blocking FIFO communication
/// between producers (host or other kernels) and consumers (GPU
/// kernels).
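///
/// # Example
///
/// A sketch of driving any implementation through a trait object; the
/// envelope value is assumed to come from elsewhere:
///
/// ```ignore
/// use ringkernel_core::queue::{MessageQueue, MpscQueue};
///
/// let queue: Box<dyn MessageQueue> = Box::new(MpscQueue::new(256));
/// queue.try_enqueue(envelope)?;
/// assert_eq!(queue.len(), 1);
/// let envelope = queue.try_dequeue()?;
/// ```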
pub trait MessageQueue: Send + Sync {
    /// Get the queue capacity.
    fn capacity(&self) -> usize;

    /// Get current queue size.
    fn len(&self) -> usize;

    /// Check if queue is empty.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Check if queue is full.
    fn is_full(&self) -> bool {
        self.len() >= self.capacity()
    }

    /// Try to enqueue a message envelope.
    fn try_enqueue(&self, envelope: MessageEnvelope) -> Result<()>;

    /// Try to dequeue a message envelope.
    fn try_dequeue(&self) -> Result<MessageEnvelope>;

    /// Get queue statistics.
    fn stats(&self) -> QueueStats;

    /// Reset queue statistics.
    fn reset_stats(&self);
}

/// Single-producer single-consumer ring buffer.
///
/// Head and tail indices are advanced with atomic operations, and each
/// slot is guarded by a short-lived mutex. This implementation is
/// optimized for the common case of one producer (host) and one
/// consumer (GPU kernel).
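///
/// # Example
///
/// A minimal sketch, assuming an `envelope: MessageEnvelope` built
/// elsewhere; the requested capacity is rounded up to a power of two:
///
/// ```ignore
/// use ringkernel_core::queue::{MessageQueue, SpscQueue};
///
/// let queue = SpscQueue::new(100); // actual capacity: 128
/// queue.try_enqueue(envelope)?;
/// let envelope = queue.try_dequeue()?;
/// assert!(queue.is_empty());
/// ```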
pub struct SpscQueue {
    /// Ring buffer storage.
    buffer: Vec<parking_lot::Mutex<Option<MessageEnvelope>>>,
    /// Capacity (power of 2).
    capacity: usize,
    /// Mask for index wrapping.
    mask: usize,
    /// Head pointer (producer writes here).
    head: AtomicU64,
    /// Tail pointer (consumer reads from here).
    tail: AtomicU64,
    /// Statistics.
    stats: QueueStatsInner,
}

/// Internal statistics with atomics.
struct QueueStatsInner {
    enqueued: AtomicU64,
    dequeued: AtomicU64,
    dropped: AtomicU64,
    max_depth: AtomicU64,
}

impl SpscQueue {
    /// Create a new queue with the given capacity.
    ///
    /// Capacity is rounded up to the next power of 2 (a capacity of 0
    /// is treated as 1).
    pub fn new(capacity: usize) -> Self {
        let capacity = capacity.next_power_of_two();
        let mask = capacity - 1;

        let mut buffer = Vec::with_capacity(capacity);
        for _ in 0..capacity {
            buffer.push(parking_lot::Mutex::new(None));
        }

        Self {
            buffer,
            capacity,
            mask,
            head: AtomicU64::new(0),
            tail: AtomicU64::new(0),
            stats: QueueStatsInner {
                enqueued: AtomicU64::new(0),
                dequeued: AtomicU64::new(0),
                dropped: AtomicU64::new(0),
                max_depth: AtomicU64::new(0),
            },
        }
    }

    /// Get current depth.
    #[inline]
    fn depth(&self) -> u64 {
        let head = self.head.load(Ordering::Acquire);
        let tail = self.tail.load(Ordering::Acquire);
        head.wrapping_sub(tail)
    }

    /// Update max depth statistic.
    fn update_max_depth(&self) {
        let depth = self.depth();
        let mut max = self.stats.max_depth.load(Ordering::Relaxed);
        // CAS loop: another thread may raise the maximum concurrently,
        // so retry with the freshly observed value until ours is no
        // longer an improvement.
        while depth > max {
            match self.stats.max_depth.compare_exchange_weak(
                max,
                depth,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                Ok(_) => break,
                Err(current) => max = current,
            }
        }
    }
}

impl MessageQueue for SpscQueue {
    #[inline]
    fn capacity(&self) -> usize {
        self.capacity
    }

    #[inline]
    fn len(&self) -> usize {
        self.depth() as usize
    }

    #[inline]
    fn try_enqueue(&self, envelope: MessageEnvelope) -> Result<()> {
        let head = self.head.load(Ordering::Acquire);
        let tail = self.tail.load(Ordering::Acquire);

        // Check if full
        if head.wrapping_sub(tail) >= self.capacity as u64 {
            self.stats.dropped.fetch_add(1, Ordering::Relaxed);
            return Err(RingKernelError::QueueFull {
                capacity: self.capacity,
            });
        }

        // Get slot
        let index = (head as usize) & self.mask;
        let mut slot = self.buffer[index].lock();
        *slot = Some(envelope);
        drop(slot);

        // Advance head
        self.head.store(head.wrapping_add(1), Ordering::Release);

        // Update stats
        self.stats.enqueued.fetch_add(1, Ordering::Relaxed);
        self.update_max_depth();

        Ok(())
    }

    #[inline]
    fn try_dequeue(&self) -> Result<MessageEnvelope> {
        let tail = self.tail.load(Ordering::Acquire);
        let head = self.head.load(Ordering::Acquire);

        // Check if empty
        if head == tail {
            return Err(RingKernelError::QueueEmpty);
        }

        // Get slot
        let index = (tail as usize) & self.mask;
        let mut slot = self.buffer[index].lock();
        let envelope = slot.take().ok_or(RingKernelError::QueueEmpty)?;
        drop(slot);

        // Advance tail
        self.tail.store(tail.wrapping_add(1), Ordering::Release);

        // Update stats
        self.stats.dequeued.fetch_add(1, Ordering::Relaxed);

        Ok(envelope)
    }

    fn stats(&self) -> QueueStats {
        QueueStats {
            enqueued: self.stats.enqueued.load(Ordering::Relaxed),
            dequeued: self.stats.dequeued.load(Ordering::Relaxed),
            dropped: self.stats.dropped.load(Ordering::Relaxed),
            depth: self.depth(),
            max_depth: self.stats.max_depth.load(Ordering::Relaxed),
        }
    }

    fn reset_stats(&self) {
        self.stats.enqueued.store(0, Ordering::Relaxed);
        self.stats.dequeued.store(0, Ordering::Relaxed);
        self.stats.dropped.store(0, Ordering::Relaxed);
        self.stats.max_depth.store(0, Ordering::Relaxed);
    }
}

/// Multi-producer single-consumer queue.
///
/// This variant allows multiple producers (e.g., multiple host threads
/// or kernel-to-kernel messaging) to enqueue messages concurrently;
/// producers are serialized by a short-lived mutex.
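///
/// # Example
///
/// A concurrent-producer sketch (envelope construction elided):
///
/// ```ignore
/// use std::sync::Arc;
/// use ringkernel_core::queue::{MessageQueue, MpscQueue};
///
/// let queue = Arc::new(MpscQueue::new(1024));
/// let q = Arc::clone(&queue);
/// std::thread::spawn(move || q.try_enqueue(envelope).unwrap());
/// ```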
pub struct MpscQueue {
    /// Inner SPSC queue.
    inner: SpscQueue,
    /// Lock for producers.
    producer_lock: parking_lot::Mutex<()>,
}

impl MpscQueue {
    /// Create a new MPSC queue.
    pub fn new(capacity: usize) -> Self {
        Self {
            inner: SpscQueue::new(capacity),
            producer_lock: parking_lot::Mutex::new(()),
        }
    }
}

impl MessageQueue for MpscQueue {
    #[inline]
    fn capacity(&self) -> usize {
        self.inner.capacity()
    }

    #[inline]
    fn len(&self) -> usize {
        self.inner.len()
    }

    #[inline]
    fn try_enqueue(&self, envelope: MessageEnvelope) -> Result<()> {
        let _guard = self.producer_lock.lock();
        self.inner.try_enqueue(envelope)
    }

    #[inline]
    fn try_dequeue(&self) -> Result<MessageEnvelope> {
        self.inner.try_dequeue()
    }

    fn stats(&self) -> QueueStats {
        self.inner.stats()
    }

    fn reset_stats(&self) {
        self.inner.reset_stats()
    }
}

/// Bounded queue with blocking enqueue/dequeue operations.
///
/// Wraps an [`MpscQueue`] and adds condition variables so callers can
/// wait, with a timeout, for space or for data.
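///
/// # Example
///
/// A timeout-based sketch (envelope construction elided):
///
/// ```ignore
/// use std::time::Duration;
/// use ringkernel_core::queue::BoundedQueue;
///
/// let queue = BoundedQueue::new(256);
/// queue.enqueue_timeout(envelope, Duration::from_millis(50))?;
/// let envelope = queue.dequeue_timeout(Duration::from_millis(50))?;
/// ```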
pub struct BoundedQueue {
    /// Inner MPSC queue.
    inner: MpscQueue,
    /// Condvar for waiting on space.
    not_full: parking_lot::Condvar,
    /// Condvar for waiting on data.
    not_empty: parking_lot::Condvar,
    /// Mutex for condvar coordination.
    mutex: parking_lot::Mutex<()>,
}

impl BoundedQueue {
    /// Create a new bounded queue.
    pub fn new(capacity: usize) -> Self {
        Self {
            inner: MpscQueue::new(capacity),
            not_full: parking_lot::Condvar::new(),
            not_empty: parking_lot::Condvar::new(),
            mutex: parking_lot::Mutex::new(()),
        }
    }

    /// Blocking enqueue with timeout.
    ///
    /// Note: `envelope` is cloned on each retry because `try_enqueue` takes
    /// ownership. For zero-copy retry, the `MessageQueue` trait would need to
    /// return the envelope on failure, which is a larger refactor.
    pub fn enqueue_timeout(
        &self,
        envelope: MessageEnvelope,
        timeout: std::time::Duration,
    ) -> Result<()> {
        let deadline = std::time::Instant::now() + timeout;

        loop {
            match self.inner.try_enqueue(envelope.clone()) {
                Ok(()) => {
                    self.not_empty.notify_one();
                    return Ok(());
                }
                Err(RingKernelError::QueueFull { .. }) => {
                    let remaining = deadline.saturating_duration_since(std::time::Instant::now());
                    if remaining.is_zero() {
                        return Err(RingKernelError::Timeout(timeout));
                    }
                    // A notification arriving between the failed
                    // try_enqueue and wait_for can be missed; the bounded
                    // wait guarantees we re-check within `remaining`.
                    let mut guard = self.mutex.lock();
                    let _ = self.not_full.wait_for(&mut guard, remaining);
                }
                Err(e) => return Err(e),
            }
        }
    }

    /// Blocking dequeue with timeout.
    pub fn dequeue_timeout(&self, timeout: std::time::Duration) -> Result<MessageEnvelope> {
        let deadline = std::time::Instant::now() + timeout;

        loop {
            match self.inner.try_dequeue() {
                Ok(envelope) => {
                    self.not_full.notify_one();
                    return Ok(envelope);
                }
                Err(RingKernelError::QueueEmpty) => {
                    let remaining = deadline.saturating_duration_since(std::time::Instant::now());
                    if remaining.is_zero() {
                        return Err(RingKernelError::Timeout(timeout));
                    }
                    let mut guard = self.mutex.lock();
                    let _ = self.not_empty.wait_for(&mut guard, remaining);
                }
                Err(e) => return Err(e),
            }
        }
    }
}

impl MessageQueue for BoundedQueue {
    fn capacity(&self) -> usize {
        self.inner.capacity()
    }

    fn len(&self) -> usize {
        self.inner.len()
    }

    fn try_enqueue(&self, envelope: MessageEnvelope) -> Result<()> {
        let result = self.inner.try_enqueue(envelope);
        if result.is_ok() {
            self.not_empty.notify_one();
        }
        result
    }

    fn try_dequeue(&self) -> Result<MessageEnvelope> {
        let result = self.inner.try_dequeue();
        if result.is_ok() {
            self.not_full.notify_one();
        }
        result
    }

    fn stats(&self) -> QueueStats {
        self.inner.stats()
    }

    fn reset_stats(&self) {
        self.inner.reset_stats()
    }
}

// ============================================================================
// Queue Tiering Support
// ============================================================================

/// Queue capacity tiers for dynamic queue allocation.
///
/// Instead of dynamic resizing (which is complex for GPU memory),
/// we provide predefined tiers that can be selected based on expected load.
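///
/// # Example
///
/// A sketch of the tier capacities and transitions defined below:
///
/// ```ignore
/// use ringkernel_core::queue::QueueTier;
///
/// assert_eq!(QueueTier::Medium.capacity(), 1024);
/// assert_eq!(QueueTier::Medium.upgrade(), QueueTier::Large);
/// assert_eq!(QueueTier::Small.downgrade(), QueueTier::Small); // clamped at min
/// ```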
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum QueueTier {
    /// Small queues (256 messages) - low traffic, minimal memory.
    Small,
    /// Medium queues (1024 messages) - moderate traffic.
    #[default]
    Medium,
    /// Large queues (4096 messages) - high traffic.
    Large,
    /// Extra large queues (16384 messages) - very high traffic.
    ExtraLarge,
}

impl QueueTier {
    /// Get the capacity for this tier.
    pub fn capacity(&self) -> usize {
        match self {
            Self::Small => 256,
            Self::Medium => 1024,
            Self::Large => 4096,
            Self::ExtraLarge => 16384,
        }
    }
421    /// Suggest a tier based on expected message rate.
422    ///
423    /// # Arguments
424    ///
425    /// * `messages_per_second` - Expected message throughput
426    /// * `target_headroom_ms` - Desired buffer time in milliseconds
427    ///
428    /// # Returns
429    ///
430    /// Recommended tier based on traffic patterns.
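    ///
    /// # Example
    ///
    /// 10,000 msg/s with 100 ms of headroom needs 1,000 slots, which
    /// fits the Medium tier:
    ///
    /// ```ignore
    /// use ringkernel_core::queue::QueueTier;
    ///
    /// assert_eq!(QueueTier::for_throughput(10_000, 100), QueueTier::Medium);
    /// ```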
    pub fn for_throughput(messages_per_second: u64, target_headroom_ms: u64) -> Self {
        let needed_capacity = (messages_per_second * target_headroom_ms) / 1000;

        if needed_capacity <= 256 {
            Self::Small
        } else if needed_capacity <= 1024 {
            Self::Medium
        } else if needed_capacity <= 4096 {
            Self::Large
        } else {
            Self::ExtraLarge
        }
    }

    /// Get the next tier up (for capacity planning).
    pub fn upgrade(&self) -> Self {
        match self {
            Self::Small => Self::Medium,
            Self::Medium => Self::Large,
            Self::Large => Self::ExtraLarge,
            Self::ExtraLarge => Self::ExtraLarge, // Already at max
        }
    }

    /// Get the tier below (for memory optimization).
    pub fn downgrade(&self) -> Self {
        match self {
            Self::Small => Self::Small, // Already at min
            Self::Medium => Self::Small,
            Self::Large => Self::Medium,
            Self::ExtraLarge => Self::Large,
        }
    }
}

/// Factory for creating appropriately-sized message queues.
///
/// # Example
///
/// ```ignore
/// use ringkernel_core::queue::{QueueFactory, QueueTier};
///
/// // Create a medium-sized MPSC queue
/// let queue = QueueFactory::create_mpsc(QueueTier::Medium);
///
/// // Create based on expected throughput
/// let queue = QueueFactory::create_for_throughput(10000, 100); // 10k msg/s, 100ms buffer
/// ```
pub struct QueueFactory;

impl QueueFactory {
    /// Create an SPSC queue with the specified tier.
    pub fn create_spsc(tier: QueueTier) -> SpscQueue {
        SpscQueue::new(tier.capacity())
    }

    /// Create an MPSC queue with the specified tier.
    pub fn create_mpsc(tier: QueueTier) -> MpscQueue {
        MpscQueue::new(tier.capacity())
    }

    /// Create a bounded queue with the specified tier.
    pub fn create_bounded(tier: QueueTier) -> BoundedQueue {
        BoundedQueue::new(tier.capacity())
    }

    /// Create a queue based on expected throughput.
    ///
    /// # Arguments
    ///
    /// * `messages_per_second` - Expected message throughput
    /// * `headroom_ms` - Desired buffer time in milliseconds
    pub fn create_for_throughput(
        messages_per_second: u64,
        headroom_ms: u64,
    ) -> Box<dyn MessageQueue> {
        let tier = QueueTier::for_throughput(messages_per_second, headroom_ms);
        Box::new(Self::create_mpsc(tier))
    }
}

/// Queue health status from monitoring.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueueHealth {
    /// Queue utilization is healthy (< warning threshold).
    Healthy,
    /// Queue is approaching capacity (>= warning, < critical threshold).
    Warning,
    /// Queue is near capacity (>= critical threshold).
    Critical,
}

/// Monitor for queue health and utilization.
///
/// Provides real-time health checking for message queues without
/// dynamic resizing, allowing applications to take action when
/// queues approach capacity.
///
/// # Example
///
/// ```ignore
/// use ringkernel_core::queue::{QueueMonitor, SpscQueue, QueueHealth};
///
/// let queue = SpscQueue::new(1024);
/// let monitor = QueueMonitor::default();
///
/// // Check health periodically
/// match monitor.check(&queue) {
///     QueueHealth::Healthy => { /* normal operation */ }
///     QueueHealth::Warning => { /* consider throttling producers */ }
///     QueueHealth::Critical => { /* alert! take immediate action */ }
/// }
/// ```
#[derive(Debug, Clone)]
pub struct QueueMonitor {
    /// Utilization threshold for warning (0.0 - 1.0).
    pub warning_threshold: f64,
    /// Utilization threshold for critical (0.0 - 1.0).
    pub critical_threshold: f64,
}

impl Default for QueueMonitor {
    fn default() -> Self {
        Self {
            warning_threshold: 0.75,  // 75%
            critical_threshold: 0.90, // 90%
        }
    }
}

impl QueueMonitor {
    /// Create a new queue monitor with custom thresholds.
    ///
    /// Thresholds are utilization fractions in `0.0..=1.0`;
    /// `warning_threshold` should be below `critical_threshold`.
    pub fn new(warning_threshold: f64, critical_threshold: f64) -> Self {
        Self {
            warning_threshold,
            critical_threshold,
        }
    }

    /// Check the health of a queue.
    pub fn check(&self, queue: &dyn MessageQueue) -> QueueHealth {
        let utilization = self.utilization(queue);

        if utilization >= self.critical_threshold {
            QueueHealth::Critical
        } else if utilization >= self.warning_threshold {
            QueueHealth::Warning
        } else {
            QueueHealth::Healthy
        }
    }

    /// Get the current utilization (0.0 - 1.0).
    pub fn utilization(&self, queue: &dyn MessageQueue) -> f64 {
        let capacity = queue.capacity();
        if capacity == 0 {
            return 0.0;
        }
        queue.len() as f64 / capacity as f64
    }

    /// Get current utilization percentage.
    pub fn utilization_percent(&self, queue: &dyn MessageQueue) -> f64 {
        self.utilization(queue) * 100.0
    }

    /// Suggest whether to upgrade the queue tier based on observed utilization.
    ///
    /// Returns `Some(QueueTier)` if upgrade is recommended, `None` otherwise.
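    ///
    /// # Example
    ///
    /// A sketch, assuming a `monitor` and a Small-tier `queue` running hot:
    ///
    /// ```ignore
    /// if let Some(tier) = monitor.suggest_upgrade(&queue, QueueTier::Small) {
    ///     // Recreate the queue at `tier` capacity and migrate messages.
    /// }
    /// ```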
    pub fn suggest_upgrade(
        &self,
        queue: &dyn MessageQueue,
        current_tier: QueueTier,
    ) -> Option<QueueTier> {
        let stats = queue.stats();
        let utilization = self.utilization(queue);

        // Upgrade if:
        // - Current utilization is at warning level, or
        // - Max observed depth is above the critical threshold
        let max_util = if queue.capacity() > 0 {
            stats.max_depth as f64 / queue.capacity() as f64
        } else {
            0.0
        };

        if utilization >= self.warning_threshold || max_util >= self.critical_threshold {
            let upgraded = current_tier.upgrade();
            if upgraded != current_tier {
                return Some(upgraded);
            }
        }

        None
    }

    /// Check if queue has experienced drops.
    pub fn has_drops(&self, queue: &dyn MessageQueue) -> bool {
        queue.stats().dropped > 0
    }

    /// Get the drop rate (drops / total attempted enqueues).
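    ///
    /// For example, 95 successful enqueues plus 5 drops yields a drop
    /// rate of 5 / 100 = 0.05.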
    pub fn drop_rate(&self, queue: &dyn MessageQueue) -> f64 {
        let stats = queue.stats();
        let total_attempted = stats.enqueued + stats.dropped;
        if total_attempted == 0 {
            return 0.0;
        }
        stats.dropped as f64 / total_attempted as f64
    }
}

/// Comprehensive queue metrics snapshot.
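///
/// # Example
///
/// A capture sketch, assuming `queue` is any populated `MessageQueue`:
///
/// ```ignore
/// use ringkernel_core::queue::{QueueMetrics, QueueMonitor, QueueTier};
///
/// let monitor = QueueMonitor::default();
/// let metrics = QueueMetrics::capture(&queue, &monitor, Some(QueueTier::Medium));
/// if let Some(tier) = metrics.suggested_upgrade {
///     println!("consider resizing to {tier:?}");
/// }
/// ```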
#[derive(Debug, Clone)]
pub struct QueueMetrics {
    /// Queue health status.
    pub health: QueueHealth,
    /// Current utilization (0.0 - 1.0).
    pub utilization: f64,
    /// Queue statistics.
    pub stats: QueueStats,
    /// Current tier (if known).
    pub tier: Option<QueueTier>,
    /// Suggested tier upgrade (if recommended).
    pub suggested_upgrade: Option<QueueTier>,
}

impl QueueMetrics {
    /// Capture metrics from a queue.
    pub fn capture(
        queue: &dyn MessageQueue,
        monitor: &QueueMonitor,
        current_tier: Option<QueueTier>,
    ) -> Self {
        let health = monitor.check(queue);
        let utilization = monitor.utilization(queue);
        let stats = queue.stats();
        let suggested_upgrade = current_tier.and_then(|tier| monitor.suggest_upgrade(queue, tier));

        Self {
            health,
            utilization,
            stats,
            tier: current_tier,
            suggested_upgrade,
        }
    }
}

// ============================================================================
// Partitioned Queue
// ============================================================================

/// A partitioned queue for reduced contention with multiple producers.
///
/// Instead of a single queue with a lock, this uses multiple independent
/// partitions (SPSC queues) to reduce contention when many producers
/// are sending messages concurrently.
///
/// Producers are routed to partitions based on their source ID, ensuring
/// messages from the same source go to the same partition (preserving order).
///
/// # Example
///
/// ```ignore
/// use ringkernel_core::queue::{PartitionedQueue, QueueTier};
///
/// // Create 4 partitions with Medium tier capacity each
/// let queue = PartitionedQueue::new(4, QueueTier::Medium.capacity());
///
/// // Enqueue with source-based routing
/// queue.try_enqueue_from(source_id, envelope)?;
///
/// // Dequeue from any partition that has messages
/// if let Some(envelope) = queue.try_dequeue_any() {
///     // process message
/// }
/// ```
pub struct PartitionedQueue {
    /// Individual partition queues.
    partitions: Vec<SpscQueue>,
    /// Number of partitions.
    partition_count: usize,
    /// Round-robin dequeue index.
    dequeue_index: AtomicU64,
}

impl PartitionedQueue {
    /// Creates a new partitioned queue.
    ///
    /// # Arguments
    ///
    /// * `partition_count` - Number of partitions (rounded up to a power of 2)
    /// * `capacity_per_partition` - Capacity of each partition
    pub fn new(partition_count: usize, capacity_per_partition: usize) -> Self {
        let partition_count = partition_count.max(1).next_power_of_two();
        let partitions = (0..partition_count)
            .map(|_| SpscQueue::new(capacity_per_partition))
            .collect();

        Self {
            partitions,
            partition_count,
            dequeue_index: AtomicU64::new(0),
        }
    }

    /// Creates a partitioned queue with default settings.
    ///
    /// Uses 4 partitions with Medium tier capacity.
    pub fn with_defaults() -> Self {
        Self::new(4, QueueTier::Medium.capacity())
    }

    /// Creates a partitioned queue sized for high contention.
    ///
    /// Uses 8 partitions with Large tier capacity.
    pub fn for_high_contention() -> Self {
        Self::new(8, QueueTier::Large.capacity())
    }

    /// Returns the partition index for a given source ID.
    #[inline]
    pub fn partition_for(&self, source_id: u64) -> usize {
        (source_id as usize) & (self.partition_count - 1)
    }

    /// Returns the number of partitions.
    pub fn partition_count(&self) -> usize {
        self.partition_count
    }

    /// Returns the capacity per partition.
    pub fn capacity_per_partition(&self) -> usize {
        self.partitions.first().map_or(0, |p| p.capacity())
    }

    /// Total capacity across all partitions.
    pub fn total_capacity(&self) -> usize {
        self.capacity_per_partition() * self.partition_count
    }

    /// Total messages across all partitions.
    pub fn total_messages(&self) -> usize {
        self.partitions.iter().map(|p| p.len()).sum()
    }

    /// Enqueues a message to a partition based on source ID.
    ///
    /// Messages from the same source always go to the same partition,
    /// preserving ordering for that source.
    pub fn try_enqueue_from(&self, source_id: u64, envelope: MessageEnvelope) -> Result<()> {
        let partition = self.partition_for(source_id);
        self.partitions[partition].try_enqueue(envelope)
    }

    /// Enqueues a message using the envelope's source kernel ID.
    pub fn try_enqueue(&self, envelope: MessageEnvelope) -> Result<()> {
        let source_id = envelope.header.source_kernel;
        self.try_enqueue_from(source_id, envelope)
    }

    /// Tries to dequeue from a specific partition.
    pub fn try_dequeue_partition(&self, partition: usize) -> Result<MessageEnvelope> {
        if partition >= self.partition_count {
            return Err(RingKernelError::InvalidConfig(format!(
                "Invalid partition index: {} (max: {})",
                partition,
                self.partition_count - 1
            )));
        }
        self.partitions[partition].try_dequeue()
    }

    /// Tries to dequeue from any partition that has messages.
    ///
    /// Uses round-robin to fairly distribute dequeues across partitions.
    pub fn try_dequeue_any(&self) -> Option<MessageEnvelope> {
        let start_index = self.dequeue_index.fetch_add(1, Ordering::Relaxed) as usize;

        for i in 0..self.partition_count {
            let partition = (start_index + i) & (self.partition_count - 1);
            if let Ok(envelope) = self.partitions[partition].try_dequeue() {
                return Some(envelope);
            }
        }

        None
    }

    /// Returns statistics for a specific partition.
    pub fn partition_stats(&self, partition: usize) -> Option<QueueStats> {
        self.partitions.get(partition).map(|p| p.stats())
    }

    /// Returns aggregated statistics across all partitions.
    pub fn stats(&self) -> PartitionedQueueStats {
        let mut total = QueueStats::default();
        let mut partition_stats = Vec::with_capacity(self.partition_count);

        for partition in &self.partitions {
            let stats = partition.stats();
            total.enqueued += stats.enqueued;
            total.dequeued += stats.dequeued;
            total.dropped += stats.dropped;
            total.depth += stats.depth;
            if stats.max_depth > total.max_depth {
                total.max_depth = stats.max_depth;
            }
            partition_stats.push(stats);
        }

        PartitionedQueueStats {
            total,
            partition_stats,
            partition_count: self.partition_count,
        }
    }

    /// Resets statistics for all partitions.
    pub fn reset_stats(&self) {
        for partition in &self.partitions {
            partition.reset_stats();
        }
    }
}

/// Statistics for a partitioned queue.
#[derive(Debug, Clone)]
pub struct PartitionedQueueStats {
    /// Aggregated statistics.
    pub total: QueueStats,
    /// Per-partition statistics.
    pub partition_stats: Vec<QueueStats>,
    /// Number of partitions.
    pub partition_count: usize,
}

impl PartitionedQueueStats {
    /// Returns the load imbalance factor (max/avg).
    ///
    /// A value of 1.0 indicates perfect balance.
    /// Higher values indicate imbalance (some partitions have more messages).
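    ///
    /// # Example
    ///
    /// With 4 partitions at depths 10, 0, 0, 0: the average depth is
    /// 2.5 and the maximum is 10, so the imbalance factor is
    /// 10 / 2.5 = 4.0.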
    pub fn load_imbalance(&self) -> f64 {
        if self.partition_count == 0 {
            return 1.0;
        }

        let avg = self.total.depth as f64 / self.partition_count as f64;
        if avg == 0.0 {
            return 1.0;
        }

        let max = self
            .partition_stats
            .iter()
            .map(|s| s.depth)
            .max()
            .unwrap_or(0);
        max as f64 / avg
    }

    /// Returns the utilization of the most loaded partition.
    pub fn max_partition_utilization(&self, capacity_per_partition: usize) -> f64 {
        if capacity_per_partition == 0 {
            return 0.0;
        }

        let max = self
            .partition_stats
            .iter()
            .map(|s| s.depth)
            .max()
            .unwrap_or(0);
        max as f64 / capacity_per_partition as f64
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::hlc::HlcTimestamp;
    use crate::message::MessageHeader;

    fn make_envelope() -> MessageEnvelope {
        MessageEnvelope {
            header: MessageHeader::new(1, 0, 1, 8, HlcTimestamp::now(1)),
            payload: vec![1, 2, 3, 4, 5, 6, 7, 8],
        }
    }

    #[test]
    fn test_spsc_basic() {
        let queue = SpscQueue::new(16);

        assert!(queue.is_empty());
        assert!(!queue.is_full());

        let env = make_envelope();
        queue.try_enqueue(env).unwrap();

        assert_eq!(queue.len(), 1);
        assert!(!queue.is_empty());

        let _ = queue.try_dequeue().unwrap();
        assert!(queue.is_empty());
    }

    #[test]
    fn test_spsc_full() {
        let queue = SpscQueue::new(4);

        for _ in 0..4 {
            queue.try_enqueue(make_envelope()).unwrap();
        }

        assert!(queue.is_full());
        assert!(matches!(
            queue.try_enqueue(make_envelope()),
            Err(RingKernelError::QueueFull { .. })
        ));
    }

    #[test]
    fn test_spsc_stats() {
        let queue = SpscQueue::new(16);

        for _ in 0..10 {
            queue.try_enqueue(make_envelope()).unwrap();
        }

        for _ in 0..5 {
            let _ = queue.try_dequeue().unwrap();
        }

        let stats = queue.stats();
        assert_eq!(stats.enqueued, 10);
        assert_eq!(stats.dequeued, 5);
        assert_eq!(stats.depth, 5);
    }

    #[test]
    fn test_mpsc_concurrent() {
        use std::sync::Arc;
        use std::thread;

        let queue = Arc::new(MpscQueue::new(1024));
        let mut handles = vec![];

        // Spawn multiple producers
        for _ in 0..4 {
            let q = Arc::clone(&queue);
            handles.push(thread::spawn(move || {
                for _ in 0..100 {
                    q.try_enqueue(make_envelope()).unwrap();
                }
            }));
        }

        // Wait for producers
        for h in handles {
            h.join().unwrap();
        }

        let stats = queue.stats();
        assert_eq!(stats.enqueued, 400);
    }

    #[test]
    fn test_bounded_timeout() {
        let queue = BoundedQueue::new(2);

        // Fill queue
        queue.try_enqueue(make_envelope()).unwrap();
        queue.try_enqueue(make_envelope()).unwrap();

        // Should timeout
        let result = queue.enqueue_timeout(make_envelope(), std::time::Duration::from_millis(10));
        assert!(matches!(result, Err(RingKernelError::Timeout(_))));
    }

    // ========================================================================
    // Queue Tiering Tests
    // ========================================================================

    #[test]
    fn test_queue_tier_capacities() {
        assert_eq!(QueueTier::Small.capacity(), 256);
        assert_eq!(QueueTier::Medium.capacity(), 1024);
        assert_eq!(QueueTier::Large.capacity(), 4096);
        assert_eq!(QueueTier::ExtraLarge.capacity(), 16384);
    }

    #[test]
    fn test_queue_tier_for_throughput() {
        // Low traffic - 1000 msg/s with 100ms buffer = 100 msgs needed
        assert_eq!(QueueTier::for_throughput(1000, 100), QueueTier::Small);

        // Medium traffic - 5000 msg/s with 100ms buffer = 500 msgs needed
        assert_eq!(QueueTier::for_throughput(5000, 100), QueueTier::Medium);

        // High traffic - 20000 msg/s with 100ms buffer = 2000 msgs needed
        assert_eq!(QueueTier::for_throughput(20000, 100), QueueTier::Large);

        // Very high traffic - 100000 msg/s with 100ms buffer = 10000 msgs needed
        assert_eq!(
            QueueTier::for_throughput(100000, 100),
            QueueTier::ExtraLarge
        );
    }

    #[test]
    fn test_queue_tier_upgrade_downgrade() {
        assert_eq!(QueueTier::Small.upgrade(), QueueTier::Medium);
        assert_eq!(QueueTier::Medium.upgrade(), QueueTier::Large);
        assert_eq!(QueueTier::Large.upgrade(), QueueTier::ExtraLarge);
        assert_eq!(QueueTier::ExtraLarge.upgrade(), QueueTier::ExtraLarge); // Max

        assert_eq!(QueueTier::Small.downgrade(), QueueTier::Small); // Min
        assert_eq!(QueueTier::Medium.downgrade(), QueueTier::Small);
        assert_eq!(QueueTier::Large.downgrade(), QueueTier::Medium);
        assert_eq!(QueueTier::ExtraLarge.downgrade(), QueueTier::Large);
    }

    #[test]
    fn test_queue_factory_creates_correct_capacity() {
        let spsc = QueueFactory::create_spsc(QueueTier::Medium);
        assert_eq!(spsc.capacity(), 1024);

        let mpsc = QueueFactory::create_mpsc(QueueTier::Large);
        assert_eq!(mpsc.capacity(), 4096);

        let bounded = QueueFactory::create_bounded(QueueTier::Small);
        assert_eq!(bounded.capacity(), 256);
    }

    #[test]
    fn test_queue_factory_throughput_based() {
        let queue = QueueFactory::create_for_throughput(10000, 100);
        // 10000 msg/s * 100ms = 1000 msgs, needs Medium tier = 1024
        assert_eq!(queue.capacity(), 1024);
    }

    #[test]
    fn test_queue_monitor_health_levels() {
        let monitor = QueueMonitor::default();
        let queue = SpscQueue::new(100); // Will round to 128

        // Empty - healthy
        assert_eq!(monitor.check(&queue), QueueHealth::Healthy);

        // Fill to 76/128 (~59%) - still healthy
        for _ in 0..76 {
            queue.try_enqueue(make_envelope()).unwrap();
        }
        assert_eq!(monitor.check(&queue), QueueHealth::Healthy);

        // Fill to 102/128 (~80%) - warning
        for _ in 0..26 {
            queue.try_enqueue(make_envelope()).unwrap();
        }
        assert_eq!(monitor.check(&queue), QueueHealth::Warning);

        // Fill to 120/128 (~94%) - critical
        for _ in 0..18 {
            queue.try_enqueue(make_envelope()).unwrap();
        }
        assert_eq!(monitor.check(&queue), QueueHealth::Critical);
    }

    #[test]
    fn test_queue_monitor_utilization() {
        let monitor = QueueMonitor::default();
        let queue = SpscQueue::new(100); // Will round to 128

        assert!((monitor.utilization(&queue) - 0.0).abs() < 0.001);

        for _ in 0..64 {
            queue.try_enqueue(make_envelope()).unwrap();
        }
        assert!((monitor.utilization(&queue) - 0.5).abs() < 0.001);
    }

    #[test]
    fn test_queue_monitor_drop_detection() {
        let monitor = QueueMonitor::default();
        let queue = SpscQueue::new(4);

        // Fill queue completely
        for _ in 0..4 {
            queue.try_enqueue(make_envelope()).unwrap();
        }
        assert!(!monitor.has_drops(&queue));

        // Attempt another enqueue (should fail and record drop)
        let _ = queue.try_enqueue(make_envelope());
        assert!(monitor.has_drops(&queue));
        assert!(monitor.drop_rate(&queue) > 0.0);
    }

    #[test]
    fn test_queue_monitor_upgrade_suggestion() {
        let monitor = QueueMonitor::default();
        let queue = SpscQueue::new(QueueTier::Small.capacity());

        // Empty queue - no upgrade needed
        assert!(monitor.suggest_upgrade(&queue, QueueTier::Small).is_none());

        // Fill to warning level (>= 75%)
        for _ in 0..200 {
            queue.try_enqueue(make_envelope()).unwrap();
        }

        // Should suggest upgrade
        let suggestion = monitor.suggest_upgrade(&queue, QueueTier::Small);
        assert_eq!(suggestion, Some(QueueTier::Medium));

        // Already at max tier - no upgrade possible
        let large_queue = SpscQueue::new(QueueTier::ExtraLarge.capacity());
        for _ in 0..(QueueTier::ExtraLarge.capacity() * 3 / 4) {
            large_queue.try_enqueue(make_envelope()).unwrap();
        }
        assert!(monitor
            .suggest_upgrade(&large_queue, QueueTier::ExtraLarge)
            .is_none());
    }

    #[test]
    fn test_queue_metrics_capture() {
        let queue = SpscQueue::new(QueueTier::Medium.capacity());
        let monitor = QueueMonitor::default();

        // Add some messages
        for _ in 0..100 {
            queue.try_enqueue(make_envelope()).unwrap();
        }

        let metrics = QueueMetrics::capture(&queue, &monitor, Some(QueueTier::Medium));

        assert_eq!(metrics.health, QueueHealth::Healthy);
        assert!(metrics.utilization < 0.15);
        assert_eq!(metrics.stats.enqueued, 100);
        assert_eq!(metrics.tier, Some(QueueTier::Medium));
        assert!(metrics.suggested_upgrade.is_none());
    }

    // ========================================================================
    // Partitioned Queue Tests
    // ========================================================================

    #[test]
    fn test_partitioned_queue_creation() {
        let queue = PartitionedQueue::new(4, 256);
        assert_eq!(queue.partition_count(), 4);
        assert_eq!(queue.capacity_per_partition(), 256);
        assert_eq!(queue.total_capacity(), 1024);
    }

    #[test]
    fn test_partitioned_queue_rounds_to_power_of_two() {
        let queue = PartitionedQueue::new(3, 256);
        assert_eq!(queue.partition_count(), 4); // Rounded up to 4
    }

    #[test]
    fn test_partitioned_queue_routing() {
        let queue = PartitionedQueue::with_defaults();

        // Same source ID should always go to same partition
        let partition1 = queue.partition_for(12345);
        let partition2 = queue.partition_for(12345);
        assert_eq!(partition1, partition2);

        // Different source IDs may go to different partitions
        let partition_a = queue.partition_for(0);
        let partition_b = queue.partition_for(1);
        assert!(partition_a != partition_b || queue.partition_count() == 1);
    }

    #[test]
    fn test_partitioned_queue_enqueue_dequeue() {
        let queue = PartitionedQueue::new(4, 64);

        // Enqueue from different sources
        for source in 0..16u64 {
            let mut env = make_envelope();
            env.header.source_kernel = source;
            queue.try_enqueue(env).unwrap();
        }

        assert_eq!(queue.total_messages(), 16);

        // Dequeue all
        for _ in 0..16 {
            let env = queue.try_dequeue_any();
            assert!(env.is_some());
        }

        assert_eq!(queue.total_messages(), 0);
        assert!(queue.try_dequeue_any().is_none());
    }

    #[test]
    fn test_partitioned_queue_stats() {
        let queue = PartitionedQueue::new(4, 64);

        // Enqueue to different partitions
        for source in 0..20u64 {
            let mut env = make_envelope();
            env.header.source_kernel = source;
            queue.try_enqueue(env).unwrap();
        }

        let stats = queue.stats();
        assert_eq!(stats.total.enqueued, 20);
        assert_eq!(stats.partition_count, 4);
        assert_eq!(stats.partition_stats.len(), 4);
    }

    #[test]
    fn test_partitioned_queue_load_imbalance() {
        let queue = PartitionedQueue::new(4, 64);

        // All messages go to same partition (source 0 maps to partition 0)
        for _ in 0..10 {
            let mut env = make_envelope();
            env.header.source_kernel = 0;
            queue.try_enqueue(env).unwrap();
        }

        let stats = queue.stats();
        // All 10 messages in one partition, avg = 2.5, max = 10
        // Imbalance = 10 / 2.5 = 4.0
        assert!((stats.load_imbalance() - 4.0).abs() < 0.001);
    }

    #[test]
    fn test_partitioned_queue_dequeue_partition() {
        let queue = PartitionedQueue::new(4, 64);

        // Enqueue to a specific partition (source 0)
        let mut env = make_envelope();
        env.header.source_kernel = 0;
        queue.try_enqueue(env).unwrap();

        let partition = queue.partition_for(0);

        // Dequeue from that specific partition
        let result = queue.try_dequeue_partition(partition);
        assert!(result.is_ok());

        // Invalid partition should error
        let result = queue.try_dequeue_partition(100);
        assert!(result.is_err());
    }
}

#[cfg(test)]
mod proptests {
    use super::*;
    use crate::hlc::HlcTimestamp;
    use crate::message::MessageHeader;
    use proptest::prelude::*;

    /// Create a test envelope with a distinct message_type as sequence tag.
    fn make_envelope_with_seq(seq: u64) -> MessageEnvelope {
        MessageEnvelope {
            // MessageHeader::new(message_type, source_kernel, dest_kernel, payload_size, timestamp)
            header: MessageHeader::new(seq, 1, 0, 8, HlcTimestamp::now(1)),
            payload: vec![1, 2, 3, 4, 5, 6, 7, 8],
        }
    }

    proptest! {
        #[test]
        fn spsc_capacity_invariant(cap in 1usize..=256) {
            let queue = SpscQueue::new(cap);
            let actual_cap = queue.capacity();
            prop_assert!(actual_cap.is_power_of_two());
            prop_assert!(actual_cap >= cap);
        }

        #[test]
        fn spsc_len_never_exceeds_capacity(n in 1usize..=128) {
            let queue = SpscQueue::new(n);
            let cap = queue.capacity();
            for i in 0..(cap + 10) {
                let _ = queue.try_enqueue(make_envelope_with_seq(i as u64));
                prop_assert!(queue.len() <= cap);
            }
        }

        #[test]
        fn spsc_fifo_ordering(count in 1usize..=64) {
            let queue = SpscQueue::new((count + 1).next_power_of_two());

            for i in 0..count {
                queue.try_enqueue(make_envelope_with_seq(i as u64)).unwrap();
            }

            // Dequeue and verify FIFO order via message_type tag
            for i in 0..count {
                let env = queue.try_dequeue().unwrap();
                prop_assert_eq!(env.header.message_type, i as u64);
            }

            prop_assert!(queue.is_empty());
        }

        #[test]
        fn spsc_stats_consistency(enqueue_count in 1usize..=64) {
            let queue = SpscQueue::new(64);
            let cap = queue.capacity();
            let mut expected_dropped = 0u64;

            for i in 0..enqueue_count {
                if queue.try_enqueue(make_envelope_with_seq(i as u64)).is_err() {
                    expected_dropped += 1;
                }
            }

            let stats = queue.stats();
            let successful = enqueue_count as u64 - expected_dropped;
            prop_assert_eq!(stats.enqueued, successful);
            prop_assert_eq!(stats.dropped, expected_dropped);
            prop_assert_eq!(stats.depth, successful);
            prop_assert!(stats.depth <= cap as u64);
        }

        #[test]
        fn spsc_enqueue_dequeue_roundtrip(n in 1usize..=32) {
            let queue = SpscQueue::new(64);

            for i in 0..n {
                queue.try_enqueue(make_envelope_with_seq(i as u64)).unwrap();
            }

            for _ in 0..n {
                queue.try_dequeue().unwrap();
            }

            let stats = queue.stats();
            prop_assert_eq!(stats.enqueued, n as u64);
            prop_assert_eq!(stats.dequeued, n as u64);
            prop_assert_eq!(stats.depth, 0);
            prop_assert!(queue.is_empty());
        }

        #[test]
        fn partitioned_routing_deterministic(source_id in 0u64..1000, partitions in 1usize..=8) {
            let queue = PartitionedQueue::new(partitions, 64);
            let p1 = queue.partition_for(source_id);
            let p2 = queue.partition_for(source_id);
            prop_assert_eq!(p1, p2);
            prop_assert!(p1 < queue.partition_count());
        }
    }
}