// ringkernel_core/queue.rs
//! Lock-free message queue implementation.
//!
//! This module provides the core message queue abstraction used for
//! communication between host and GPU kernels. The queue uses a ring
//! buffer design with atomic operations for lock-free access.
//!
//! ## Cache-line padding
//!
//! SPSC queue throughput under concurrent producer/consumer load is
//! dominated by cache-line bouncing between cores. When `head` and
//! `tail` live on the same cache line, every producer `store(head)`
//! invalidates the consumer's cached view of `tail` and vice versa
//! — turning every operation into a forced cache-coherence round-
//! trip. [`CachePadded`] places each hot field on its own 128-byte
//! cache line (the widest modern line, covering both x86 spatial
//! prefetching pairs and NVIDIA Hopper-era L2), so producer and
//! consumer do not contend at the line granularity.
//!
//! 128 bytes is a conservative choice: AMD Zen 4 / Intel Sapphire
//! Rapids use 64-byte lines but prefetch them in adjacent pairs
//! ("destructive interference" pairs), which is why crossbeam-utils'
//! `CachePadded` likewise pads to 128 bytes on x86-64.
24use std::sync::atomic::{AtomicU64, Ordering};
25
26use crate::error::{Result, RingKernelError};
27use crate::message::MessageEnvelope;
28
/// 128-byte aligned wrapper that puts a field on its own cache
/// line, avoiding false sharing between producer and consumer of an
/// SPSC ring.
///
/// The wrapped value is reachable via `.0` or transparently through
/// the `Deref` impl below.
///
/// Using explicit `repr(align(128))` rather than depending on
/// `crossbeam-utils::CachePadded` so the core crate stays lean.
#[repr(align(128))]
#[derive(Default)]
pub(crate) struct CachePadded<T>(pub T);
38
39impl<T> std::ops::Deref for CachePadded<T> {
40    type Target = T;
41    fn deref(&self) -> &T {
42        &self.0
43    }
44}
45
/// Point-in-time snapshot of counters for a message queue.
///
/// Produced by [`MessageQueue::stats`]; counters are cumulative since
/// creation or the last [`MessageQueue::reset_stats`].
#[derive(Debug, Clone, Default)]
pub struct QueueStats {
    /// Total messages enqueued.
    pub enqueued: u64,
    /// Total messages dequeued.
    pub dequeued: u64,
    /// Messages dropped due to full queue.
    pub dropped: u64,
    /// Current queue depth (derived as `enqueued - dequeued` at
    /// snapshot time).
    pub depth: u64,
    /// Maximum queue depth observed.
    pub max_depth: u64,
}
60
/// Trait for message queue implementations.
///
/// Message queues provide lock-free FIFO communication between
/// producers (host or other kernels) and consumers (GPU kernels).
pub trait MessageQueue: Send + Sync {
    /// Get the queue capacity (maximum number of messages held).
    fn capacity(&self) -> usize;

    /// Get current queue size (messages currently enqueued).
    fn len(&self) -> usize;

    /// Check if queue is empty.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Check if queue is full (`len` has reached `capacity`).
    fn is_full(&self) -> bool {
        self.len() >= self.capacity()
    }

    /// Try to enqueue a message envelope.
    ///
    /// Fails (rather than waiting for space) when the queue is full.
    fn try_enqueue(&self, envelope: MessageEnvelope) -> Result<()>;

    /// Try to dequeue a message envelope.
    ///
    /// Fails (rather than waiting for data) when the queue is empty.
    fn try_dequeue(&self) -> Result<MessageEnvelope>;

    /// Get queue statistics.
    fn stats(&self) -> QueueStats;

    /// Reset queue statistics.
    fn reset_stats(&self);
}
94
/// Single-producer single-consumer lock-free ring buffer.
///
/// This implementation is truly lock-free: no mutexes, no blocking.
/// It uses atomic head/tail pointers with acquire/release ordering.
/// Since SPSC has exactly one producer and one consumer, no CAS is
/// needed — ordered loads and stores are sufficient.
///
/// # Memory Ordering
///
/// - Producer: writes data, then `Release`-stores head (publishes the write)
/// - Consumer: `Acquire`-loads head (observes the write), reads data, then `Release`-stores tail
/// - This matches the GPU-side H2K/K2H queue protocol (volatile + fence)
pub struct SpscQueue {
    /// Ring buffer storage. `UnsafeCell` gives interior mutability
    /// without a `Mutex`; each slot holds an `Option<MessageEnvelope>`.
    buffer: Vec<std::cell::UnsafeCell<Option<MessageEnvelope>>>,
    /// Capacity (always a power of 2 — enforced in [`SpscQueue::new`]).
    capacity: usize,
    /// Mask for index wrapping (capacity - 1).
    mask: usize,
    /// Head pointer (producer writes here, only modified by producer).
    /// Cache-line padded to avoid false sharing with `tail`.
    head: CachePadded<AtomicU64>,
    /// Tail pointer (consumer reads from here, only modified by consumer).
    /// Cache-line padded to avoid false sharing with `head` and stats.
    tail: CachePadded<AtomicU64>,
    /// Producer-side counters (enqueued / dropped / max_depth).
    /// Separate cache line from consumer stats so the two sides do
    /// not false-share.
    producer_stats: CachePadded<ProducerStats>,
    /// Consumer-side counters (dequeued).
    consumer_stats: CachePadded<ConsumerStats>,
}
128
// SAFETY: SpscQueue is Send+Sync because:
// - head is only written by the producer, tail only by the consumer
// - Buffer slots are accessed with proper acquire/release ordering:
//   slot[head] is written by producer BEFORE head is advanced (Release)
//   slot[tail] is read by consumer AFTER head is observed > tail (Acquire)
// - No two threads ever access the same slot simultaneously:
//   producer writes slot[head], consumer reads slot[tail], and head != tail
//   is guaranteed by the full check (head - tail < capacity)
// NOTE(review): this argument assumes at most ONE producer thread and
// ONE consumer thread at a time; any wrapper exposing this queue to
// multiple producers (e.g. MpscQueue) must serialize them.
unsafe impl Send for SpscQueue {}
unsafe impl Sync for SpscQueue {}
139
/// Producer-side stats (counters modified only by the producer).
/// Split from consumer stats so the two sides don't false-share.
#[derive(Default)]
struct ProducerStats {
    // Total successful enqueues.
    enqueued: AtomicU64,
    // Enqueue attempts rejected because the ring was full.
    dropped: AtomicU64,
    // High-water mark of observed queue depth.
    max_depth: AtomicU64,
}
148
/// Consumer-side stats (counters modified only by the consumer).
#[derive(Default)]
struct ConsumerStats {
    // Total successful dequeues.
    dequeued: AtomicU64,
}
154
155impl From<(&ProducerStats, &ConsumerStats)> for QueueStats {
156    fn from(split: (&ProducerStats, &ConsumerStats)) -> Self {
157        let (p, c) = split;
158        QueueStats {
159            enqueued: p.enqueued.load(Ordering::Relaxed),
160            dequeued: c.dequeued.load(Ordering::Relaxed),
161            dropped: p.dropped.load(Ordering::Relaxed),
162            depth: p
163                .enqueued
164                .load(Ordering::Relaxed)
165                .wrapping_sub(c.dequeued.load(Ordering::Relaxed)),
166            max_depth: p.max_depth.load(Ordering::Relaxed),
167        }
168    }
169}
170
171impl SpscQueue {
172    /// Create a new queue with the given capacity.
173    ///
174    /// Capacity will be rounded up to the next power of 2.
175    pub fn new(capacity: usize) -> Self {
176        let capacity = capacity.next_power_of_two();
177        let mask = capacity - 1;
178
179        let mut buffer = Vec::with_capacity(capacity);
180        for _ in 0..capacity {
181            buffer.push(std::cell::UnsafeCell::new(None));
182        }
183
184        Self {
185            buffer,
186            capacity,
187            mask,
188            head: CachePadded(AtomicU64::new(0)),
189            tail: CachePadded(AtomicU64::new(0)),
190            producer_stats: CachePadded(ProducerStats::default()),
191            consumer_stats: CachePadded(ConsumerStats::default()),
192        }
193    }
194
195    /// Get current depth.
196    #[inline]
197    fn depth(&self) -> u64 {
198        let head = self.head.load(Ordering::Acquire);
199        let tail = self.tail.load(Ordering::Acquire);
200        head.wrapping_sub(tail)
201    }
202
203    /// Update max depth statistic. Single producer means no
204    /// contention on max_depth — we use `fetch_max` which is a
205    /// single atomic RMW rather than the earlier CAS loop.
206    fn update_max_depth(&self) {
207        let depth = self.depth();
208        self.producer_stats
209            .max_depth
210            .fetch_max(depth, Ordering::Relaxed);
211    }
212}
213
impl MessageQueue for SpscQueue {
    #[inline]
    fn capacity(&self) -> usize {
        self.capacity
    }

