use std::sync::atomic::{AtomicU64, Ordering};
use crate::error::{Result, RingKernelError};
use crate::message::MessageEnvelope;
/// Snapshot of a queue's monotonic counters plus its depth at capture time.
#[derive(Debug, Clone, Default)]
pub struct QueueStats {
/// Messages successfully enqueued since creation (or the last reset).
pub enqueued: u64,
/// Messages successfully dequeued since creation (or the last reset).
pub dequeued: u64,
/// Enqueue attempts rejected because the queue was full.
pub dropped: u64,
/// Messages resident in the queue when the snapshot was taken.
pub depth: u64,
/// Highest depth observed since creation (or the last reset).
pub max_depth: u64,
}
/// Common interface for the queue flavors in this module.
/// Implementors must be shareable across threads (`Send + Sync`).
pub trait MessageQueue: Send + Sync {
/// Maximum number of messages the queue can hold.
fn capacity(&self) -> usize;
/// Number of messages currently in the queue.
fn len(&self) -> usize;
/// True when no messages are queued.
fn is_empty(&self) -> bool {
self.len() == 0
}
/// True when the depth has reached capacity.
fn is_full(&self) -> bool {
self.len() >= self.capacity()
}
/// Non-blocking enqueue; errors when the queue is full.
fn try_enqueue(&self, envelope: MessageEnvelope) -> Result<()>;
/// Non-blocking dequeue; errors when the queue is empty.
fn try_dequeue(&self) -> Result<MessageEnvelope>;
/// Snapshot of the queue's counters.
fn stats(&self) -> QueueStats;
/// Zeroes all statistics counters.
fn reset_stats(&self);
}
/// Single-producer / single-consumer ring buffer.
///
/// `head` counts enqueues and `tail` counts dequeues; both grow without bound
/// and are masked into `buffer` indices (capacity is a power of two). Each
/// slot carries its own mutex so an in-progress write never races the
/// matching read of that slot.
pub struct SpscQueue {
buffer: Vec<parking_lot::Mutex<Option<MessageEnvelope>>>,
// Power-of-two capacity (rounded up by `new`).
capacity: usize,
// capacity - 1; masks monotonic counters into buffer indices.
mask: usize,
// Next write position (monotonic; written only by the producer side).
head: AtomicU64,
// Next read position (monotonic; written only by the consumer side).
tail: AtomicU64,
stats: QueueStatsInner,
}
// Atomic counters backing the `QueueStats` snapshots returned by `stats()`.
struct QueueStatsInner {
enqueued: AtomicU64,
dequeued: AtomicU64,
dropped: AtomicU64,
// High-water mark of observed depth.
max_depth: AtomicU64,
}
impl SpscQueue {
    /// Creates a ring whose capacity is `capacity` rounded up to the next
    /// power of two, so sequence numbers can be masked into buffer indices.
    /// `next_power_of_two()` maps 0 to 1, so `new(0)` yields a 1-slot queue.
    pub fn new(capacity: usize) -> Self {
        let capacity = capacity.next_power_of_two();
        let mask = capacity - 1;
        let mut buffer = Vec::with_capacity(capacity);
        for _ in 0..capacity {
            buffer.push(parking_lot::Mutex::new(None));
        }
        Self {
            buffer,
            capacity,
            mask,
            head: AtomicU64::new(0),
            tail: AtomicU64::new(0),
            stats: QueueStatsInner {
                enqueued: AtomicU64::new(0),
                dequeued: AtomicU64::new(0),
                dropped: AtomicU64::new(0),
                max_depth: AtomicU64::new(0),
            },
        }
    }
    /// Current queue depth (`head - tail`).
    ///
    /// `tail` is loaded *before* `head`. With the opposite order (as the
    /// previous code did), a concurrent enqueue + dequeue between the two
    /// loads could observe `tail > head`, and the wrapping subtraction then
    /// reported a depth near `u64::MAX`. Loading the consumer index first
    /// guarantees the observed `tail` never exceeds the observed `head`, so
    /// the result is at worst a slight over-estimate, never garbage.
    #[inline]
    fn depth(&self) -> u64 {
        let tail = self.tail.load(Ordering::Acquire);
        let head = self.head.load(Ordering::Acquire);
        head.wrapping_sub(tail)
    }
    /// Folds the current depth into the `max_depth` high-water mark.
    /// `fetch_max` replaces the previous hand-rolled CAS loop with a single
    /// atomic read-modify-write.
    fn update_max_depth(&self) {
        let depth = self.depth();
        self.stats.max_depth.fetch_max(depth, Ordering::Relaxed);
    }
}
/// SPSC ring implementation. Ordering protocol: the producer publishes a
/// filled slot with a Release store to `head`; the consumer publishes a
/// freed slot with a Release store to `tail`; Acquire loads pair with those
/// stores so slot contents are visible before the index move is observed.
impl MessageQueue for SpscQueue {
#[inline]
fn capacity(&self) -> usize {
self.capacity
}
#[inline]
fn len(&self) -> usize {
self.depth() as usize
}
#[inline]
fn try_enqueue(&self, envelope: MessageEnvelope) -> Result<()> {
// `head` is producer-owned, so its value is exact here; `tail` may be
// stale (too small), which only over-estimates depth — the full check
// errs on the safe side.
let head = self.head.load(Ordering::Acquire);
let tail = self.tail.load(Ordering::Acquire);
if head.wrapping_sub(tail) >= self.capacity as u64 {
self.stats.dropped.fetch_add(1, Ordering::Relaxed);
return Err(RingKernelError::QueueFull {
capacity: self.capacity,
});
}
// Write the payload before the Release store below publishes it.
let index = (head as usize) & self.mask;
let mut slot = self.buffer[index].lock();
*slot = Some(envelope);
drop(slot);
self.head.store(head.wrapping_add(1), Ordering::Release);
self.stats.enqueued.fetch_add(1, Ordering::Relaxed);
self.update_max_depth();
Ok(())
}
#[inline]
fn try_dequeue(&self) -> Result<MessageEnvelope> {
// `tail` is consumer-owned and exact; a stale `head` can only make the
// queue look emptier than it is (conservative).
let tail = self.tail.load(Ordering::Acquire);
let head = self.head.load(Ordering::Acquire);
if head == tail {
return Err(RingKernelError::QueueEmpty);
}
let index = (tail as usize) & self.mask;
let mut slot = self.buffer[index].lock();
let envelope = slot.take().ok_or(RingKernelError::QueueEmpty)?;
drop(slot);
// Publish the freed slot only after the payload has been taken out.
self.tail.store(tail.wrapping_add(1), Ordering::Release);
self.stats.dequeued.fetch_add(1, Ordering::Relaxed);
Ok(envelope)
}
/// Counter snapshot; fields are read individually and may be mutually
/// inconsistent under concurrent traffic.
fn stats(&self) -> QueueStats {
QueueStats {
enqueued: self.stats.enqueued.load(Ordering::Relaxed),
dequeued: self.stats.dequeued.load(Ordering::Relaxed),
dropped: self.stats.dropped.load(Ordering::Relaxed),
depth: self.depth(),
max_depth: self.stats.max_depth.load(Ordering::Relaxed),
}
}
fn reset_stats(&self) {
self.stats.enqueued.store(0, Ordering::Relaxed);
self.stats.dequeued.store(0, Ordering::Relaxed);
self.stats.dropped.store(0, Ordering::Relaxed);
self.stats.max_depth.store(0, Ordering::Relaxed);
}
}
/// Multi-producer / single-consumer queue: an `SpscQueue` whose producers
/// are serialized by a mutex so only one thread advances `head` at a time.
pub struct MpscQueue {
inner: SpscQueue,
// Held for the duration of each enqueue; consumers never take it.
producer_lock: parking_lot::Mutex<()>,
}
impl MpscQueue {
    /// Creates an MPSC queue backed by an SPSC ring of the given capacity
    /// (rounded up to a power of two by `SpscQueue::new`).
    pub fn new(capacity: usize) -> Self {
        let inner = SpscQueue::new(capacity);
        let producer_lock = parking_lot::Mutex::new(());
        Self {
            inner,
            producer_lock,
        }
    }
}
/// Delegates everything to the inner SPSC ring; only `try_enqueue` adds
/// behavior (producer serialization).
impl MessageQueue for MpscQueue {
    #[inline]
    fn capacity(&self) -> usize {
        self.inner.capacity()
    }
    #[inline]
    fn len(&self) -> usize {
        self.inner.len()
    }
    /// Serializes producers so the single-writer invariant on the inner
    /// ring's `head` index holds; the guard drops when this returns.
    #[inline]
    fn try_enqueue(&self, envelope: MessageEnvelope) -> Result<()> {
        let _serialize_producers = self.producer_lock.lock();
        self.inner.try_enqueue(envelope)
    }
    #[inline]
    fn try_dequeue(&self) -> Result<MessageEnvelope> {
        self.inner.try_dequeue()
    }
    fn stats(&self) -> QueueStats {
        self.inner.stats()
    }
    fn reset_stats(&self) {
        self.inner.reset_stats();
    }
}
/// Blocking wrapper around `MpscQueue` adding timed enqueue/dequeue via
/// condition variables.
/// NOTE(review): notifications are sent without holding `mutex`, so a wakeup
/// can be missed and a waiter may sleep until its timeout — verify whether
/// callers tolerate that latency.
pub struct BoundedQueue {
inner: MpscQueue,
// Signaled after a dequeue frees a slot.
not_full: parking_lot::Condvar,
// Signaled after an enqueue adds a message.
not_empty: parking_lot::Condvar,
// Waiters park on this mutex; it does not guard the queue data itself.
mutex: parking_lot::Mutex<()>,
}
impl BoundedQueue {
    /// Creates a blocking queue over an `MpscQueue` of the given capacity.
    pub fn new(capacity: usize) -> Self {
        Self {
            inner: MpscQueue::new(capacity),
            not_full: parking_lot::Condvar::new(),
            not_empty: parking_lot::Condvar::new(),
            mutex: parking_lot::Mutex::new(()),
        }
    }
    /// Enqueues `envelope`, blocking up to `timeout` while the queue is full.
    ///
    /// The fast path retries lock-free. When the queue is full, the state is
    /// re-checked *while holding `mutex`* before parking: previously the
    /// check happened before the lock, so a consumer could free a slot and
    /// call `notify_one` between the failed attempt and `wait_for`, losing
    /// the wakeup and leaving this thread asleep for its whole timeout even
    /// though space existed.
    ///
    /// NOTE(review): notifiers still signal without holding `mutex`, so a
    /// narrow lost-wakeup window remains; the wait is bounded by `timeout`,
    /// so this costs latency, never liveness.
    ///
    /// `envelope` is cloned per attempt because `try_enqueue` consumes its
    /// argument even on failure.
    pub fn enqueue_timeout(
        &self,
        envelope: MessageEnvelope,
        timeout: std::time::Duration,
    ) -> Result<()> {
        let deadline = std::time::Instant::now() + timeout;
        loop {
            match self.inner.try_enqueue(envelope.clone()) {
                Ok(()) => {
                    self.not_empty.notify_one();
                    return Ok(());
                }
                Err(RingKernelError::QueueFull { .. }) => {
                    let mut guard = self.mutex.lock();
                    // Re-check under the lock to narrow the lost-wakeup race.
                    match self.inner.try_enqueue(envelope.clone()) {
                        Ok(()) => {
                            drop(guard);
                            self.not_empty.notify_one();
                            return Ok(());
                        }
                        Err(RingKernelError::QueueFull { .. }) => {
                            let remaining =
                                deadline.saturating_duration_since(std::time::Instant::now());
                            if remaining.is_zero() {
                                return Err(RingKernelError::Timeout(timeout));
                            }
                            let _ = self.not_full.wait_for(&mut guard, remaining);
                        }
                        Err(e) => return Err(e),
                    }
                }
                Err(e) => return Err(e),
            }
        }
    }
    /// Dequeues a message, blocking up to `timeout` while the queue is empty.
    /// Mirrors `enqueue_timeout`: re-checks under `mutex` before parking so a
    /// `not_empty` notification sent between the failed attempt and the wait
    /// is not silently lost.
    pub fn dequeue_timeout(&self, timeout: std::time::Duration) -> Result<MessageEnvelope> {
        let deadline = std::time::Instant::now() + timeout;
        loop {
            match self.inner.try_dequeue() {
                Ok(envelope) => {
                    self.not_full.notify_one();
                    return Ok(envelope);
                }
                Err(RingKernelError::QueueEmpty) => {
                    let mut guard = self.mutex.lock();
                    // Re-check under the lock to narrow the lost-wakeup race.
                    match self.inner.try_dequeue() {
                        Ok(envelope) => {
                            drop(guard);
                            self.not_full.notify_one();
                            return Ok(envelope);
                        }
                        Err(RingKernelError::QueueEmpty) => {
                            let remaining =
                                deadline.saturating_duration_since(std::time::Instant::now());
                            if remaining.is_zero() {
                                return Err(RingKernelError::Timeout(timeout));
                            }
                            let _ = self.not_empty.wait_for(&mut guard, remaining);
                        }
                        Err(e) => return Err(e),
                    }
                }
                Err(e) => return Err(e),
            }
        }
    }
}
/// Non-blocking trait surface: delegates to the inner queue and, on success,
/// wakes one parked waiter on the matching condition variable.
impl MessageQueue for BoundedQueue {
    fn capacity(&self) -> usize {
        self.inner.capacity()
    }
    fn len(&self) -> usize {
        self.inner.len()
    }
    /// Non-blocking enqueue; a successful insert wakes one waiting consumer.
    fn try_enqueue(&self, envelope: MessageEnvelope) -> Result<()> {
        match self.inner.try_enqueue(envelope) {
            Ok(()) => {
                self.not_empty.notify_one();
                Ok(())
            }
            err => err,
        }
    }
    /// Non-blocking dequeue; a successful removal wakes one waiting producer.
    fn try_dequeue(&self) -> Result<MessageEnvelope> {
        match self.inner.try_dequeue() {
            Ok(envelope) => {
                self.not_full.notify_one();
                Ok(envelope)
            }
            err => err,
        }
    }
    fn stats(&self) -> QueueStats {
        self.inner.stats()
    }
    fn reset_stats(&self) {
        self.inner.reset_stats();
    }
}
/// Preset queue sizes, smallest to largest.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum QueueTier {
    /// 256 messages.
    Small,
    /// 1024 messages (the default).
    #[default]
    Medium,
    /// 4096 messages.
    Large,
    /// 16384 messages.
    ExtraLarge,
}
impl QueueTier {
    /// Message capacity for this tier (all powers of two).
    pub fn capacity(&self) -> usize {
        match self {
            Self::Small => 256,
            Self::Medium => 1024,
            Self::Large => 4096,
            Self::ExtraLarge => 16384,
        }
    }
    /// Picks the smallest tier able to buffer `target_headroom_ms`
    /// milliseconds of traffic at `messages_per_second`.
    ///
    /// Uses `saturating_mul`: the previous plain multiplication wrapped on
    /// u64 overflow (e.g. `for_throughput(1 << 63, 2)` wrapped to 0 and
    /// selected `Small`); saturation maps extreme loads to `ExtraLarge`.
    pub fn for_throughput(messages_per_second: u64, target_headroom_ms: u64) -> Self {
        let needed_capacity = messages_per_second.saturating_mul(target_headroom_ms) / 1000;
        if needed_capacity <= 256 {
            Self::Small
        } else if needed_capacity <= 1024 {
            Self::Medium
        } else if needed_capacity <= 4096 {
            Self::Large
        } else {
            Self::ExtraLarge
        }
    }
    /// Next tier up; `ExtraLarge` saturates at itself.
    pub fn upgrade(&self) -> Self {
        match self {
            Self::Small => Self::Medium,
            Self::Medium => Self::Large,
            Self::Large => Self::ExtraLarge,
            Self::ExtraLarge => Self::ExtraLarge,
        }
    }
    /// Next tier down; `Small` saturates at itself.
    pub fn downgrade(&self) -> Self {
        match self {
            Self::Small => Self::Small,
            Self::Medium => Self::Small,
            Self::Large => Self::Medium,
            Self::ExtraLarge => Self::Large,
        }
    }
}
/// Convenience constructors for the queue flavors in this module.
pub struct QueueFactory;
impl QueueFactory {
    /// Single-producer queue sized for `tier`.
    pub fn create_spsc(tier: QueueTier) -> SpscQueue {
        SpscQueue::new(tier.capacity())
    }
    /// Multi-producer queue sized for `tier`.
    pub fn create_mpsc(tier: QueueTier) -> MpscQueue {
        MpscQueue::new(tier.capacity())
    }
    /// Blocking queue sized for `tier`.
    pub fn create_bounded(tier: QueueTier) -> BoundedQueue {
        BoundedQueue::new(tier.capacity())
    }
    /// Boxed MPSC queue whose tier is derived from the expected message
    /// rate and desired buffering headroom.
    pub fn create_for_throughput(
        messages_per_second: u64,
        headroom_ms: u64,
    ) -> Box<dyn MessageQueue> {
        Box::new(Self::create_mpsc(QueueTier::for_throughput(
            messages_per_second,
            headroom_ms,
        )))
    }
}
/// Coarse health classification derived from queue utilization.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueueHealth {
/// Utilization below the warning threshold.
Healthy,
/// Utilization at or above the warning threshold.
Warning,
/// Utilization at or above the critical threshold.
Critical,
}
/// Classifies queue health from utilization thresholds.
#[derive(Debug, Clone)]
pub struct QueueMonitor {
    /// Utilization fraction (0.0..=1.0) at which health becomes `Warning`.
    pub warning_threshold: f64,
    /// Utilization fraction at which health becomes `Critical`.
    pub critical_threshold: f64,
}
impl Default for QueueMonitor {
    /// Warn at 75% utilization; go critical at 90%.
    fn default() -> Self {
        QueueMonitor {
            warning_threshold: 0.75,
            critical_threshold: 0.90,
        }
    }
}
impl QueueMonitor {
    /// Builds a monitor with explicit thresholds (fractions in 0.0..=1.0).
    pub fn new(warning_threshold: f64, critical_threshold: f64) -> Self {
        Self {
            warning_threshold,
            critical_threshold,
        }
    }
    /// Classifies the queue's health from its current utilization.
    pub fn check(&self, queue: &dyn MessageQueue) -> QueueHealth {
        let used = self.utilization(queue);
        if used >= self.critical_threshold {
            QueueHealth::Critical
        } else if used >= self.warning_threshold {
            QueueHealth::Warning
        } else {
            QueueHealth::Healthy
        }
    }
    /// Fraction of capacity in use; 0.0 for a zero-capacity queue.
    pub fn utilization(&self, queue: &dyn MessageQueue) -> f64 {
        match queue.capacity() {
            0 => 0.0,
            cap => queue.len() as f64 / cap as f64,
        }
    }
    /// Same as `utilization`, scaled to 0..=100.
    pub fn utilization_percent(&self, queue: &dyn MessageQueue) -> f64 {
        100.0 * self.utilization(queue)
    }
    /// Recommends the next tier up when either the current utilization
    /// crosses the warning threshold or the historical peak depth crossed
    /// the critical threshold. Returns `None` when the queue is within
    /// limits or already at the largest tier.
    pub fn suggest_upgrade(
        &self,
        queue: &dyn MessageQueue,
        current_tier: QueueTier,
    ) -> Option<QueueTier> {
        let stats = queue.stats();
        let capacity = queue.capacity();
        let peak_utilization = if capacity > 0 {
            stats.max_depth as f64 / capacity as f64
        } else {
            0.0
        };
        let pressured = self.utilization(queue) >= self.warning_threshold
            || peak_utilization >= self.critical_threshold;
        if !pressured {
            return None;
        }
        let next = current_tier.upgrade();
        if next != current_tier {
            Some(next)
        } else {
            None
        }
    }
    /// True when at least one enqueue was rejected for being full.
    pub fn has_drops(&self, queue: &dyn MessageQueue) -> bool {
        queue.stats().dropped > 0
    }
    /// dropped / (enqueued + dropped); 0.0 when nothing was ever attempted.
    pub fn drop_rate(&self, queue: &dyn MessageQueue) -> f64 {
        let stats = queue.stats();
        let attempts = stats.enqueued + stats.dropped;
        if attempts == 0 {
            0.0
        } else {
            stats.dropped as f64 / attempts as f64
        }
    }
}
/// Point-in-time snapshot combining health, utilization, raw counters and an
/// optional tier-upgrade recommendation.
#[derive(Debug, Clone)]
pub struct QueueMetrics {
pub health: QueueHealth,
/// Fraction of capacity in use (0.0..=1.0) at capture time.
pub utilization: f64,
pub stats: QueueStats,
/// Tier the queue was created with, when the caller knows it.
pub tier: Option<QueueTier>,
/// Next tier up, when the monitor recommends growing the queue.
pub suggested_upgrade: Option<QueueTier>,
}
impl QueueMetrics {
    /// Captures a point-in-time snapshot of health, utilization and counters;
    /// an upgrade suggestion is computed only when a tier is supplied.
    /// Each field is read independently, so values may be mutually
    /// inconsistent under concurrent traffic.
    pub fn capture(
        queue: &dyn MessageQueue,
        monitor: &QueueMonitor,
        current_tier: Option<QueueTier>,
    ) -> Self {
        let suggested_upgrade = match current_tier {
            Some(tier) => monitor.suggest_upgrade(queue, tier),
            None => None,
        };
        Self {
            health: monitor.check(queue),
            utilization: monitor.utilization(queue),
            stats: queue.stats(),
            tier: current_tier,
            suggested_upgrade,
        }
    }
}
/// A set of independent SPSC rings; messages are routed to a partition by
/// source id, spreading producer contention across rings.
pub struct PartitionedQueue {
partitions: Vec<SpscQueue>,
// Power of two (>= 1) so routing can use a mask instead of modulo.
partition_count: usize,
// Rotating start position to keep `try_dequeue_any` scans fair.
dequeue_index: AtomicU64,
}
impl PartitionedQueue {
    /// Builds `partition_count` (clamped to >= 1 and rounded up to a power of
    /// two) independent SPSC rings, each sized `capacity_per_partition`.
    pub fn new(partition_count: usize, capacity_per_partition: usize) -> Self {
        let partition_count = partition_count.max(1).next_power_of_two();
        let mut partitions = Vec::with_capacity(partition_count);
        for _ in 0..partition_count {
            partitions.push(SpscQueue::new(capacity_per_partition));
        }
        Self {
            partitions,
            partition_count,
            dequeue_index: AtomicU64::new(0),
        }
    }
    /// 4 partitions, each Medium-tier sized.
    pub fn with_defaults() -> Self {
        Self::new(4, QueueTier::Medium.capacity())
    }
    /// 8 partitions, each Large-tier sized.
    pub fn for_high_contention() -> Self {
        Self::new(8, QueueTier::Large.capacity())
    }
    /// Stable source-to-partition mapping; masking works because the
    /// partition count is a power of two.
    #[inline]
    pub fn partition_for(&self, source_id: u64) -> usize {
        (source_id as usize) & (self.partition_count - 1)
    }
    pub fn partition_count(&self) -> usize {
        self.partition_count
    }
    /// Capacity of a single partition (0 if there are none, which cannot
    /// happen via `new` since the count is clamped to >= 1).
    pub fn capacity_per_partition(&self) -> usize {
        match self.partitions.first() {
            Some(partition) => partition.capacity(),
            None => 0,
        }
    }
    /// Sum of all partition capacities.
    pub fn total_capacity(&self) -> usize {
        self.partition_count * self.capacity_per_partition()
    }
    /// Messages currently resident across all partitions.
    pub fn total_messages(&self) -> usize {
        let mut total = 0;
        for partition in &self.partitions {
            total += partition.len();
        }
        total
    }
    /// Enqueues into the partition owned by `source_id`.
    pub fn try_enqueue_from(&self, source_id: u64, envelope: MessageEnvelope) -> Result<()> {
        let index = self.partition_for(source_id);
        self.partitions[index].try_enqueue(envelope)
    }
    /// Enqueues using the envelope's own source kernel id for routing.
    pub fn try_enqueue(&self, envelope: MessageEnvelope) -> Result<()> {
        let source = envelope.header.source_kernel;
        self.try_enqueue_from(source, envelope)
    }
    /// Dequeues from one specific partition; errors on an out-of-range index.
    pub fn try_dequeue_partition(&self, partition: usize) -> Result<MessageEnvelope> {
        match self.partitions.get(partition) {
            Some(queue) => queue.try_dequeue(),
            None => Err(RingKernelError::InvalidConfig(format!(
                "Invalid partition index: {} (max: {})",
                partition,
                self.partition_count - 1
            ))),
        }
    }
    /// Scans every partition once, starting from a rotating index so no
    /// partition is systematically favored; `None` when all are empty.
    pub fn try_dequeue_any(&self) -> Option<MessageEnvelope> {
        let start = self.dequeue_index.fetch_add(1, Ordering::Relaxed) as usize;
        (0..self.partition_count).find_map(|offset| {
            let index = (start + offset) & (self.partition_count - 1);
            self.partitions[index].try_dequeue().ok()
        })
    }
    /// Stats for one partition, or `None` if the index is out of range.
    pub fn partition_stats(&self, partition: usize) -> Option<QueueStats> {
        self.partitions.get(partition).map(|p| p.stats())
    }
    /// Aggregated snapshot: counters and depth are summed across partitions;
    /// `max_depth` is the largest per-partition high-water mark.
    pub fn stats(&self) -> PartitionedQueueStats {
        let partition_stats: Vec<QueueStats> =
            self.partitions.iter().map(|p| p.stats()).collect();
        let mut total = QueueStats::default();
        for stats in &partition_stats {
            total.enqueued += stats.enqueued;
            total.dequeued += stats.dequeued;
            total.dropped += stats.dropped;
            total.depth += stats.depth;
            total.max_depth = total.max_depth.max(stats.max_depth);
        }
        PartitionedQueueStats {
            total,
            partition_stats,
            partition_count: self.partition_count,
        }
    }
    /// Zeroes every partition's counters.
    pub fn reset_stats(&self) {
        for partition in &self.partitions {
            partition.reset_stats();
        }
    }
}
/// Aggregated plus per-partition statistics snapshot for `PartitionedQueue`.
#[derive(Debug, Clone)]
pub struct PartitionedQueueStats {
/// Counters summed across partitions; `max_depth` is the per-partition max.
pub total: QueueStats,
/// One snapshot per partition, in partition order.
pub partition_stats: Vec<QueueStats>,
pub partition_count: usize,
}
impl PartitionedQueueStats {
    /// Largest per-partition depth in this snapshot.
    fn peak_depth(&self) -> u64 {
        self.partition_stats
            .iter()
            .map(|stats| stats.depth)
            .max()
            .unwrap_or(0)
    }
    /// Ratio of the busiest partition's depth to the mean depth; 1.0 means
    /// perfectly balanced. By convention, an empty snapshot (no partitions
    /// or zero total depth) also reports 1.0.
    pub fn load_imbalance(&self) -> f64 {
        if self.partition_count == 0 {
            return 1.0;
        }
        let mean = self.total.depth as f64 / self.partition_count as f64;
        if mean == 0.0 {
            return 1.0;
        }
        self.peak_depth() as f64 / mean
    }
    /// The busiest partition's depth as a fraction of one partition's
    /// capacity; 0.0 when the capacity is zero.
    pub fn max_partition_utilization(&self, capacity_per_partition: usize) -> f64 {
        if capacity_per_partition == 0 {
            return 0.0;
        }
        self.peak_depth() as f64 / capacity_per_partition as f64
    }
}
/// Unit tests.
/// NOTE(review): several tests depend on `MessageHeader::new`'s argument
/// order and on `source_kernel` semantics, neither of which is visible from
/// this file — confirm against the `message` module.
#[cfg(test)]
mod tests {
use super::*;
use crate::hlc::HlcTimestamp;
use crate::message::MessageHeader;
// Minimal envelope with an 8-byte payload, used by every test below.
fn make_envelope() -> MessageEnvelope {
MessageEnvelope {
header: MessageHeader::new(1, 0, 1, 8, HlcTimestamp::now(1)),
payload: vec![1, 2, 3, 4, 5, 6, 7, 8],
}
}
// Enqueue/dequeue round-trip plus the emptiness/fullness flags.
#[test]
fn test_spsc_basic() {
let queue = SpscQueue::new(16);
assert!(queue.is_empty());
assert!(!queue.is_full());
let env = make_envelope();
queue.try_enqueue(env).unwrap();
assert_eq!(queue.len(), 1);
assert!(!queue.is_empty());
let _ = queue.try_dequeue().unwrap();
assert!(queue.is_empty());
}
// A full queue rejects further enqueues with QueueFull.
#[test]
fn test_spsc_full() {
let queue = SpscQueue::new(4);
for _ in 0..4 {
queue.try_enqueue(make_envelope()).unwrap();
}
assert!(queue.is_full());
assert!(matches!(
queue.try_enqueue(make_envelope()),
Err(RingKernelError::QueueFull { .. })
));
}
// Counters track enqueues, dequeues and the resulting depth.
#[test]
fn test_spsc_stats() {
let queue = SpscQueue::new(16);
for _ in 0..10 {
queue.try_enqueue(make_envelope()).unwrap();
}
for _ in 0..5 {
let _ = queue.try_dequeue().unwrap();
}
let stats = queue.stats();
assert_eq!(stats.enqueued, 10);
assert_eq!(stats.dequeued, 5);
assert_eq!(stats.depth, 5);
}
// Four producer threads enqueue 100 messages each without loss.
#[test]
fn test_mpsc_concurrent() {
use std::sync::Arc;
use std::thread;
let queue = Arc::new(MpscQueue::new(1024));
let mut handles = vec![];
for _ in 0..4 {
let q = Arc::clone(&queue);
handles.push(thread::spawn(move || {
for _ in 0..100 {
q.try_enqueue(make_envelope()).unwrap();
}
}));
}
for h in handles {
h.join().unwrap();
}
let stats = queue.stats();
assert_eq!(stats.enqueued, 400);
}
// enqueue_timeout on a full queue times out instead of blocking forever.
#[test]
fn test_bounded_timeout() {
let queue = BoundedQueue::new(2);
queue.try_enqueue(make_envelope()).unwrap();
queue.try_enqueue(make_envelope()).unwrap();
let result = queue.enqueue_timeout(make_envelope(), std::time::Duration::from_millis(10));
assert!(matches!(result, Err(RingKernelError::Timeout(_))));
}
#[test]
fn test_queue_tier_capacities() {
assert_eq!(QueueTier::Small.capacity(), 256);
assert_eq!(QueueTier::Medium.capacity(), 1024);
assert_eq!(QueueTier::Large.capacity(), 4096);
assert_eq!(QueueTier::ExtraLarge.capacity(), 16384);
}
#[test]
fn test_queue_tier_for_throughput() {
assert_eq!(QueueTier::for_throughput(1000, 100), QueueTier::Small);
assert_eq!(QueueTier::for_throughput(5000, 100), QueueTier::Medium);
assert_eq!(QueueTier::for_throughput(20000, 100), QueueTier::Large);
assert_eq!(
QueueTier::for_throughput(100000, 100),
QueueTier::ExtraLarge
);
}
// Both directions saturate at the extreme tiers.
#[test]
fn test_queue_tier_upgrade_downgrade() {
assert_eq!(QueueTier::Small.upgrade(), QueueTier::Medium);
assert_eq!(QueueTier::Medium.upgrade(), QueueTier::Large);
assert_eq!(QueueTier::Large.upgrade(), QueueTier::ExtraLarge);
assert_eq!(QueueTier::ExtraLarge.upgrade(), QueueTier::ExtraLarge);
assert_eq!(QueueTier::Small.downgrade(), QueueTier::Small); assert_eq!(QueueTier::Medium.downgrade(), QueueTier::Small);
assert_eq!(QueueTier::Large.downgrade(), QueueTier::Medium);
assert_eq!(QueueTier::ExtraLarge.downgrade(), QueueTier::Large);
}
#[test]
fn test_queue_factory_creates_correct_capacity() {
let spsc = QueueFactory::create_spsc(QueueTier::Medium);
assert_eq!(spsc.capacity(), 1024);
let mpsc = QueueFactory::create_mpsc(QueueTier::Large);
assert_eq!(mpsc.capacity(), 4096);
let bounded = QueueFactory::create_bounded(QueueTier::Small);
assert_eq!(bounded.capacity(), 256);
}
#[test]
fn test_queue_factory_throughput_based() {
let queue = QueueFactory::create_for_throughput(10000, 100);
assert_eq!(queue.capacity(), 1024);
}
// Requested capacity 100 rounds up to 128, so the threshold crossings
// below are computed against 128, not 100.
#[test]
fn test_queue_monitor_health_levels() {
let monitor = QueueMonitor::default();
let queue = SpscQueue::new(100);
assert_eq!(monitor.check(&queue), QueueHealth::Healthy);
for _ in 0..76 {
queue.try_enqueue(make_envelope()).unwrap();
}
// 76/128 is about 0.59 — still below the 0.75 warning threshold.
assert_eq!(monitor.check(&queue), QueueHealth::Healthy);
for _ in 0..26 {
queue.try_enqueue(make_envelope()).unwrap();
}
// 102/128 is about 0.80 — warning.
assert_eq!(monitor.check(&queue), QueueHealth::Warning);
for _ in 0..18 {
queue.try_enqueue(make_envelope()).unwrap();
}
// 120/128 is about 0.94 — critical.
assert_eq!(monitor.check(&queue), QueueHealth::Critical);
}
#[test]
fn test_queue_monitor_utilization() {
let monitor = QueueMonitor::default();
let queue = SpscQueue::new(100);
assert!((monitor.utilization(&queue) - 0.0).abs() < 0.001);
for _ in 0..64 {
queue.try_enqueue(make_envelope()).unwrap();
}
// 64 of the rounded-up capacity 128 is exactly 0.5.
assert!((monitor.utilization(&queue) - 0.5).abs() < 0.001);
}
#[test]
fn test_queue_monitor_drop_detection() {
let monitor = QueueMonitor::default();
let queue = SpscQueue::new(4);
for _ in 0..4 {
queue.try_enqueue(make_envelope()).unwrap();
}
assert!(!monitor.has_drops(&queue));
let _ = queue.try_enqueue(make_envelope());
assert!(monitor.has_drops(&queue));
assert!(monitor.drop_rate(&queue) > 0.0);
}
// 200/256 crosses the warning threshold, so Small -> Medium is suggested;
// a 75%-full ExtraLarge queue has no larger tier, so no suggestion.
#[test]
fn test_queue_monitor_upgrade_suggestion() {
let monitor = QueueMonitor::default();
let queue = SpscQueue::new(QueueTier::Small.capacity());
assert!(monitor.suggest_upgrade(&queue, QueueTier::Small).is_none());
for _ in 0..200 {
queue.try_enqueue(make_envelope()).unwrap();
}
let suggestion = monitor.suggest_upgrade(&queue, QueueTier::Small);
assert_eq!(suggestion, Some(QueueTier::Medium));
let large_queue = SpscQueue::new(QueueTier::ExtraLarge.capacity());
for _ in 0..(QueueTier::ExtraLarge.capacity() * 3 / 4) {
large_queue.try_enqueue(make_envelope()).unwrap();
}
assert!(monitor
.suggest_upgrade(&large_queue, QueueTier::ExtraLarge)
.is_none());
}
// 100 messages in a 1024-slot queue: healthy, low utilization, no upgrade.
#[test]
fn test_queue_metrics_capture() {
let queue = SpscQueue::new(QueueTier::Medium.capacity());
let monitor = QueueMonitor::default();
for _ in 0..100 {
queue.try_enqueue(make_envelope()).unwrap();
}
let metrics = QueueMetrics::capture(&queue, &monitor, Some(QueueTier::Medium));
assert_eq!(metrics.health, QueueHealth::Healthy);
assert!(metrics.utilization < 0.15);
assert_eq!(metrics.stats.enqueued, 100);
assert_eq!(metrics.tier, Some(QueueTier::Medium));
assert!(metrics.suggested_upgrade.is_none());
}
#[test]
fn test_partitioned_queue_creation() {
let queue = PartitionedQueue::new(4, 256);
assert_eq!(queue.partition_count(), 4);
assert_eq!(queue.capacity_per_partition(), 256);
assert_eq!(queue.total_capacity(), 1024);
}
#[test]
fn test_partitioned_queue_rounds_to_power_of_two() {
let queue = PartitionedQueue::new(3, 256);
assert_eq!(queue.partition_count(), 4); }
// Routing is deterministic per source id; ids 0 and 1 land in different
// partitions whenever more than one partition exists.
#[test]
fn test_partitioned_queue_routing() {
let queue = PartitionedQueue::with_defaults();
let partition1 = queue.partition_for(12345);
let partition2 = queue.partition_for(12345);
assert_eq!(partition1, partition2);
let partition_a = queue.partition_for(0);
let partition_b = queue.partition_for(1);
assert!(partition_a != partition_b || queue.partition_count() == 1);
}
// Messages spread over 16 source ids can all be drained via
// try_dequeue_any, which returns None once everything is consumed.
#[test]
fn test_partitioned_queue_enqueue_dequeue() {
let queue = PartitionedQueue::new(4, 64);
for source in 0..16u64 {
let mut env = make_envelope();
env.header.source_kernel = source;
queue.try_enqueue(env).unwrap();
}
assert_eq!(queue.total_messages(), 16);
for _ in 0..16 {
let env = queue.try_dequeue_any();
assert!(env.is_some());
}
assert_eq!(queue.total_messages(), 0);
assert!(queue.try_dequeue_any().is_none());
}
#[test]
fn test_partitioned_queue_stats() {
let queue = PartitionedQueue::new(4, 64);
for source in 0..20u64 {
let mut env = make_envelope();
env.header.source_kernel = source;
queue.try_enqueue(env).unwrap();
}
let stats = queue.stats();
assert_eq!(stats.total.enqueued, 20);
assert_eq!(stats.partition_count, 4);
assert_eq!(stats.partition_stats.len(), 4);
}
// All 10 messages hit partition 0: max depth 10 vs mean 2.5 gives ratio 4.
#[test]
fn test_partitioned_queue_load_imbalance() {
let queue = PartitionedQueue::new(4, 64);
for _ in 0..10 {
let mut env = make_envelope();
env.header.source_kernel = 0;
queue.try_enqueue(env).unwrap();
}
let stats = queue.stats();
assert!((stats.load_imbalance() - 4.0).abs() < 0.001);
}
// Valid partition index dequeues; an out-of-range index errors.
#[test]
fn test_partitioned_queue_dequeue_partition() {
let queue = PartitionedQueue::new(4, 64);
let mut env = make_envelope();
env.header.source_kernel = 0;
queue.try_enqueue(env).unwrap();
let partition = queue.partition_for(0);
let result = queue.try_dequeue_partition(partition);
assert!(result.is_ok());
let result = queue.try_dequeue_partition(100);
assert!(result.is_err());
}
}
/// Property-based tests (require the `proptest` dev-dependency).
#[cfg(test)]
mod proptests {
use super::*;
use crate::hlc::HlcTimestamp;
use crate::message::MessageHeader;
use proptest::prelude::*;
// Envelope whose header carries `seq` in the first constructor slot;
// `spsc_fifo_ordering` below reads it back through `header.message_type`.
// NOTE(review): MessageHeader::new's argument order is not visible from
// this file — confirm the first parameter really is message_type.
fn make_envelope_with_seq(seq: u64) -> MessageEnvelope {
MessageEnvelope {
header: MessageHeader::new(seq, 1, 0, 8, HlcTimestamp::now(1)),
payload: vec![1, 2, 3, 4, 5, 6, 7, 8],
}
}
proptest! {
// Capacity is always a power of two and never below the request.
#[test]
fn spsc_capacity_invariant(cap in 1usize..=256) {
let queue = SpscQueue::new(cap);
let actual_cap = queue.capacity();
prop_assert!(actual_cap.is_power_of_two());
prop_assert!(actual_cap >= cap);
}
// Overfilling by 10 never pushes len past capacity.
#[test]
fn spsc_len_never_exceeds_capacity(n in 1usize..=128) {
let queue = SpscQueue::new(n);
let cap = queue.capacity();
for i in 0..(cap + 10) {
let _ = queue.try_enqueue(make_envelope_with_seq(i as u64));
prop_assert!(queue.len() <= cap);
}
}
// Messages come out in the order they went in.
#[test]
fn spsc_fifo_ordering(count in 1usize..=64) {
let queue = SpscQueue::new((count + 1).next_power_of_two());
for i in 0..count {
queue.try_enqueue(make_envelope_with_seq(i as u64)).unwrap();
}
for i in 0..count {
let env = queue.try_dequeue().unwrap();
prop_assert_eq!(env.header.message_type, i as u64);
}
prop_assert!(queue.is_empty());
}
// enqueued/dropped/depth bookkeeping stays mutually consistent. (With
// capacity 64 and count <= 64 no drops can actually occur, but the
// accounting is exercised regardless.)
#[test]
fn spsc_stats_consistency(enqueue_count in 1usize..=64) {
let queue = SpscQueue::new(64);
let cap = queue.capacity();
let mut expected_dropped = 0u64;
for i in 0..enqueue_count {
if queue.try_enqueue(make_envelope_with_seq(i as u64)).is_err() {
expected_dropped += 1;
}
}
let stats = queue.stats();
let successful = enqueue_count as u64 - expected_dropped;
prop_assert_eq!(stats.enqueued, successful);
prop_assert_eq!(stats.dropped, expected_dropped);
prop_assert_eq!(stats.depth, successful);
prop_assert!(stats.depth <= cap as u64);
}
// Draining everything returns the counters to a consistent empty state.
#[test]
fn spsc_enqueue_dequeue_roundtrip(n in 1usize..=32) {
let queue = SpscQueue::new(64);
for i in 0..n {
queue.try_enqueue(make_envelope_with_seq(i as u64)).unwrap();
}
for _ in 0..n {
queue.try_dequeue().unwrap();
}
let stats = queue.stats();
prop_assert_eq!(stats.enqueued, n as u64);
prop_assert_eq!(stats.dequeued, n as u64);
prop_assert_eq!(stats.depth, 0);
prop_assert!(queue.is_empty());
}
// A source id always maps to the same, in-range partition.
#[test]
fn partitioned_routing_deterministic(source_id in 0u64..1000, partitions in 1usize..=8) {
let queue = PartitionedQueue::new(partitions, 64);
let p1 = queue.partition_for(source_id);
let p2 = queue.partition_for(source_id);
prop_assert_eq!(p1, p2);
prop_assert!(p1 < queue.partition_count());
}
}
}