§Jacques: High-Performance Lock-Free MPMC Queues
Jacques is a high-performance, lock-free Multi-Producer Multi-Consumer (MPMC) queue library designed for concurrent applications requiring maximum throughput and minimal latency.
§Features
- Lock-free algorithms: Zero mutex contention with atomic operations
- MPMC support: Multiple producers and consumers can operate concurrently
- Zero-allocation operation: No dynamic allocation during push/pop operations
- Horizontal scaling: Pack-based load distribution across multiple queues
- Type safety: Comprehensive compile-time guarantees with generic design
- Memory efficient: Packed 128-bit atomic operations with sequence numbers
- Rich API: Blocking, non-blocking, conditional, and bulk operations
§Queue Types
Jacques provides three main queue implementations:
§1. Owned Queue (MpmcQueue)
The foundational lock-free queue for Copy types:
use jacques::{
    owned::queue,
    traits::{QueueConsumer, QueueProducer},
};
let (producer, consumer) = queue::<u64>().capacity(1024).channels()?;
producer.push(42)?;
assert_eq!(consumer.pop()?, 42);
§2. Pointer Queue (PointerQueue)
Store non-Copy types by wrapping them in Arc<T>:
use jacques::pointer::pointer_queue;
use jacques::traits::{QueueConsumer, QueueProducer};
use std::sync::Arc;
#[derive(Debug, Clone, PartialEq)]
struct Message {
    id: u64,
    data: Vec<u8>,
}
let (producer, consumer) = pointer_queue::<Message>().capacity(512).channels()?;
let msg = Arc::new(Message {
    id: 1,
    data: vec![1, 2, 3],
});
producer.push(msg.clone())?;
assert_eq!(consumer.pop()?, msg);
§3. Queue Pack (QueuePack)
Horizontal scaling with multiple independent queues:
use jacques::pack::queue_pack;
use jacques::traits::{QueueConsumer, QueueProducer};
// 4 queues, scan every 16 operations
let (producer, consumer) = queue_pack::<u64, 4, 16>().queue_capacity(256).channels()?;
producer.push(100)?;
assert_eq!(consumer.pop()?, 100);
§Performance Characteristics
- Throughput: >100M operations/second on modern hardware
- Latency: Sub-microsecond operation latency
- Scalability: Linear scaling with core count using queue packs
- Memory: Constant memory usage, no dynamic allocation
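The scalability point above rests on spreading operations across several independent queues. The following is a minimal, std-only sketch of that idea, not the jacques implementation: `SketchPack` and its methods are hypothetical names, each lane is a `Mutex<VecDeque<T>>` stand-in (so it is not lock-free), and round-robin placement is an assumed policy that the crate may not use.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;

/// Hypothetical stand-in for a pack of `N` independent queues.
/// The lanes use `Mutex<VecDeque<T>>` only to keep the sketch std-only;
/// this is NOT lock-free and NOT how jacques stores elements.
pub struct SketchPack<T, const N: usize> {
    lanes: [Mutex<VecDeque<T>>; N],
    next_push: AtomicUsize,
}

impl<T, const N: usize> SketchPack<T, N> {
    /// Create a pack with `N` empty lanes (`N` must be non-zero).
    pub fn new() -> Self {
        Self {
            lanes: std::array::from_fn(|_| Mutex::new(VecDeque::new())),
            next_push: AtomicUsize::new(0),
        }
    }

    /// Assumed policy: round-robin. Each push goes to the next lane,
    /// and the chosen lane index is returned for inspection.
    pub fn push(&self, value: T) -> usize {
        let lane = self.next_push.fetch_add(1, Ordering::Relaxed) % N;
        self.lanes[lane].lock().unwrap().push_back(value);
        lane
    }

    /// Scan the lanes in order and return the first element found.
    pub fn pop(&self) -> Option<T> {
        for lane in &self.lanes {
            let mut guard = lane.lock().unwrap();
            if let Some(value) = guard.pop_front() {
                return Some(value);
            }
        }
        None
    }
}
```

In this sketch producers touching different lanes never contend with each other, at the cost of global FIFO order: elements are ordered within a lane but not across lanes.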
§Advanced Features
§Sequence Numbers
Track operation ordering across concurrent access:
use jacques::owned::queue;
use jacques::traits::{QueueConsumer, QueueProducer};
let (producer, consumer) = queue::<u8>().capacity(64).channels()?;
let seq = producer.push_with_seq(12)?;
let (value, pop_seq) = consumer.pop_with_seq()?;
assert_eq!(value, 12);
§Conditional Operations
Process elements based on predicates:
use jacques::owned::queue;
use jacques::traits::{QueueConsumer, QueueProducer};
let (producer, consumer) = queue::<i32>().capacity(32).channels()?;
producer.push(2)?;
producer.push(1)?;
producer.push(3)?;
// Pop the head if it's even
let even = consumer.pop_if(|&value, _seq| value % 2 == 0)?;
assert_eq!(even, 2);
§Bulk Processing
Consume multiple elements efficiently:
use jacques::owned::queue;
use jacques::traits::{QueueConsumer, QueueProducer};
let (producer, consumer) = queue::<u32>().capacity(16).channels()?;
for i in 0..5 {
    producer.push(i)?;
}
let mut sum = 0;
let count = consumer.consume(|value, _seq| {
    sum += value;
    value >= 3 // Stop after processing value 3
});
println!("Processed {} items, sum: {}", count, sum);
§Thread Safety
All queue types are Send + Sync and designed for concurrent access:
use jacques::owned::queue;
use jacques::traits::{QueueConsumer, QueueProducer};
use std::thread;
let (producer, consumer) = queue::<usize>().capacity(1024).channels()?;
// Spawn producer thread
let producer_handle = {
    let producer = producer.clone();
    thread::spawn(move || {
        for i in 0..100 {
            producer.push(i).unwrap();
        }
    })
};
// Spawn consumer thread
let consumer_handle = {
    let consumer = consumer.clone();
    thread::spawn(move || {
        let mut sum = 0;
        for _ in 0..100 {
            sum += consumer.pop().unwrap();
        }
        sum
    })
};
producer_handle.join().unwrap();
let sum = consumer_handle.join().unwrap();
println!("Sum: {}", sum);
§Memory Layout
Jacques uses a carefully designed memory layout for optimal performance:
- 128-bit atomic operations containing both data and sequence numbers
- Cache-padded storage to prevent false sharing
- Power-of-two capacities for efficient modulo operations
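The three layout points above can be illustrated with a small std-only sketch. Every name here (`pack_slot`, `unpack_slot`, `slot_index`, `CachePadded`) is hypothetical. The placement of the sequence number in the high 64 bits and the 64-byte cache-line size are also assumptions, since the source does not state either. The atomic load and store of the packed word are left out entirely.

```rust
// Hypothetical helpers; none of these names come from the jacques API.

/// Pack a 64-bit sequence number (high half, by assumption) and a 64-bit
/// payload (low half) into one 128-bit word, so that both could be
/// published by a single 128-bit atomic operation.
pub fn pack_slot(seq: u64, value: u64) -> u128 {
    ((seq as u128) << 64) | (value as u128)
}

/// Split a packed word back into `(sequence, payload)`.
pub fn unpack_slot(word: u128) -> (u64, u64) {
    ((word >> 64) as u64, word as u64)
}

/// Map an ever-increasing sequence number to a ring-buffer index.
/// Only valid when `capacity` is a power of two, in which case
/// `seq % capacity` equals `seq & (capacity - 1)`.
pub fn slot_index(seq: u64, capacity: usize) -> usize {
    debug_assert!(capacity.is_power_of_two());
    (seq as usize) & (capacity - 1)
}

/// Align (and therefore pad) a value to 64 bytes, an assumed cache-line
/// size, so that two neighbouring values never share a cache line.
#[repr(align(64))]
pub struct CachePadded<T>(pub T);
```

The mask in `slot_index` replaces a division with a single bitwise AND, which is the usual reason ring buffers insist on power-of-two capacities.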
§Error Handling
All operations return Result types with descriptive errors:
- QueueError::Full - Queue capacity exceeded
- QueueError::Empty - No elements available
- QueueError::InvalidCapacity - Invalid configuration
- QueueError::TypeSizeExceeded - Type too large for atomic storage
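One way to act on these variants is to separate transient conditions from configuration mistakes. The sketch below uses a hypothetical `SketchError` enum that mirrors only the four variant names listed above. The real `QueueError` may carry payloads, use different messages, or implement other traits, and `is_transient` is an illustrative helper rather than part of the crate.

```rust
use std::fmt;

// Hypothetical mirror of the four documented variant names.
// The messages below are invented for this sketch.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SketchError {
    Full,
    Empty,
    InvalidCapacity,
    TypeSizeExceeded,
}

impl fmt::Display for SketchError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let msg = match self {
            SketchError::Full => "queue is full",
            SketchError::Empty => "queue is empty",
            SketchError::InvalidCapacity => "invalid capacity",
            SketchError::TypeSizeExceeded => "type too large for atomic storage",
        };
        f.write_str(msg)
    }
}

impl std::error::Error for SketchError {}

/// `Full` and `Empty` can clear up once another thread makes progress,
/// so a caller may retry; the other two indicate a setup problem.
pub fn is_transient(err: SketchError) -> bool {
    matches!(err, SketchError::Full | SketchError::Empty)
}
```

A caller could loop or back off while `is_transient` returns true and propagate the error with `?` otherwise.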
§Minimum Supported Rust Version (MSRV)
Jacques requires Rust 1.88 or later.
Modules§
- owned: Core lock-free MPMC queue implementation for Copy types.
- pack: Horizontal scaling with multiple independent queues.
- pointer: Lock-free MPMC queue for non-Copy types using Arc<T> storage.
- traits: Common traits for queue producers, consumers, and factories.
Enums§
- QueueError: Errors that can occur during queue operations.