    #[inline]
    fn len(&self) -> usize {
        self.depth() as usize
    }

    /// Producer-side enqueue. Counts a `dropped` and fails when the
    /// ring is full; never blocks.
    #[inline]
    fn try_enqueue(&self, envelope: MessageEnvelope) -> Result<()> {
        let head = self.head.load(Ordering::Relaxed); // head is only modified by us (the producer)
        let tail = self.tail.load(Ordering::Acquire); // must observe the consumer's tail updates

        // Full when the ring already holds `capacity` unconsumed slots.
        if head.wrapping_sub(tail) >= self.capacity as u64 {
            self.producer_stats.dropped.fetch_add(1, Ordering::Relaxed);
            return Err(RingKernelError::QueueFull {
                capacity: self.capacity,
            });
        }

        // Write data to slot — safe because producer owns this slot:
        // consumer won't read slot[head] until head is advanced below.
        let index = (head as usize) & self.mask;
        // SAFETY: No concurrent access to this slot. The consumer only reads
        // slots where tail <= index < head (before the new head). We haven't
        // advanced head yet, so this slot is exclusively ours.
        unsafe {
            *self.buffer[index].get() = Some(envelope);
        }

        // Publish: advance head with Release ordering.
        // This ensures the data write above is visible to the consumer
        // before the consumer sees the new head value.
        self.head.store(head.wrapping_add(1), Ordering::Release);

        // Update stats (producer side only — no coherence traffic
        // with consumer).
        self.producer_stats.enqueued.fetch_add(1, Ordering::Relaxed);
        self.update_max_depth();

        Ok(())
    }

    /// Consumer-side dequeue. Fails with `QueueEmpty` when no message
    /// is available; never blocks.
    #[inline]
    fn try_dequeue(&self) -> Result<MessageEnvelope> {
        let tail = self.tail.load(Ordering::Relaxed); // tail is only modified by us (the consumer)
        let head = self.head.load(Ordering::Acquire); // must observe the producer's head updates

        // Empty when producer and consumer positions coincide.
        if head == tail {
            return Err(RingKernelError::QueueEmpty);
        }

        // Read data from slot — safe because consumer owns this slot:
        // producer won't write slot[tail] because head > tail means
        // the producer is writing to slot[head], not slot[tail].
        let index = (tail as usize) & self.mask;
        // SAFETY: No concurrent access to this slot. The producer writes
        // slots at head (which is > tail), not at tail. The Acquire load
        // of head above ensures we see the producer's data write.
        // A `None` here would violate the protocol invariant; it is
        // surfaced defensively as QueueEmpty rather than a panic.
        let envelope = unsafe {
            (*self.buffer[index].get())
                .take()
                .ok_or(RingKernelError::QueueEmpty)?
        };

        // Publish: advance tail with Release ordering.
        // This ensures the slot is logically freed before the producer
        // sees the new tail and potentially reuses this slot.
        self.tail.store(tail.wrapping_add(1), Ordering::Release);

        // Update stats (consumer side only).
        self.consumer_stats.dequeued.fetch_add(1, Ordering::Relaxed);

        Ok(envelope)
    }

    /// Snapshot the producer/consumer counters into a [`QueueStats`].
    fn stats(&self) -> QueueStats {
        QueueStats::from((&*self.producer_stats, &*self.consumer_stats))
    }

    /// Zero all counters.
    ///
    /// NOTE(review): resetting `enqueued`/`dequeued` while messages are
    /// still in flight makes the derived `stats().depth` (enqueued -
    /// dequeued) disagree with `len()` (head - tail) until the counts
    /// re-converge — confirm callers only reset on a drained queue.
    fn reset_stats(&self) {
        self.producer_stats.enqueued.store(0, Ordering::Relaxed);
        self.producer_stats.dropped.store(0, Ordering::Relaxed);
        self.producer_stats.max_depth.store(0, Ordering::Relaxed);
        self.consumer_stats.dequeued.store(0, Ordering::Relaxed);
    }
}
306
/// Multi-producer single-consumer lock-free queue.
///
/// This variant allows multiple producers (e.g., multiple host threads
/// or kernel-to-kernel messaging) to enqueue messages concurrently.
/// Producers serialize on a mutex so the inner ring still sees one
/// producer at a time; the consumer path stays lock-free.
pub struct MpscQueue {
    /// Inner SPSC queue.
    inner: SpscQueue,
    /// Lock serializing producers (upholds the single-producer
    /// invariant that `inner`'s safety argument relies on).
    producer_lock: parking_lot::Mutex<()>,
}
317
impl MpscQueue {
    /// Create a new MPSC queue.
    ///
    /// Capacity is rounded up to a power of 2 by the inner [`SpscQueue`].
    pub fn new(capacity: usize) -> Self {
        Self {
            inner: SpscQueue::new(capacity),
            producer_lock: parking_lot::Mutex::new(()),
        }
    }
}
327
impl MessageQueue for MpscQueue {
    #[inline]
    fn capacity(&self) -> usize {
        self.inner.capacity()
    }

