# elasticq
A thread-safe, dynamically resizable circular buffer (queue) for Rust, designed for high-throughput scenarios. Now featuring both **lock-based** and **lock-free** implementations optimized for different use cases, plus advanced features like **priority queues**, **async streams**, **persistence**, and **metrics**.
## Features
### Core Features
* **Elastic Sizing:** Automatically grows when full and shrinks when underutilized, within configurable limits.
* **Thread-Safe:** Safe for concurrent use by multiple producers and consumers.
* **Batch Operations:** Efficient `push_batch` and `pop_batch` methods for high-throughput.
* **Asynchronous API (Optional):** Enable the `async` feature for `tokio`-based asynchronous methods.
* **Configurable Behavior:** Fine-tune capacities, growth/shrink factors, and memory management.
* **Clear Error Handling:** Provides distinct error types for conditions like buffer full/empty or timeouts.
### Implementation Variants
#### ๐ **Lock-Based Implementation** (Default)
* Uses `parking_lot` mutexes for synchronous operations
* Optionally uses `tokio::sync` mutexes for asynchronous operations via the `async` feature
* Excellent for general-purpose use with moderate concurrency
* Predictable performance characteristics
#### ๐ **Lock-Free Implementation**
* **Zero-mutex MPSC queue** using atomic operations and epoch-based reclamation
* **2.1x faster** than lock-based implementation in single-threaded scenarios
* **46M+ messages/sec** throughput in optimized configurations
* **Wait-free consumer operations** - no blocking or deadlocks possible
* **Generation-based ABA protection** for safe concurrent operations
* **Consumer-driven dynamic resizing** optimized for MQTT proxy use cases
* Enable with the `lock_free` feature flag
### Advanced Features (New in v0.3.0!)
#### ๐ฏ **Priority Queue** (`priority` feature)
* Multiple priority levels (default: 3 for MQTT QoS compatibility)
* Configurable fair queuing to prevent starvation
* Per-priority statistics and capacity management
* Ideal for QoS-based message processing
#### ๐ **Async Streams** (`streams` feature)
* `Stream` and `Sink` trait implementations
* `BufferChannel` for channel-like send/recv API
* Integration with `tokio-stream` and `futures` ecosystem
* Backpressure-aware streaming
#### ๐พ **Persistence** (`persistent` feature)
* Crash recovery with write-ahead logging
* Memory-mapped file backing for efficiency
* Configurable sync modes: `NoSync`, `Periodic`, `EveryWrite`
* Automatic compaction support
#### ๐ **Metrics** (`metrics` feature)
* Integration with the `metrics` crate (Prometheus-compatible)
* Counters, gauges, and histograms for all operations
* Queue depth, capacity, utilization, and latency metrics
* Instrumented buffer wrappers for automatic recording
## Table of Contents
1. [Installation](#installation)
2. [Quick Start](#quick-start)
* [Lock-Based Usage](#lock-based-usage-default)
* [Lock-Free Usage](#lock-free-usage-mpsc)
* [Asynchronous Usage](#asynchronous-usage)
* [Priority Queue](#priority-queue-usage)
* [Async Streams](#async-streams-usage)
* [Persistence](#persistence-usage)
* [Metrics](#metrics-usage)
3. [Configuration](#configuration)
4. [API Reference](#api-reference)
5. [Performance Analysis](#performance-analysis)
* [Lock-Free vs Lock-Based Comparison](#lock-free-vs-lock-based-comparison)
* [Scalability Characteristics](#scalability-characteristics)
* [MQTT Proxy Benchmarks](#mqtt-proxy-benchmarks)
6. [Formal Verification](#formal-verification)
7. [Use Cases & Recommendations](#use-cases--recommendations)
8. [Contributing](#contributing)
9. [License](#license)
## Installation
### Basic Installation (Lock-Based)
```toml
[dependencies]
elasticq = "0.3.0"
```
### Lock-Free Implementation
```toml
[dependencies]
elasticq = { version = "0.3.0", features = ["lock_free"] }
```
### With Async Support
```toml
[dependencies]
elasticq = { version = "0.3.0", features = ["async"] }
tokio = { version = "1", features = ["sync", "time"] }
```
### Priority Queue
```toml
[dependencies]
elasticq = { version = "0.3.0", features = ["priority"] }
```
### Async Streams
```toml
[dependencies]
elasticq = { version = "0.3.0", features = ["streams"] }
tokio = { version = "1", features = ["sync", "time", "rt"] }
```
### Persistence
```toml
[dependencies]
elasticq = { version = "0.3.0", features = ["persistent"] }
```
### Metrics/Observability
```toml
[dependencies]
elasticq = { version = "0.3.0", features = ["metrics"] }
```
### All Features
```toml
[dependencies]
elasticq = { version = "0.3.0", features = ["async", "lock_free", "priority", "streams", "persistent", "metrics"] }
tokio = { version = "1", features = ["sync", "time", "rt"] }
```
## Quick Start
### Lock-Based Usage (Default)
```rust
use elasticq::{DynamicCircularBuffer, Config, BufferError};
fn main() -> Result<(), BufferError> {
// Create buffer with default configuration
let buffer = DynamicCircularBuffer::<i32>::new(Config::default())?;
// Push some items
buffer.push(10)?;
buffer.push(20)?;
println!("Buffer length: {}", buffer.len()); // Output: 2
// Pop an item
let item = buffer.pop()?;
assert_eq!(item, 10);
println!("Popped: {}", item);
// Batch operations for higher throughput
buffer.push_batch(vec![30, 40, 50])?;
let items = buffer.pop_batch(2)?;
assert_eq!(items, vec![20, 30]);
Ok(())
}
```
### Lock-Free Usage (MPSC)
Perfect for MQTT proxy scenarios with multiple publishers and a single message processor:
```rust
use elasticq::{LockFreeMPSCQueue, Config, BufferError};
use std::sync::Arc;
use std::thread;
fn main() -> Result<(), BufferError> {
// Configure for MQTT proxy use case
let config = Config::default()
.with_initial_capacity(1024)
.with_max_capacity(1048576); // 1M messages max
let queue = Arc::new(LockFreeMPSCQueue::new(config)?);
// Multiple producers (MQTT publishers)
let mut producers = vec![];
for producer_id in 0..4 {
let queue_clone = Arc::clone(&queue);
let handle = thread::spawn(move || {
for i in 0..1000 {
let message = format!("msg_{}_{}", producer_id, i);
// Non-blocking enqueue with retry
while queue_clone.try_enqueue(message.clone()).is_err() {
thread::yield_now();
}
}
});
producers.push(handle);
}
// Single consumer (MQTT message processor)
let queue_clone = Arc::clone(&queue);
let consumer = thread::spawn(move || {
let mut processed = 0;
while processed < 4000 {
match queue_clone.try_dequeue() {
Ok(Some(message)) => {
// Process message
println!("Processing: {}", message);
processed += 1;
}
Ok(None) => thread::yield_now(), // Queue empty, yield
Err(_) => thread::yield_now(), // Resize in progress
}
}
});
// Wait for completion
for handle in producers {
handle.join().unwrap();
}
consumer.join().unwrap();
// Check statistics
let stats = queue.stats();
println!("Final stats: {:?}", stats);
Ok(())
}
```
### Asynchronous Usage
Make sure you have enabled the `async` feature and have `tokio` as a dependency.
```rust
use elasticq::{DynamicCircularBuffer, Config, BufferError};
use std::time::Duration;
#[tokio::main]
async fn main() -> Result<(), BufferError> {
let buffer = DynamicCircularBuffer::<String>::new(Config::default())?;
// Asynchronously push
buffer.push_async("hello".to_string()).await?;
buffer.push_async("world".to_string()).await?;
// Asynchronously pop with timeout
match buffer.pop_async_timeout(Duration::from_millis(100)).await {
Ok(item) => println!("Popped async: {}", item), // Expected: "hello"
Err(BufferError::Timeout(_)) => println!("Pop operation timed out"),
Err(e) => return Err(e),
}
// Async batch operations
let messages = vec!["batch1".to_string(), "batch2".to_string()];
buffer.push_batch_async(messages).await?;
let popped_batch = buffer.pop_batch_async(2).await?;
println!("Popped async batch: {:?}", popped_batch); // Expected: ["world", "batch1"]
// Attempt to pop from an empty buffer, which should return BufferError::Empty quickly
// (pop_async and pop_batch_async don't wait if buffer is empty)
match buffer.pop_batch_async_timeout(2, Duration::from_secs(1)).await {
Ok(items) if items.is_empty() => println!("Popped empty batch as expected after draining."),
// Ok(items) => println!("Unexpectedly popped items: {:?}", items), // This case might not be hit if Empty is preferred
Err(BufferError::Empty) => println!("Buffer empty as expected."),
Err(e) => return Err(e),
}
Ok(())
}
```
### Priority Queue Usage
Perfect for MQTT QoS handling where messages have different priority levels:
```rust
use elasticq::priority::{PriorityCircularBuffer, PriorityConfig};
use elasticq::BufferError;
fn main() -> Result<(), BufferError> {
// Create a priority queue with 3 levels (matching MQTT QoS 0, 1, 2)
let config = PriorityConfig::default()
.with_priority_levels(3)
.with_fair_queuing(true) // Prevent low-priority starvation
.with_max_consecutive_per_priority(5); // Process max 5 messages per priority before switching
let queue = PriorityCircularBuffer::<String>::new(config)?;
// Push messages with different priorities
queue.push_with_priority("QoS 0 message".to_string(), 0)?; // Low priority
queue.push_with_priority("QoS 1 message".to_string(), 1)?; // Medium priority
queue.push_with_priority("QoS 2 message".to_string(), 2)?; // High priority (exactly once)
// Pop returns highest priority first
assert_eq!(queue.pop()?, "QoS 2 message".to_string());
assert_eq!(queue.pop()?, "QoS 1 message".to_string());
assert_eq!(queue.pop()?, "QoS 0 message".to_string());
// Check per-priority statistics
let stats = queue.stats();
println!("Priority stats: {:?}", stats);
Ok(())
}
```
### Async Streams Usage
Integrate with the async Rust ecosystem using `Stream` and `Sink` traits:
```rust
use elasticq::{DynamicCircularBuffer, Config};
use elasticq::streams::{BufferStream, BufferSink, BufferChannel, BufferStreamExt};
use std::sync::Arc;
use std::time::Duration;
#[tokio::main]
async fn main() {
let buffer = Arc::new(DynamicCircularBuffer::<i32>::new(Config::default()).unwrap());
// Option 1: Use BufferChannel for channel-like API
let channel = BufferChannel::new(buffer.clone());
// Spawn a producer
let sender = channel.clone();
tokio::spawn(async move {
for i in 0..10 {
sender.send(i).await.unwrap();
}
});
// Consume messages
for _ in 0..10 {
let msg = channel.recv_timeout(Duration::from_secs(1)).await.unwrap();
println!("Received: {}", msg);
}
// Option 2: Use Stream/Sink pair with shared notify
let buffer2 = Arc::new(DynamicCircularBuffer::<i32>::new(Config::default()).unwrap());
let (stream, sink) = buffer2.stream_sink_pair();
// The sink notifies the stream when items are pushed
sink.send(42).await.unwrap();
sink.send_batch(vec![1, 2, 3]).await.unwrap();
}
```
### Persistence Usage
Enable crash recovery with write-ahead logging:
```rust
use elasticq::persistent::{PersistentCircularBuffer, PersistentConfig, SyncMode};
use elasticq::BufferError;
use std::path::Path;
fn main() -> Result<(), BufferError> {
let config = PersistentConfig::default()
.with_file_path("/tmp/queue.dat")
.with_sync_mode(SyncMode::Periodic(std::time::Duration::from_secs(1)))
.with_max_log_entries(10000);
// Create persistent buffer (recovers data if file exists)
let buffer = PersistentCircularBuffer::<String>::new(config)?;
// Push messages (persisted to disk)
buffer.push("message 1".to_string())?;
buffer.push("message 2".to_string())?;
// Pop messages
let msg = buffer.pop()?;
println!("Got: {}", msg);
// Force sync to disk
buffer.sync()?;
// Compact the log file (removes processed entries)
buffer.compact()?;
// Check persistence stats
let stats = buffer.stats();
println!("Persistence stats: {:?}", stats);
Ok(())
}
```
### Metrics Usage
Monitor your queues with Prometheus-compatible metrics:
```rust
use elasticq::{DynamicCircularBuffer, Config};
use elasticq::metrics::{MetricsRecorder, InstrumentedBuffer};
use std::sync::Arc;
fn main() {
// Create a metrics recorder
let recorder = MetricsRecorder::new("mqtt_broker");
// Create an instrumented buffer
let buffer = Arc::new(DynamicCircularBuffer::<String>::new(Config::default()).unwrap());
let instrumented = InstrumentedBuffer::new(buffer, recorder);
// All operations are automatically recorded
instrumented.push("message".to_string()).unwrap();
let _ = instrumented.pop().unwrap();
// Or wrap an existing buffer reference
let buffer2 = DynamicCircularBuffer::<i32>::new(Config::default()).unwrap();
let recorder2 = MetricsRecorder::new("events");
let instrumented_ref = recorder2.instrument(&buffer2);
instrumented_ref.push(42).unwrap();
// Metrics are exported via the metrics crate facade
// Use metrics-exporter-prometheus or similar to expose them
}
```
## Configuration
The buffer's behavior can be customized using the `Config` struct:
```rust
use elasticq::Config; // Note: If Config is public, it's from elasticq directly.
// If it's meant to be constructed differently, adjust this example.
use std::time::Duration;
let config = Config::default()
.with_initial_capacity(512) // Initial number of elements the buffer can hold
.with_min_capacity(256) // Minimum capacity the buffer will shrink to
.with_max_capacity(8192) // Maximum capacity the buffer will grow to
.with_growth_factor(1.5) // Factor by which capacity increases (e.g., 1.5 = 50% increase)
.with_shrink_threshold(0.3) // Shrink if usage is <= 30% of current capacity
.with_pop_timeout(Duration::from_secs(5)) // Default pop timeout (currently not auto-used by methods)
.with_push_timeout(Duration::from_secs(5)); // Default push timeout (currently not auto-used by methods)
// Important: Ensure config is valid before creating the buffer!
// `DynamicCircularBuffer::new(config)` will validate it and return `Err(BufferError::InvalidConfiguration)` if not.
// Key rules:
// - initial_capacity must be between min_capacity and max_capacity.
// - min_capacity cannot be greater than max_capacity.
// - Capacities must be > 0.
// - growth_factor must be > 1.0.
// - shrink_threshold must be between 0.0 and 1.0 (exclusive).
```
The `push_timeout` and `pop_timeout` fields in `Config` are placeholders for potential future enhancements; currently, timeout methods require an explicit `Duration` argument.
## API Highlights
### Core Buffer (`DynamicCircularBuffer<T>`)
* `new(config: Config) -> Result<Self, BufferError>`: Creates a new buffer.
* `push(&self, item: T) -> Result<(), BufferError>`
* `pop(&self) -> Result<T, BufferError>`
* `push_batch(&self, items: Vec<T>) -> Result<(), BufferError>`
* `pop_batch(&self, max_items: usize) -> Result<Vec<T>, BufferError>`
* Async variants (if `async` feature enabled): `push_async`, `pop_async`, `push_batch_async`, `pop_batch_async`, and `*_timeout` versions.
* Utilities: `len()`, `is_empty()`, `capacity()`, `clear()`, `iter() -> Vec<T> (clones items)`, `drain() -> Vec<T> (consumes items)`.
### Priority Queue (`priority` feature)
* `PriorityCircularBuffer<T>`: Multi-level priority queue
* `PriorityConfig`: Configuration with `with_priority_levels()`, `with_fair_queuing()`, `with_max_consecutive_per_priority()`
* `push_with_priority(&self, item: T, priority: usize)`: Push with specific priority
* `pop(&self)`: Pop highest priority item (with fair queuing if enabled)
* `pop_from_priority(&self, priority: usize)`: Pop from specific priority level
* `stats(&self) -> PriorityStats`: Per-priority statistics
### Async Streams (`streams` feature)
* `BufferStream<T>`: Implements `futures_core::Stream`
* `BufferSink<T>`: For sending items with `send()` and `send_batch()`
* `BufferChannel<T>`: Channel-like API with `send()`, `recv()`, `recv_timeout()`
* `BufferStreamExt`: Extension trait adding `stream_sink_pair()` to buffers
### Persistence (`persistent` feature)
* `PersistentCircularBuffer<T>`: File-backed buffer with crash recovery
* `PersistentConfig`: Configuration with `with_file_path()`, `with_sync_mode()`, `with_max_log_entries()`
* `SyncMode`: `NoSync`, `Periodic(Duration)`, `EveryWrite`
* `sync(&self)`: Force sync to disk
* `compact(&self)`: Compact the write-ahead log
* `stats(&self) -> PersistentStats`: Persistence statistics
### Metrics (`metrics` feature)
* `MetricsRecorder`: Records queue metrics with configurable queue name
* `InstrumentedBuffer<T>`: Wrapper that auto-records all operations
* `InstrumentedBufferRef<T>`: Wrapper for borrowed buffer references
* `instrument(&self, buffer: &B)`: Wrap a buffer for metrics recording
* Metrics: `messages_enqueued`, `messages_dequeued`, `queue_depth`, `queue_capacity`, `operation_duration_seconds`
## Performance Analysis
Performance benchmarks were conducted on a Mac Studio with M1 Ultra (20 CPU cores). Results demonstrate significant improvements with the lock-free implementation.
### Lock-Free vs Lock-Based Comparison
| **Lock-Free** | **46.6M msg/sec** | Varies | Wait-free operations, no deadlocks |
| **Lock-Based** | **22.0M msg/sec** | Stable | Predictable under high contention |
| **Speedup** | **๐ 2.1x** | Scenario-dependent | Lock-free wins for MPSC patterns |
### Producer Scalability Analysis
Recent benchmarks (1-20 producers, single consumer) demonstrate excellent scalability characteristics:
```
Throughput (K msg/sec)
500K โค
โ
450K โค โโโ
โ โ โ
400K โค โ โ
โ โ โ
350K โค โ โโโ
โ โ โ
300K โค โ โโ
โ โ โโ
250K โค โ โโโ
โ โ
200K โค โโโโ
โ โ
150K โค
โ โ
100K โค
โ
50K โค
โ
0 โโฌโโโโโโฌโโโโโโฌโโโโโโฌโโโโโโฌโโโโโโฌโโโโโโฌโโโโโโฌโโโโโโฌโโโโโโฌโโโโโโฌโโโโโโฌโโโโ
1 3 5 7 9 11 13 15 17 19 21
Producer Count
```
#### Performance Zones
| 1-4 | Linear Scale | 68K - 260K msg/sec | 100.0% | Real-time systems |
| 5-9 | Peak Zone | 319K - 479K msg/sec | 99.9% - 100% | MQTT proxy optimal |
| 10-16 | Plateau | 285K - 471K msg/sec | 97.2% - 99.9% | High-load scenarios |
| 17-20 | Decline | 205K - 259K msg/sec | 94.9% - 96.8% | Consider sharding |
#### Key Characteristics
* **Peak Performance:** 479,298 msg/sec at 9 producers
* **3x Scalability:** Throughput improvement from 1 to 20 producers
* **Excellent Reliability:** 19/20 configurations achieve >95% success rate
* **Memory Efficient:** 256KB peak capacity under maximum load
* **Zero Deadlock Risk:** Wait-free consumer operations
#### Lock-Based (General Purpose)
* **Baseline (1P/1C):** ~7.5 million items/second
* **Optimal (2P/2C):** Peaked at ~12.3 million items/second
* **High Contention (4P+):** Performance degrades due to lock contention
* **Batch Operations:** Significantly better - 1.1 ns/item for 1000-item batches
### MQTT Proxy Benchmarks
Real-world MQTT proxy simulation (4 publishers โ 1 processor):
* **Lock-Free Implementation:** 2.4M messages/sec sustained throughput
* **Dynamic Resizing:** Capacity scales from 1K โ 8K+ automatically
* **Message Loss:** <1% under extreme load (configurable backpressure)
* **Latency:** Sub-millisecond processing for 4,000 message batches
### v0.3.0 Feature Benchmarks
#### Priority Queue Performance
| push_pop_high_priority | **89 ns** | ~11.2M ops/sec |
| push_pop_low_priority | **115 ns** | ~8.7M ops/sec |
| mixed_priorities_strict | **310 ns** | ~3.2M ops/sec |
| mixed_priorities_fair | **341 ns** | ~2.9M ops/sec |
| batch_10 | 659 ns | **15.2M elem/sec** |
| batch_100 | 5.96 ยตs | **16.8M elem/sec** |
| batch_1000 | 59.6 ยตs | **16.8M elem/sec** |
| 1000_ops_mixed | 110 ยตs | **9.1M elem/sec** |
* Fair queuing adds ~10% overhead vs strict priority ordering
* Batch operations scale linearly with excellent throughput
#### Persistence Performance
| NoSync | **19.9 ยตs** | 2.1 ms | Fastest - no fsync |
| Periodic | **22.0 ยตs** | - | Background sync every 100ms |
| EveryWrite | **2.3 ms** | - | Full durability guarantee |
* NoSync mode is ~100x faster than EveryWrite
* Use Periodic sync for balanced durability/performance
#### Metrics Overhead
| Single push/pop | 120 ns | 670 ns | ~5.6x |
| Batch 10 | 153 ns | 720 ns | ~4.7x |
| Batch 100 | 270 ns | 849 ns | ~3.1x |
| Batch 1000 | 1.57 ยตs | 2.17 ยตs | **~1.4x** |
* Metrics overhead is well-amortized with batch operations
* At batch size 1000: only 40% overhead for full observability
## Quality Assurance
### Comprehensive Testing Suite
ElasticQ includes an extensive test suite that validates correctness, performance, and safety:
#### **Core Test Categories (12 implemented)**
- **ABA Protection Tests** - Validates generation-based race condition prevention
- **Message Conservation Tests** - Ensures zero message loss or duplication
- **Resize Coordination Tests** - Verifies atomic resize operations under concurrency
- **Memory Reclamation Tests** - Tests epoch-based safe memory management
- **Producer Lifecycle Tests** - Dynamic producer join/leave scenarios
- **Consumer State Management Tests** - Consumer behavior across different states
- **Edge Case Stress Tests** - Boundary conditions and extreme scenarios
- **Property-Based Tests** - 1000+ randomized test cases using `proptest`
- **Concurrency Model Tests** - Complete thread interleaving verification with `loom`
- **Performance Regression Tests** - Ensures sustained throughput guarantees
#### **Test Quality Metrics**
- **100% Critical Path Coverage** - All lock-free algorithm paths tested
- **Formal Property Validation** - Properties derived from TLA+ specification
- **Race Condition Detection** - Comprehensive concurrent execution testing
- **Memory Safety Verification** - No leaks or use-after-free under any scenario
### Production Readiness
โ
**Zero Critical Bugs** - All race conditions and data corruption issues resolved
โ
**Perfect Message Conservation** - Mathematical guarantee of no phantom messages
โ
**Memory Safety** - Comprehensive epoch-based garbage collection testing
โ
**Performance Validated** - 2.1x improvement over lock-based implementation verified
โ
**Warning-Free Compilation** - Clean codebase with zero compiler warnings
## Formal Verification
The lock-free implementation includes **TLA+ formal specifications** located in `tla+/` directory:
* **`LockFreeMPSCQueue.tla`** - Complete formal model of the lock-free algorithm
* **Safety Properties Verified:**
* FIFO ordering maintained under all concurrent operations
* Bounded capacity with no memory leaks
* Message conservation (no phantom messages or unexpected losses)
* ABA protection prevents race conditions
* Single consumer constraint enforced
* **Liveness Properties Verified:**
* Consumer progress guarantees
* Resize operation completion
* Producer fairness under contention
To run verification:
```bash
# Requires TLA+ tools installation
tlc LockFreeMPSCQueue.tla -config LockFreeMPSCQueue.cfg
```
## Use Cases & Recommendations
### ๐ **Choose Lock-Free Implementation When:**
* **MQTT Proxy/Broker:** Multiple publishers, single message processor
* **Event Streaming:** High-throughput event ingestion with single consumer
* **Real-time Systems:** Deterministic latency requirements (no blocking)
* **Single Producer:** Maximum performance for single-threaded producers
* **Zero Deadlock Tolerance:** Systems that cannot afford blocking
### ๐ **Choose Lock-Based Implementation When:**
* **General Purpose:** Balanced multi-producer multi-consumer workloads
* **Moderate Concurrency:** 2-4 threads with mixed operations
* **Async/Await Patterns:** Tokio-based applications with async methods
* **Predictable Performance:** Consistent behavior under varying load
* **Complex Operations:** Need for batch operations and flexible API
### Configuration Recommendations
#### MQTT Proxy Configuration
```rust
let config = Config::default()
.with_initial_capacity(1024) // Start with 1K messages
.with_max_capacity(1048576) // Allow up to 1M messages
.with_growth_factor(2.0) // Double capacity when full
.with_min_capacity(512); // Shrink to 512 minimum
```
#### High-Throughput Streaming
```rust
let config = Config::default()
.with_initial_capacity(8192) // Larger initial buffer
.with_max_capacity(16777216) // 16M message capacity
.with_growth_factor(1.5) // Moderate growth
.with_shrink_threshold(0.25); // Shrink when 25% utilized
```
## Design Considerations & Limitations
* **Locking Strategy:** The buffer uses a `Mutex` around the internal `VecDeque` and an `RwLock` for its logical capacity. Additionally, `push_lock: Mutex<()>` and `pop_lock: Mutex<()>` serialize all push operations against each other and all pop operations against each other. This design prioritizes correctness by ensuring that complex sequences like resize/shrink decisions and actions are atomic with respect to other operations of the same kind.
* **Scalability Trade-off:** The coarse-grained `push_lock` and `pop_lock` are the primary reason for limited scalability beyond a few concurrent threads for *single-item* operations.
* **Async Utility Methods:** Methods like `len()`, `is_empty()`, and `capacity()` are synchronous. When the `async` feature is enabled (and thus `tokio::sync` locks are used internally), these methods use `blocking_lock()` (or equivalent). This means they can block an async runtime if called from one and the lock is heavily contended. For critical async paths, use with awareness.
* **`iter()` Performance:** `iter()` clones all items in the buffer. This can be costly for large buffers or items that are expensive to clone. `drain()` is more efficient if items are to be consumed and removed.
## Contributing
Contributions are welcome! Please feel free to submit issues or pull requests. For major changes, please open an issue first to discuss your proposed changes.
### Priority Areas for Contribution
* **Performance Optimizations:** Further improvements to lock-free algorithms
* **Additional Algorithms:** SPSC (Single-Producer Single-Consumer), MPMC implementations
* **Platform Testing:** Verification on different architectures (ARM, x86, etc.)
* **Documentation:** Examples, tutorials, and API documentation
* **Formal Verification:** Extended TLA+ models and proofs
* **Feature Enhancements:** Improvements to priority queues, persistence, streams, and metrics
### Development Commands
```bash
# Run all tests
cargo test
# Run with lock-free feature
cargo test --features lock_free
# Run tests for new v0.3.0 features
cargo test --features priority
cargo test --features streams
cargo test --features persistent
cargo test --features metrics
# Run all feature tests
cargo test --all-features
# Run benchmarks
cargo bench
# Run lock-free vs lock-based benchmarks
cargo bench --features lock_free
# Run TLA+ verification (requires TLA+ tools)
cd tla+ && tlc LockFreeMPSCQueue.tla -config LockFreeMPSCQueue.cfg
# Run examples
cargo run --example lock_free_demo --features lock_free
cargo run --example performance_summary --features lock_free
```
## License
This project is licensed under the MIT License. Please see the `LICENSE` file in the repository for the full license text.
[](https://deepwiki.com/jfabienke/elasticq)