Crate jacques

Crate jacques 

Source
Expand description

§Jacques: High-Performance Lock-Free MPMC Queues

Jacques is a high-performance, lock-free Multi-Producer Multi-Consumer (MPMC) queue library designed for concurrent applications requiring maximum throughput and minimal latency.

§Features

  • Lock-free algorithms: Zero mutex contention with atomic operations
  • MPMC support: Multiple producers and consumers can operate concurrently
  • Zero-allocation operation: No dynamic allocation during push/pop operations
  • Horizontal scaling: Pack-based load distribution across multiple queues
  • Type safety: Comprehensive compile-time guarantees with generic design
  • Memory efficient: Packed 128-bit atomic operations with sequence numbers
  • Rich API: Blocking, non-blocking, conditional, and bulk operations

§Queue Types

Jacques provides three main queue implementations:

§1. Owned Queue (MpmcQueue)

The foundational lock-free queue for Copy types:

use jacques::{
    owned::queue,
    traits::{QueueConsumer, QueueProducer},
};

let (producer, consumer) = queue::<u64>().capacity(1024).channels()?;

producer.push(42)?;
assert_eq!(consumer.pop()?, 42);

§2. Pointer Queue (PointerQueue)

Store non-Copy types by wrapping them in Arc<T>:

use jacques::pointer::pointer_queue;
use std::sync::Arc;

#[derive(Debug, Clone, PartialEq)]
struct Message {
    id: u64,
    data: Vec<u8>,
}

use jacques::traits::{QueueConsumer, QueueProducer};
let (producer, consumer) = pointer_queue::<Message>().capacity(512).channels()?;

let msg = Arc::new(Message {
    id: 1,
    data: vec![1, 2, 3],
});
producer.push(msg.clone())?;
assert_eq!(consumer.pop()?, msg);

§3. Queue Pack (QueuePack)

Horizontal scaling with multiple independent queues:

use jacques::pack::queue_pack;

// 4 queues, scan every 16 operations
use jacques::traits::{QueueConsumer, QueueProducer};
let (producer, consumer) = queue_pack::<u64, 4, 16>().queue_capacity(256).channels()?;

producer.push(100)?;
assert_eq!(consumer.pop()?, 100);

§Performance Characteristics

  • Throughput: >100M operations/second on modern hardware
  • Latency: Sub-microsecond operation latency
  • Scalability: Linear scaling with core count using queue packs
  • Memory: Constant memory usage, no dynamic allocation

§Advanced Features

§Sequence Numbers

Track operation ordering across concurrent access:

use jacques::owned::queue;

use jacques::traits::{QueueConsumer, QueueProducer};
let (producer, consumer) = queue::<u8>().capacity(64).channels()?;

let seq = producer.push_with_seq(12)?;
let (value, pop_seq) = consumer.pop_with_seq()?;
assert_eq!(value, 12);

§Conditional Operations

Process elements based on predicates:

use jacques::owned::queue;

use jacques::traits::{QueueConsumer, QueueProducer};
let (producer, consumer) = queue::<i32>().capacity(32).channels()?;

producer.push(2)?;
producer.push(1)?;
producer.push(3)?;

// Pop the head if it's even
let even = consumer.pop_if(|&value, _seq| value % 2 == 0)?;
assert_eq!(even, 2);

§Bulk Processing

Consume multiple elements efficiently:

use jacques::owned::queue;

use jacques::traits::{QueueConsumer, QueueProducer};
let (producer, consumer) = queue::<u32>().capacity(16).channels()?;

for i in 0..5 {
    producer.push(i)?;
}

let mut sum = 0;
let count = consumer.consume(|value, _seq| {
    sum += value;
    value >= 3 // Stop after processing value 3
});

println!("Processed {} items, sum: {}", count, sum);

§Thread Safety

All queue types are Send + Sync and designed for concurrent access:

use jacques::owned::queue;
use std::thread;

use jacques::traits::{QueueConsumer, QueueProducer};
let (producer, consumer) = queue::<usize>().capacity(1024).channels()?;

// Spawn producer thread
let producer_handle = {
    let producer = producer.clone();
    thread::spawn(move || {
        for i in 0..100 {
            producer.push(i).unwrap();
        }
    })
};

// Spawn consumer thread
let consumer_handle = {
    let consumer = consumer.clone();
    thread::spawn(move || {
        let mut sum = 0;
        for _ in 0..100 {
            sum += consumer.pop().unwrap();
        }
        sum
    })
};

producer_handle.join().unwrap();
let sum = consumer_handle.join().unwrap();
println!("Sum: {}", sum);

§Memory Layout

Jacques uses a carefully designed memory layout for optimal performance:

  • 128-bit atomic operations containing both data and sequence numbers
  • Cache-padded storage to prevent false sharing
  • Power-of-two capacities for efficient modulo operations

§Error Handling

All operations return Result types with descriptive errors:

  • QueueError::Full - Queue capacity exceeded
  • QueueError::Empty - No elements available
  • QueueError::InvalidCapacity - Invalid configuration
  • QueueError::TypeSizeExceeded - Type too large for atomic storage

§Minimum Supported Rust Version (MSRV)

Jacques requires Rust 1.88 or later.

Modules§

owned
Core lock-free MPMC queue implementation for Copy types.
pack
Horizontal scaling with multiple independent queues.
pointer
Lock-free MPMC queue for non-Copy types using Arc<T> storage.
traits
Common traits for queue producers, consumers, and factories.

Enums§

QueueError
Errors that can occur during queue operations.