    #[inline]
    fn len(&self) -> usize {
        self.inner.len()
    }

    /// Enqueue under the producer lock so only one producer touches
    /// the inner SPSC ring at a time (its soundness requirement).
    #[inline]
    fn try_enqueue(&self, envelope: MessageEnvelope) -> Result<()> {
        let _guard = self.producer_lock.lock();
        self.inner.try_enqueue(envelope)
    }

    /// Dequeue without locking — the single-consumer side of the
    /// inner ring needs no serialization.
    #[inline]
    fn try_dequeue(&self) -> Result<MessageEnvelope> {
        self.inner.try_dequeue()
    }

    fn stats(&self) -> QueueStats {
        self.inner.stats()
    }

    fn reset_stats(&self) {
        self.inner.reset_stats()
    }
}
358
/// Bounded queue with blocking (timeout-based) operations.
///
/// Wraps an [`MpscQueue`] with a pair of condvars so callers can wait
/// for free space or for data instead of spinning on the `try_*` APIs.
pub struct BoundedQueue {
    /// Inner MPSC queue.
    inner: MpscQueue,
    /// Condvar signalled when a slot is freed (a dequeue succeeded).
    not_full: parking_lot::Condvar,
    /// Condvar signalled when a message arrives (an enqueue succeeded).
    not_empty: parking_lot::Condvar,
    /// Mutex for condvar coordination; guards no data itself.
    mutex: parking_lot::Mutex<()>,
}
370
371impl BoundedQueue {
372    /// Create a new bounded queue.
373    pub fn new(capacity: usize) -> Self {
374        Self {
375            inner: MpscQueue::new(capacity),
376            not_full: parking_lot::Condvar::new(),
377            not_empty: parking_lot::Condvar::new(),
378            mutex: parking_lot::Mutex::new(()),
379        }
380    }
381
382    /// Blocking enqueue with timeout.
383    ///
384    /// Note: `envelope` is cloned on each retry because `try_enqueue` takes
385    /// ownership. For zero-copy retry, the `MessageQueue` trait would need to
386    /// return the envelope on failure, which is a larger refactor.
387    pub fn enqueue_timeout(
388        &self,
389        envelope: MessageEnvelope,
390        timeout: std::time::Duration,
391    ) -> Result<()> {
392        let deadline = std::time::Instant::now() + timeout;
393
394        loop {
395            match self.inner.try_enqueue(envelope.clone()) {
396                Ok(()) => {
397                    self.not_empty.notify_one();
398                    return Ok(());
399                }
400                Err(RingKernelError::QueueFull { .. }) => {
401                    let remaining = deadline.saturating_duration_since(std::time::Instant::now());
402                    if remaining.is_zero() {
403                        return Err(RingKernelError::Timeout(timeout));
404                    }
405                    let mut guard = self.mutex.lock();
406                    let _ = self.not_full.wait_for(&mut guard, remaining);
407                }
408                Err(e) => return Err(e),
409            }
410        }
411    }
412
413    /// Blocking dequeue with timeout.
414    pub fn dequeue_timeout(&self, timeout: std::time::Duration) -> Result<MessageEnvelope> {
415        let deadline = std::time::Instant::now() + timeout;
416
417        loop {
418            match self.inner.try_dequeue() {
419                Ok(envelope) => {
420                    self.not_full.notify_one();
421                    return Ok(envelope);
422                }
423                Err(RingKernelError::QueueEmpty) => {
424                    let remaining = deadline.saturating_duration_since(std::time::Instant::now());
425                    if remaining.is_zero() {
426                        return Err(RingKernelError::Timeout(timeout));
427                    }
428                    let mut guard = self.mutex.lock();
429                    let _ = self.not_empty.wait_for(&mut guard, remaining);
430                }
431                Err(e) => return Err(e),
432            }
433        }
434    }
435}
436
impl MessageQueue for BoundedQueue {
    fn capacity(&self) -> usize {
        self.inner.capacity()
    }

    fn len(&self) -> usize {
        self.inner.len()
    }

    /// Non-blocking enqueue; wakes one waiting consumer on success.
    fn try_enqueue(&self, envelope: MessageEnvelope) -> Result<()> {
        let result = self.inner.try_enqueue(envelope);
        if result.is_ok() {
            self.not_empty.notify_one();
        }
        result
    }

    /// Non-blocking dequeue; wakes one waiting producer on success.
    fn try_dequeue(&self) -> Result<MessageEnvelope> {
        let result = self.inner.try_dequeue();
        if result.is_ok() {
            self.not_full.notify_one();
        }
        result
    }

    fn stats(&self) -> QueueStats {
        self.inner.stats()
    }

    fn reset_stats(&self) {
        self.inner.reset_stats()
    }
}
470
471// ============================================================================
472// Queue Tiering Support
473// ============================================================================
474
/// Queue capacity tiers for dynamic queue allocation.
///
/// Instead of dynamic resizing (which is complex for GPU memory),
/// we provide predefined tiers that can be selected based on expected load.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum QueueTier {
    /// Small queues (256 messages) - low traffic, minimal memory.
    Small,
    /// Medium queues (1024 messages) - moderate traffic.
    #[default]
    Medium,
    /// Large queues (4096 messages) - high traffic.
    Large,
    /// Extra large queues (16384 messages) - very high traffic.
    ExtraLarge,
}

impl QueueTier {
    /// Get the capacity for this tier.
    pub fn capacity(&self) -> usize {
        match self {
            Self::Small => 256,
            Self::Medium => 1024,
            Self::Large => 4096,
            Self::ExtraLarge => 16384,
        }
    }

    /// Suggest a tier based on expected message rate.
    ///
    /// # Arguments
    ///
    /// * `messages_per_second` - Expected message throughput
    /// * `target_headroom_ms` - Desired buffer time in milliseconds
    ///
    /// # Returns
    ///
    /// Recommended tier based on traffic patterns.
    pub fn for_throughput(messages_per_second: u64, target_headroom_ms: u64) -> Self {
        // saturating_mul: extreme inputs (e.g. u64::MAX msg/s) would
        // otherwise overflow — wrapping to a tiny capacity in release
        // builds and panicking in debug builds. Saturation keeps the
        // result monotonic and selects the largest tier instead.
        let needed_capacity = messages_per_second.saturating_mul(target_headroom_ms) / 1000;

        // Thresholds come from the tier capacities themselves so the
        // two stay in sync if a tier size ever changes.
        if needed_capacity <= Self::Small.capacity() as u64 {
            Self::Small
        } else if needed_capacity <= Self::Medium.capacity() as u64 {
            Self::Medium
        } else if needed_capacity <= Self::Large.capacity() as u64 {
            Self::Large
        } else {
            Self::ExtraLarge
        }
    }

    /// Get the next tier up (for capacity planning).
    pub fn upgrade(&self) -> Self {
        match self {
            Self::Small => Self::Medium,
            Self::Medium => Self::Large,
            Self::Large => Self::ExtraLarge,
            Self::ExtraLarge => Self::ExtraLarge, // Already at max
        }
    }

    /// Get the tier below (for memory optimization).
    pub fn downgrade(&self) -> Self {
        match self {
            Self::Small => Self::Small, // Already at min
            Self::Medium => Self::Small,
            Self::Large => Self::Medium,
            Self::ExtraLarge => Self::Large,
        }
    }
}
547
/// Factory for creating appropriately-sized message queues.
///
/// Stateless; all constructors are associated functions.
///
/// # Example
///
/// ```ignore
/// use ringkernel_core::queue::{QueueFactory, QueueTier};
///
/// // Create a medium-sized MPSC queue
/// let queue = QueueFactory::create_mpsc(QueueTier::Medium);
///
/// // Create based on expected throughput
/// let queue = QueueFactory::create_for_throughput(10000, 100); // 10k msg/s, 100ms buffer
/// ```
pub struct QueueFactory;
562
impl QueueFactory {
    /// Create an SPSC queue with the specified tier.
    pub fn create_spsc(tier: QueueTier) -> SpscQueue {
        SpscQueue::new(tier.capacity())
    }

    /// Create an MPSC queue with the specified tier.
    pub fn create_mpsc(tier: QueueTier) -> MpscQueue {
        MpscQueue::new(tier.capacity())
    }

    /// Create a bounded (blocking) queue with the specified tier.
    pub fn create_bounded(tier: QueueTier) -> BoundedQueue {
        BoundedQueue::new(tier.capacity())
    }

    /// Create a queue based on expected throughput.
    ///
    /// Returns a boxed MPSC queue whose tier is chosen by
    /// [`QueueTier::for_throughput`].
    ///
    /// # Arguments
    ///
    /// * `messages_per_second` - Expected message throughput
    /// * `headroom_ms` - Desired buffer time in milliseconds
    pub fn create_for_throughput(
        messages_per_second: u64,
        headroom_ms: u64,
    ) -> Box<dyn MessageQueue> {
        let tier = QueueTier::for_throughput(messages_per_second, headroom_ms);
        Box::new(Self::create_mpsc(tier))
    }
}
593
/// Queue health status from monitoring.
///
/// Thresholds are defined by the [`QueueMonitor`] doing the check.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueueHealth {
    /// Queue utilization is healthy (< warning threshold).
    Healthy,
    /// Queue is approaching capacity (>= warning, < critical threshold).
    Warning,
    /// Queue is near capacity (>= critical threshold).
    Critical,
}
604
/// Monitor for queue health and utilization.
///
/// Provides real-time health checking for message queues without
/// dynamic resizing, allowing applications to take action when
/// queues approach capacity.
///
/// # Example
///
/// ```ignore
/// use ringkernel_core::queue::{QueueMonitor, SpscQueue, QueueHealth};
///
/// let queue = SpscQueue::new(1024);
/// let monitor = QueueMonitor::default();
///
/// // Check health periodically
/// match monitor.check(&queue) {
///     QueueHealth::Healthy => { /* normal operation */ }
///     QueueHealth::Warning => { /* consider throttling producers */ }
///     QueueHealth::Critical => { /* alert! take immediate action */ }
/// }
/// ```
#[derive(Debug, Clone)]
pub struct QueueMonitor {
    /// Utilization threshold for warning (fraction, 0.0 - 1.0).
    pub warning_threshold: f64,
    /// Utilization threshold for critical (fraction, 0.0 - 1.0).
    /// Expected to be >= `warning_threshold`.
    pub critical_threshold: f64,
}
633
634impl Default for QueueMonitor {
635    fn default() -> Self {
636        Self {
637            warning_threshold: 0.75,  // 75%
638            critical_threshold: 0.90, // 90%
639        }
640    }
641}
642
643impl QueueMonitor {
644    /// Create a new queue monitor with custom thresholds.
645    pub fn new(warning_threshold: f64, critical_threshold: f64) -> Self {
646        Self {
647            warning_threshold,
648            critical_threshold,
649        }
650    }
651
652    /// Check the health of a queue.
653    pub fn check(&self, queue: &dyn MessageQueue) -> QueueHealth {
654        let utilization = self.utilization(queue);
655
656        if utilization >= self.critical_threshold {
657            QueueHealth::Critical
658        } else if utilization >= self.warning_threshold {
659            QueueHealth::Warning
660        } else {
661            QueueHealth::Healthy
662        }
663    }
664
665    /// Get the current utilization (0.0 - 1.0).
666    pub fn utilization(&self, queue: &dyn MessageQueue) -> f64 {
667        let capacity = queue.capacity();
668        if capacity == 0 {
669            return 0.0;
670        }
671        queue.len() as f64 / capacity as f64
672    }
673
674    /// Get current utilization percentage.
675    pub fn utilization_percent(&self, queue: &dyn MessageQueue) -> f64 {
676        self.utilization(queue) * 100.0
677    }
678
679    /// Suggest whether to upgrade the queue tier based on observed utilization.
680    ///
681    /// Returns `Some(QueueTier)` if upgrade is recommended, `None` otherwise.
682    pub fn suggest_upgrade(
683        &self,
684        queue: &dyn MessageQueue,
685        current_tier: QueueTier,
686    ) -> Option<QueueTier> {
687        let stats = queue.stats();
688        let utilization = self.utilization(queue);
689
690        // Upgrade if:
691        // - Current utilization is at warning level
692        // - Max observed depth is above critical threshold
693        let max_util = if queue.capacity() > 0 {
694            stats.max_depth as f64 / queue.capacity() as f64
695        } else {
696            0.0
697        };
698
699        if utilization >= self.warning_threshold || max_util >= self.critical_threshold {
700            let upgraded = current_tier.upgrade();
701            if upgraded != current_tier {
702                return Some(upgraded);
703            }
704        }
705
706        None
707    }
708
709    /// Check if queue has experienced drops.
710    pub fn has_drops(&self, queue: &dyn MessageQueue) -> bool {
711        queue.stats().dropped > 0
712    }
713
714    /// Get the drop rate (drops / total attempted enqueues).
715    pub fn drop_rate(&self, queue: &dyn MessageQueue) -> f64 {
716        let stats = queue.stats();
717        let total_attempted = stats.enqueued + stats.dropped;
718        if total_attempted == 0 {
719            return 0.0;
720        }
721        stats.dropped as f64 / total_attempted as f64
722    }
723}
724
/// Comprehensive queue metrics snapshot.
///
/// Built by [`QueueMetrics::capture`] from a queue and a monitor.
#[derive(Debug, Clone)]
pub struct QueueMetrics {
    /// Queue health status.
    pub health: QueueHealth,
    /// Current utilization (0.0 - 1.0).
    pub utilization: f64,
    /// Queue statistics.
    pub stats: QueueStats,
    /// Current tier (if known by the caller).
    pub tier: Option<QueueTier>,
    /// Suggested tier upgrade (if recommended and `tier` was supplied).
    pub suggested_upgrade: Option<QueueTier>,
}
739
740impl QueueMetrics {
741    /// Capture metrics from a queue.
742    pub fn capture(
743        queue: &dyn MessageQueue,
744        monitor: &QueueMonitor,
745        current_tier: Option<QueueTier>,
746    ) -> Self {
747        let health = monitor.check(queue);
748        let utilization = monitor.utilization(queue);
749        let stats = queue.stats();
750        let suggested_upgrade = current_tier.and_then(|tier| monitor.suggest_upgrade(queue, tier));
751
752        Self {
753            health,
754            utilization,
755            stats,
756            tier: current_tier,
757            suggested_upgrade,
758        }
759    }
760}
761
762// ============================================================================
763// Partitioned Queue
764// ============================================================================
765
766/// A partitioned queue for reduced contention with multiple producers.
767///
768/// Instead of a single queue with a lock, this uses multiple independent
769/// partitions (SPSC queues) to reduce contention when many producers
770/// are sending messages concurrently.
771///
772/// Producers are routed to partitions based on their source ID, ensuring
773/// messages from the same source go to the same partition (preserving order).
774///
775/// # Example
776///
777/// ```ignore
778/// use ringkernel_core::queue::{PartitionedQueue, QueueTier};
779///
780/// // Create 4 partitions with Medium tier capacity each
781/// let queue = PartitionedQueue::new(4, QueueTier::Medium.capacity());
782///
783/// // Enqueue with source-based routing
784/// queue.try_enqueue_from(source_id, envelope)?;
785///
786/// // Dequeue from any partition that has messages
787/// if let Some(envelope) = queue.try_dequeue_any() {
788///     // process message
789/// }
790/// ```
791pub struct PartitionedQueue {
792    /// Individual partition queues.
793    partitions: Vec<SpscQueue>,
794    /// Number of partitions.
795    partition_count: usize,
796    /// Round-robin dequeue index.
797    dequeue_index: AtomicU64,
798}
799
800impl PartitionedQueue {
801    /// Creates a new partitioned queue.
802    ///
803    /// # Arguments
804    ///
805    /// * `partition_count` - Number of partitions (should be power of 2 for efficiency)
806    /// * `capacity_per_partition` - Capacity of each partition
807    pub fn new(partition_count: usize, capacity_per_partition: usize) -> Self {
808        let partition_count = partition_count.max(1).next_power_of_two();
809        let partitions = (0..partition_count)
810            .map(|_| SpscQueue::new(capacity_per_partition))
811            .collect();
812
813        Self {
814            partitions,
815            partition_count,
816            dequeue_index: AtomicU64::new(0),
817        }
818    }
819
820    /// Creates a partitioned queue with default settings.
821    ///
822    /// Uses 4 partitions with Medium tier capacity.
823    pub fn with_defaults() -> Self {
824        Self::new(4, QueueTier::Medium.capacity())
825    }
826
827    /// Creates a partitioned queue sized for high contention.
828    ///
829    /// Uses 8 partitions with Large tier capacity.
830    pub fn for_high_contention() -> Self {
831        Self::new(8, QueueTier::Large.capacity())
832    }
833
834    /// Returns the partition index for a given source ID.
835    #[inline]
836    pub fn partition_for(&self, source_id: u64) -> usize {
837        (source_id as usize) & (self.partition_count - 1)
838    }
839
840    /// Returns the number of partitions.
841    pub fn partition_count(&self) -> usize {
842        self.partition_count
843    }
844
845    /// Returns the capacity per partition.
846    pub fn capacity_per_partition(&self) -> usize {
847        self.partitions.first().map_or(0, |p| p.capacity())
848    }
849
850    /// Total capacity across all partitions.
851    pub fn total_capacity(&self) -> usize {
852        self.capacity_per_partition() * self.partition_count
853    }
854
855    /// Total messages across all partitions.
856    pub fn total_messages(&self) -> usize {
857        self.partitions.iter().map(|p| p.len()).sum()
858    }
859
860    /// Enqueues a message to a partition based on source ID.
861    ///
862    /// Messages from the same source always go to the same partition,
863    /// preserving ordering for that source.
864    pub fn try_enqueue_from(&self, source_id: u64, envelope: MessageEnvelope) -> Result<()> {
865        let partition = self.partition_for(source_id);
866        self.partitions[partition].try_enqueue(envelope)
867    }
868
869    /// Enqueues a message using the envelope's source kernel ID.
870    pub fn try_enqueue(&self, envelope: MessageEnvelope) -> Result<()> {
871        let source_id = envelope.header.source_kernel;
872        self.try_enqueue_from(source_id, envelope)
873    }
874
875    /// Tries to dequeue from a specific partition.
876    pub fn try_dequeue_partition(&self, partition: usize) -> Result<MessageEnvelope> {
877        if partition >= self.partition_count {
878            return Err(RingKernelError::InvalidConfig(format!(
879                "Invalid partition index: {} (max: {})",
880                partition,
881                self.partition_count - 1
882            )));
883        }
884        self.partitions[partition].try_dequeue()
885    }
886
887    /// Tries to dequeue from any partition that has messages.
888    ///
889    /// Uses round-robin to fairly distribute dequeues across partitions.
890    pub fn try_dequeue_any(&self) -> Option<MessageEnvelope> {
891        let start_index = self.dequeue_index.fetch_add(1, Ordering::Relaxed) as usize;
892
893        for i in 0..self.partition_count {
894            let partition = (start_index + i) & (self.partition_count - 1);
895            if let Ok(envelope) = self.partitions[partition].try_dequeue() {
896                return Some(envelope);
897            }
898        }
899
900        None
901    }
902
903    /// Returns statistics for a specific partition.
904    pub fn partition_stats(&self, partition: usize) -> Option<QueueStats> {
905        self.partitions.get(partition).map(|p| p.stats())
906    }
907
908    /// Returns aggregated statistics across all partitions.
909    pub fn stats(&self) -> PartitionedQueueStats {
910        let mut total = QueueStats::default();
911        let mut partition_stats = Vec::with_capacity(self.partition_count);
912
913        for partition in &self.partitions {
914            let stats = partition.stats();
915            total.enqueued += stats.enqueued;
916            total.dequeued += stats.dequeued;
917            total.dropped += stats.dropped;
918            total.depth += stats.depth;
919            if stats.max_depth > total.max_depth {
920                total.max_depth = stats.max_depth;
921            }
922            partition_stats.push(stats);
923        }
924
925        PartitionedQueueStats {
926            total,
927            partition_stats,
928            partition_count: self.partition_count,
929        }
930    }
931
932    /// Resets statistics for all partitions.
933    pub fn reset_stats(&self) {
934        for partition in &self.partitions {
935            partition.reset_stats();
936        }
937    }
938}
939
/// Statistics for a partitioned queue.
///
/// Snapshot produced by [`PartitionedQueue::stats`]. `total` sums the
/// per-partition counters, except `max_depth`, which is the maximum
/// observed across partitions rather than a sum.
#[derive(Debug, Clone)]
pub struct PartitionedQueueStats {
    /// Aggregated statistics.
    pub total: QueueStats,
    /// Per-partition statistics.
    pub partition_stats: Vec<QueueStats>,
    /// Number of partitions.
    pub partition_count: usize,
}
950
951impl PartitionedQueueStats {
952    /// Returns the load imbalance factor (max/avg).
953    ///
954    /// A value of 1.0 indicates perfect balance.
955    /// Higher values indicate imbalance (some partitions have more messages).
956    pub fn load_imbalance(&self) -> f64 {
957        if self.partition_count == 0 {
958            return 1.0;
959        }
960
961        let avg = self.total.depth as f64 / self.partition_count as f64;
962        if avg == 0.0 {
963            return 1.0;
964        }
965
966        let max = self
967            .partition_stats
968            .iter()
969            .map(|s| s.depth)
970            .max()
971            .unwrap_or(0);
972        max as f64 / avg
973    }
974
975    /// Returns the utilization of the most loaded partition.
976    pub fn max_partition_utilization(&self, capacity_per_partition: usize) -> f64 {
977        if capacity_per_partition == 0 {
978            return 0.0;
979        }
980
981        let max = self
982            .partition_stats
983            .iter()
984            .map(|s| s.depth)
985            .max()
986            .unwrap_or(0);
987        max as f64 / capacity_per_partition as f64
988    }
989}
990
#[cfg(test)]
mod tests {
    use super::*;
    use crate::hlc::HlcTimestamp;
    use crate::message::MessageHeader;

    /// Builds a minimal valid envelope with an 8-byte payload.
    /// Header arguments are (message_type, source_kernel, dest_kernel,
    /// payload_size, timestamp), so the source kernel here is 0.
    fn make_envelope() -> MessageEnvelope {
        MessageEnvelope {
            header: MessageHeader::new(1, 0, 1, 8, HlcTimestamp::now(1)),
            payload: vec![1, 2, 3, 4, 5, 6, 7, 8],
            provenance: None,
            tenant_id: 0,
            audit_tag: crate::k2k::audit_tag::AuditTag::unspecified(),
        }
    }

    // Single enqueue/dequeue round-trip and the empty/full predicates.
    #[test]
    fn test_spsc_basic() {
        let queue = SpscQueue::new(16);

        assert!(queue.is_empty());
        assert!(!queue.is_full());

        let env = make_envelope();
        queue.try_enqueue(env).unwrap();

        assert_eq!(queue.len(), 1);
        assert!(!queue.is_empty());

        let _ = queue.try_dequeue().unwrap();
        assert!(queue.is_empty());
    }

    // Enqueue into a full queue must fail with QueueFull, not block or panic.
    #[test]
    fn test_spsc_full() {
        let queue = SpscQueue::new(4);

        for _ in 0..4 {
            queue.try_enqueue(make_envelope()).unwrap();
        }

        assert!(queue.is_full());
        assert!(matches!(
            queue.try_enqueue(make_envelope()),
            Err(RingKernelError::QueueFull { .. })
        ));
    }

    // depth must track enqueued - dequeued.
    #[test]
    fn test_spsc_stats() {
        let queue = SpscQueue::new(16);

        for _ in 0..10 {
            queue.try_enqueue(make_envelope()).unwrap();
        }

        for _ in 0..5 {
            let _ = queue.try_dequeue().unwrap();
        }

        let stats = queue.stats();
        assert_eq!(stats.enqueued, 10);
        assert_eq!(stats.dequeued, 5);
        assert_eq!(stats.depth, 5);
    }

    // 4 producer threads x 100 messages each; no message may be lost.
    #[test]
    fn test_mpsc_concurrent() {
        use std::sync::Arc;
        use std::thread;

        let queue = Arc::new(MpscQueue::new(1024));
        let mut handles = vec![];

        // Spawn multiple producers
        for _ in 0..4 {
            let q = Arc::clone(&queue);
            handles.push(thread::spawn(move || {
                for _ in 0..100 {
                    q.try_enqueue(make_envelope()).unwrap();
                }
            }));
        }

        // Wait for producers
        for h in handles {
            h.join().unwrap();
        }

        let stats = queue.stats();
        assert_eq!(stats.enqueued, 400);
    }

    // enqueue_timeout on a full queue must return Timeout after the deadline.
    #[test]
    fn test_bounded_timeout() {
        let queue = BoundedQueue::new(2);

        // Fill queue
        queue.try_enqueue(make_envelope()).unwrap();
        queue.try_enqueue(make_envelope()).unwrap();

        // Should timeout
        let result = queue.enqueue_timeout(make_envelope(), std::time::Duration::from_millis(10));
        assert!(matches!(result, Err(RingKernelError::Timeout(_))));
    }

    // ========================================================================
    // Queue Tiering Tests
    // ========================================================================

    #[test]
    fn test_queue_tier_capacities() {
        assert_eq!(QueueTier::Small.capacity(), 256);
        assert_eq!(QueueTier::Medium.capacity(), 1024);
        assert_eq!(QueueTier::Large.capacity(), 4096);
        assert_eq!(QueueTier::ExtraLarge.capacity(), 16384);
    }

    // Tier selection is driven by msg/s x buffer-window (ms).
    #[test]
    fn test_queue_tier_for_throughput() {
        // Low traffic - 1000 msg/s with 100ms buffer = 100 msgs needed
        assert_eq!(QueueTier::for_throughput(1000, 100), QueueTier::Small);

        // Medium traffic - 5000 msg/s with 100ms buffer = 500 msgs needed
        assert_eq!(QueueTier::for_throughput(5000, 100), QueueTier::Medium);

        // High traffic - 20000 msg/s with 100ms buffer = 2000 msgs needed
        assert_eq!(QueueTier::for_throughput(20000, 100), QueueTier::Large);

        // Very high traffic - 100000 msg/s with 100ms buffer = 10000 msgs needed
        assert_eq!(
            QueueTier::for_throughput(100000, 100),
            QueueTier::ExtraLarge
        );
    }

    // upgrade()/downgrade() saturate at the extreme tiers instead of wrapping.
    #[test]
    fn test_queue_tier_upgrade_downgrade() {
        assert_eq!(QueueTier::Small.upgrade(), QueueTier::Medium);
        assert_eq!(QueueTier::Medium.upgrade(), QueueTier::Large);
        assert_eq!(QueueTier::Large.upgrade(), QueueTier::ExtraLarge);
        assert_eq!(QueueTier::ExtraLarge.upgrade(), QueueTier::ExtraLarge); // Max

        assert_eq!(QueueTier::Small.downgrade(), QueueTier::Small); // Min
        assert_eq!(QueueTier::Medium.downgrade(), QueueTier::Small);
        assert_eq!(QueueTier::Large.downgrade(), QueueTier::Medium);
        assert_eq!(QueueTier::ExtraLarge.downgrade(), QueueTier::Large);
    }

    // The factory honours the tier's capacity for all three queue flavours.
    #[test]
    fn test_queue_factory_creates_correct_capacity() {
        let spsc = QueueFactory::create_spsc(QueueTier::Medium);
        assert_eq!(spsc.capacity(), 1024);

        let mpsc = QueueFactory::create_mpsc(QueueTier::Large);
        assert_eq!(mpsc.capacity(), 4096);

        let bounded = QueueFactory::create_bounded(QueueTier::Small);
        assert_eq!(bounded.capacity(), 256);
    }

    #[test]
    fn test_queue_factory_throughput_based() {
        let queue = QueueFactory::create_for_throughput(10000, 100);
        // 10000 msg/s * 100ms = 1000 msgs, needs Medium tier = 1024
        assert_eq!(queue.capacity(), 1024);
    }

    // Health transitions as the queue fills. Capacity 100 rounds up to 128,
    // so the fill counts below are 76/128 = 59%, 102/128 = 80%, 120/128 = 94%.
    #[test]
    fn test_queue_monitor_health_levels() {
        let monitor = QueueMonitor::default();
        let queue = SpscQueue::new(100); // Will round to 128

        // Empty - healthy
        assert_eq!(monitor.check(&queue), QueueHealth::Healthy);

        // Fill to 60% - still healthy
        for _ in 0..76 {
            queue.try_enqueue(make_envelope()).unwrap();
        }
        assert_eq!(monitor.check(&queue), QueueHealth::Healthy);

        // Fill to 80% - warning
        for _ in 0..26 {
            queue.try_enqueue(make_envelope()).unwrap();
        }
        assert_eq!(monitor.check(&queue), QueueHealth::Warning);

        // Fill to 95% - critical
        for _ in 0..18 {
            queue.try_enqueue(make_envelope()).unwrap();
        }
        assert_eq!(monitor.check(&queue), QueueHealth::Critical);
    }

    // Utilization is depth / rounded capacity (64/128 = 0.5).
    #[test]
    fn test_queue_monitor_utilization() {
        let monitor = QueueMonitor::default();
        let queue = SpscQueue::new(100); // Will round to 128

        assert!((monitor.utilization(&queue) - 0.0).abs() < 0.001);

        for _ in 0..64 {
            queue.try_enqueue(make_envelope()).unwrap();
        }
        assert!((monitor.utilization(&queue) - 0.5).abs() < 0.001);
    }

    // A rejected enqueue counts as a drop and is visible to the monitor.
    #[test]
    fn test_queue_monitor_drop_detection() {
        let monitor = QueueMonitor::default();
        let queue = SpscQueue::new(4);

        // Fill queue completely
        for _ in 0..4 {
            queue.try_enqueue(make_envelope()).unwrap();
        }
        assert!(!monitor.has_drops(&queue));

        // Attempt another enqueue (should fail and record drop)
        let _ = queue.try_enqueue(make_envelope());
        assert!(monitor.has_drops(&queue));
        assert!(monitor.drop_rate(&queue) > 0.0);
    }

    // Upgrade suggestions: triggered at warning utilization, absent when
    // the queue is already at the top tier.
    #[test]
    fn test_queue_monitor_upgrade_suggestion() {
        let monitor = QueueMonitor::default();
        let queue = SpscQueue::new(QueueTier::Small.capacity());

        // Empty queue - no upgrade needed
        assert!(monitor.suggest_upgrade(&queue, QueueTier::Small).is_none());

        // Fill to warning level (>= 75%)
        for _ in 0..200 {
            queue.try_enqueue(make_envelope()).unwrap();
        }

        // Should suggest upgrade
        let suggestion = monitor.suggest_upgrade(&queue, QueueTier::Small);
        assert_eq!(suggestion, Some(QueueTier::Medium));

        // Already at max tier - no upgrade possible
        let large_queue = SpscQueue::new(QueueTier::ExtraLarge.capacity());
        for _ in 0..(QueueTier::ExtraLarge.capacity() * 3 / 4) {
            large_queue.try_enqueue(make_envelope()).unwrap();
        }
        assert!(monitor
            .suggest_upgrade(&large_queue, QueueTier::ExtraLarge)
            .is_none());
    }

    // Snapshot capture bundles health, utilization, stats, and tier advice.
    // 100 messages in a 1024-capacity queue is ~9.8% utilization.
    #[test]
    fn test_queue_metrics_capture() {
        let queue = SpscQueue::new(QueueTier::Medium.capacity());
        let monitor = QueueMonitor::default();

        // Add some messages
        for _ in 0..100 {
            queue.try_enqueue(make_envelope()).unwrap();
        }

        let metrics = QueueMetrics::capture(&queue, &monitor, Some(QueueTier::Medium));

        assert_eq!(metrics.health, QueueHealth::Healthy);
        assert!(metrics.utilization < 0.15);
        assert_eq!(metrics.stats.enqueued, 100);
        assert_eq!(metrics.tier, Some(QueueTier::Medium));
        assert!(metrics.suggested_upgrade.is_none());
    }

    // ========================================================================
    // Partitioned Queue Tests
    // ========================================================================

    #[test]
    fn test_partitioned_queue_creation() {
        let queue = PartitionedQueue::new(4, 256);
        assert_eq!(queue.partition_count(), 4);
        assert_eq!(queue.capacity_per_partition(), 256);
        assert_eq!(queue.total_capacity(), 1024);
    }

    // partition_count must be a power of two so mask-based routing works.
    #[test]
    fn test_partitioned_queue_rounds_to_power_of_two() {
        let queue = PartitionedQueue::new(3, 256);
        assert_eq!(queue.partition_count(), 4); // Rounded up to 4
    }

    #[test]
    fn test_partitioned_queue_routing() {
        let queue = PartitionedQueue::with_defaults();

        // Same source ID should always go to same partition
        let partition1 = queue.partition_for(12345);
        let partition2 = queue.partition_for(12345);
        assert_eq!(partition1, partition2);

        // Different source IDs may go to different partitions
        let partition_a = queue.partition_for(0);
        let partition_b = queue.partition_for(1);
        assert!(partition_a != partition_b || queue.partition_count() == 1);
    }

    // Round-robin dequeue drains every partition regardless of routing.
    #[test]
    fn test_partitioned_queue_enqueue_dequeue() {
        let queue = PartitionedQueue::new(4, 64);

        // Enqueue from different sources
        for source in 0..16u64 {
            let mut env = make_envelope();
            env.header.source_kernel = source;
            queue.try_enqueue(env).unwrap();
        }

        assert_eq!(queue.total_messages(), 16);

        // Dequeue all
        for _ in 0..16 {
            let env = queue.try_dequeue_any();
            assert!(env.is_some());
        }

        assert_eq!(queue.total_messages(), 0);
        assert!(queue.try_dequeue_any().is_none());
    }

    // Aggregated stats sum across partitions and report per-partition detail.
    #[test]
    fn test_partitioned_queue_stats() {
        let queue = PartitionedQueue::new(4, 64);

        // Enqueue to different partitions
        for source in 0..20u64 {
            let mut env = make_envelope();
            env.header.source_kernel = source;
            queue.try_enqueue(env).unwrap();
        }

        let stats = queue.stats();
        assert_eq!(stats.total.enqueued, 20);
        assert_eq!(stats.partition_count, 4);
        assert_eq!(stats.partition_stats.len(), 4);
    }

    #[test]
    fn test_partitioned_queue_load_imbalance() {
        let queue = PartitionedQueue::new(4, 64);

        // All messages go to same partition (source 0 maps to partition 0)
        for _ in 0..10 {
            let mut env = make_envelope();
            env.header.source_kernel = 0;
            queue.try_enqueue(env).unwrap();
        }

        let stats = queue.stats();
        // All 10 messages in one partition, avg = 2.5, max = 10
        // Imbalance = 10 / 2.5 = 4.0
        assert!((stats.load_imbalance() - 4.0).abs() < 0.001);
    }

    // Targeted dequeue from the routed partition; out-of-range index errors.
    #[test]
    fn test_partitioned_queue_dequeue_partition() {
        let queue = PartitionedQueue::new(4, 64);

        // Enqueue to a specific partition (source 0)
        let mut env = make_envelope();
        env.header.source_kernel = 0;
        queue.try_enqueue(env).unwrap();

        let partition = queue.partition_for(0);

        // Dequeue from that specific partition
        let result = queue.try_dequeue_partition(partition);
        assert!(result.is_ok());

        // Invalid partition should error
        let result = queue.try_dequeue_partition(100);
        assert!(result.is_err());
    }
}
1372
// Property-based tests (proptest): each `#[test]` inside the `proptest!`
// macro is run against many randomly generated inputs in the given ranges.
#[cfg(test)]
mod proptests {
    use super::*;
    use crate::hlc::HlcTimestamp;
    use crate::message::MessageHeader;
    use proptest::prelude::*;

    /// Create a test envelope with a distinct message_type as sequence tag.
    fn make_envelope_with_seq(seq: u64) -> MessageEnvelope {
        MessageEnvelope {
            // MessageHeader::new(message_type, source_kernel, dest_kernel, payload_size, timestamp)
            header: MessageHeader::new(seq, 1, 0, 8, HlcTimestamp::now(1)),
            payload: vec![1, 2, 3, 4, 5, 6, 7, 8],
            provenance: None,
            tenant_id: 0,
            audit_tag: crate::k2k::audit_tag::AuditTag::unspecified(),
        }
    }

    proptest! {
        // Capacity is always rounded up to a power of two, never down.
        #[test]
        fn spsc_capacity_invariant(cap in 1usize..=256) {
            let queue = SpscQueue::new(cap);
            let actual_cap = queue.capacity();
            prop_assert!(actual_cap.is_power_of_two());
            prop_assert!(actual_cap >= cap);
        }

        // Overfilling (cap + 10 attempts) must never push len past capacity.
        #[test]
        fn spsc_len_never_exceeds_capacity(n in 1usize..=128) {
            let queue = SpscQueue::new(n);
            let cap = queue.capacity();
            for i in 0..(cap + 10) {
                let _ = queue.try_enqueue(make_envelope_with_seq(i as u64));
                prop_assert!(queue.len() <= cap);
            }
        }

        // Messages come out in exactly the order they went in.
        #[test]
        fn spsc_fifo_ordering(count in 1usize..=64) {
            let queue = SpscQueue::new((count + 1).next_power_of_two());

            for i in 0..count {
                queue.try_enqueue(make_envelope_with_seq(i as u64)).unwrap();
            }

            // Dequeue and verify FIFO order via message_type tag
            for i in 0..count {
                let env = queue.try_dequeue().unwrap();
                prop_assert_eq!(env.header.message_type, i as u64);
            }

            prop_assert!(queue.is_empty());
        }

        // enqueued + dropped accounts for every attempt; depth matches
        // the successful enqueues (nothing dequeued here).
        #[test]
        fn spsc_stats_consistency(enqueue_count in 1usize..=64) {
            let queue = SpscQueue::new(64);
            let cap = queue.capacity();
            let mut expected_dropped = 0u64;

            for i in 0..enqueue_count {
                if queue.try_enqueue(make_envelope_with_seq(i as u64)).is_err() {
                    expected_dropped += 1;
                }
            }

            let stats = queue.stats();
            let successful = enqueue_count as u64 - expected_dropped;
            prop_assert_eq!(stats.enqueued, successful);
            prop_assert_eq!(stats.dropped, expected_dropped);
            prop_assert_eq!(stats.depth, successful);
            prop_assert!(stats.depth <= cap as u64);
        }

        // After n enqueues and n dequeues the queue is empty and the
        // counters balance.
        #[test]
        fn spsc_enqueue_dequeue_roundtrip(n in 1usize..=32) {
            let queue = SpscQueue::new(64);

            for i in 0..n {
                queue.try_enqueue(make_envelope_with_seq(i as u64)).unwrap();
            }

            for _ in 0..n {
                queue.try_dequeue().unwrap();
            }

            let stats = queue.stats();
            prop_assert_eq!(stats.enqueued, n as u64);
            prop_assert_eq!(stats.dequeued, n as u64);
            prop_assert_eq!(stats.depth, 0);
            prop_assert!(queue.is_empty());
        }

        // Routing is a pure function of source_id and stays in range.
        #[test]
        fn partitioned_routing_deterministic(source_id in 0u64..1000, partitions in 1usize..=8) {
            let queue = PartitionedQueue::new(partitions, 64);
            let p1 = queue.partition_for(source_id);
            let p2 = queue.partition_for(source_id);
            prop_assert_eq!(p1, p2);
            prop_assert!(p1 < queue.partition_count());
        }
    }
}