disk_backed_queue
A robust, crash-resistant queue implementation for Rust that persists all data to disk using SQLite. Provides an mpsc-like channel API while ensuring messages survive application restarts and system failures.
Features
- Zero Message Loss: All messages are persisted to SQLite before acknowledgment
- Crash Recovery: Messages survive application restarts and system crashes
- MPSC-like API: Familiar channel-based interface compatible with async Rust
- Transaction-Based: Atomic operations with SQLite transactions for reliability
- Dead Letter Queue (DLQ): Corrupted messages automatically moved to separate database
- Batch Operations: High-performance bulk send/receive (460x faster than single operations)
- Backpressure Support: Optional queue size limits prevent unbounded growth
- Concurrent Access: Multiple senders supported via WAL mode and proper locking
- Binary Serialization: Efficient bincode encoding for minimal storage overhead
Use Cases
Perfect for scenarios where message loss is unacceptable:
- Message queuing during database outages
- Event sourcing and audit logs
- Job queues that survive restarts
- Data pipelines requiring guaranteed delivery
- Reliable inter-process communication
Installation
Add the dependencies to your `Cargo.toml`:

```toml
[dependencies]
disk_backed_queue = "*"
tokio = { version = "*", features = ["rt-multi-thread", "macros"] } # For async runtime
serde = { version = "*", features = ["derive"] }                    # For message serialization
```
Quick Start
```rust
use disk_backed_queue::disk_backed_channel;
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
struct Job {
    id: u64,
    payload: String,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Open (or create) a queue backed by a SQLite file.
    // The argument list shown here is abridged; see the API reference below.
    let (tx, mut rx) = disk_backed_channel("my_queue.db", None).await?;

    tx.send(Job { id: 1, payload: "hello".into() }).await?;

    // `recv` returns `Ok(None)` once the queue is empty.
    if let Some(job) = rx.recv().await? {
        println!("got {job:?}");
    }
    Ok(())
}
```
Batch Operations
For high-throughput scenarios, use batch operations for massive performance gains:
```rust
// Send batch (460x faster than individual sends)
let messages: Vec<Job> = (0..1_000)
    .map(|i| Job { id: i, payload: format!("job {i}") })
    .collect();
tx.send_batch(messages).await?;

// Receive batch
let received = rx.recv_batch(100).await?; // Get up to 100 messages
println!("received {} messages", received.len());
```
Performance: Batch operations are dramatically faster than single operations. Run cargo run --example throughput_demo --release to benchmark on your hardware.
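A typical long-running consumer drains the queue in batches and backs off briefly when it is empty. The loop below is a sketch, assuming `recv_batch` returns an empty `Vec` when nothing is queued; `process` is a hypothetical handler for your message type:

```rust
loop {
    let batch = rx.recv_batch(500).await?;
    if batch.is_empty() {
        // Nothing pending right now; poll again shortly.
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
        continue;
    }
    for job in batch {
        process(job).await; // hypothetical per-message handler
    }
}
```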
Architecture
Storage
- Main Queue: SQLite database with BLOB storage for serialized messages
- Dead Letter Queue: Separate `.dlq.db` file for corrupted/undeserializable messages
- WAL Mode: Write-Ahead Logging for better concurrency and crash safety
- Atomic Transactions: All operations use SQLite transactions for ACID guarantees
Queue Structure
```
my_queue.db                 # Main queue database
├── messages table
│   ├── id (PRIMARY KEY)
│   ├── data (BLOB)         # Bincode-serialized message
│   └── created_at (INT)    # Unix timestamp

my_queue.dlq.db             # Dead letter queue
└── messages_dlq table
    ├── id (PRIMARY KEY)
    ├── original_id (INT)
    ├── data (BLOB)
    ├── error_message (TEXT)
    ├── created_at (INT)
    └── moved_at (INT)
```
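To make the storage model concrete, the following is an illustrative sketch of what persisting one message amounts to. This is not the crate's internal code; it assumes bincode 2.x with the `serde` feature and an open rusqlite `Connection`:

```rust
use rusqlite::{params, Connection};
use serde::Serialize;

fn persist<T: Serialize>(conn: &Connection, msg: &T) -> Result<(), Box<dyn std::error::Error>> {
    // Serialize the message into a compact binary blob.
    let blob = bincode::serde::encode_to_vec(msg, bincode::config::standard())?;
    let created_at = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)?
        .as_secs();
    // One row per message: data (BLOB) plus a Unix timestamp.
    conn.execute(
        "INSERT INTO messages (data, created_at) VALUES (?1, ?2)",
        params![blob, created_at],
    )?;
    Ok(())
}
```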
Dead Letter Queue (DLQ)
When messages fail to deserialize (schema changes, corruption, etc.), they're automatically moved to the DLQ instead of blocking the queue.
Inspecting the DLQ
```bash
# Open the DLQ database
sqlite3 my_queue.dlq.db

# View corrupted messages
SELECT * FROM messages_dlq;

# Count corrupted messages
SELECT COUNT(*) FROM messages_dlq;

# View error patterns
SELECT error_message, COUNT(*) FROM messages_dlq GROUP BY error_message;
```
Common DLQ Scenarios
- Schema Changes: Your Rust struct changed but old messages are still in the queue (see the sketch after this list)
- Corrupted Data: Disk corruption or incomplete writes
- Version Mismatches: Different versions of your app using same queue
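As an illustration of the schema-change case, the two versions below (hypothetical types, not from the crate) show how previously queued rows become undecodable; on receive they are moved to the `.dlq.db` file along with the decode error instead of blocking the main queue:

```rust
use serde::{Deserialize, Serialize};

// What version 1 of the application wrote to the queue.
mod v1 {
    use super::*;

    #[derive(Serialize, Deserialize)]
    pub struct Job {
        pub id: u64,
    }
}

// Version 2 adds a required field, so rows encoded from `v1::Job`
// no longer decode and are routed to the DLQ.
mod v2 {
    use super::*;

    #[derive(Serialize, Deserialize)]
    pub struct Job {
        pub id: u64,
        pub priority: u8,
    }
}
```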
Recovery Strategies
Because the DLQ is an ordinary SQLite database, the standard `sqlite3` CLI covers most recovery work, for example:

```bash
# Export corrupted messages for analysis
sqlite3 -csv my_queue.dlq.db "SELECT * FROM messages_dlq;" > dlq_export.csv

# Clear old DLQ entries after investigation
sqlite3 my_queue.dlq.db "DELETE FROM messages_dlq WHERE moved_at < CAST(strftime('%s', 'now', '-7 days') AS INTEGER);"

# Clear entire DLQ
sqlite3 my_queue.dlq.db "DELETE FROM messages_dlq;"
```
API Reference
Channel Creation
```rust
// Signature sketch (parameter list abridged); T must be serde-serializable.
pub async fn disk_backed_channel<T>(
    db_path: impl AsRef<std::path::Path>,
    max_size: Option<usize>,
) -> Result<(Sender<T>, Receiver<T>), DiskQueueError>
```
Sender Methods
```rust
// Send single message
pub async fn send(&self, msg: T) -> Result<(), DiskQueueError>

// Send many messages in a single transaction
pub async fn send_batch(&self, msgs: Vec<T>) -> Result<(), DiskQueueError>
```
Receiver Methods
```rust
// Receive single message (returns None if empty)
pub async fn recv(&mut self) -> Result<Option<T>, DiskQueueError>

// Receive up to `max` messages in a single transaction
pub async fn recv_batch(&mut self, max: usize) -> Result<Vec<T>, DiskQueueError>
```
Direct Queue Access
For advanced use cases, you can use DiskBackedQueue directly:
```rust
use disk_backed_queue::DiskBackedQueue;

let queue = DiskBackedQueue::new("my_queue.db").await?;

// All sender/receiver methods available
queue.send(Job { id: 1, payload: "hello".into() }).await?;
queue.clear().await?; // Clear all messages
```
Configuration
Queue Size Limits
```rust
// Unlimited queue (default)
let (tx, rx) = disk_backed_channel("my_queue.db", None).await?;

// Limited queue (blocks senders when full)
let (tx, rx) = disk_backed_channel("my_queue.db", Some(10_000)).await?;
```
When the queue reaches max size, send() will block with exponential backoff until space becomes available.
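If blocking indefinitely is not acceptable, one option is to bound the wait yourself with a timeout. This is a sketch, not a built-in API:

```rust
use std::time::Duration;
use tokio::time::timeout;

// Give up if the queue stays full for more than 5 seconds.
match timeout(Duration::from_secs(5), tx.send(job)).await {
    Ok(sent) => sent?, // send completed (or surfaced a queue error)
    Err(_) => eprintln!("queue still full after 5s; retry later or shed load"),
}
```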
Durability Levels
Control the trade-off between performance and data safety:
```rust
// Type and argument names below are illustrative; check the crate docs for exact definitions.
use disk_backed_queue::{disk_backed_channel_with_durability, DurabilityLevel};

// Maximum safety (default) - survives power loss
let (tx, rx) =
    disk_backed_channel_with_durability("my_queue.db", None, DurabilityLevel::Full).await?;

// Balanced performance and safety
let (tx, rx) =
    disk_backed_channel_with_durability("my_queue.db", None, DurabilityLevel::Normal).await?;

// Maximum performance - no power loss protection
let (tx, rx) =
    disk_backed_channel_with_durability("my_queue.db", None, DurabilityLevel::Off).await?;
```
| Durability Level | Performance Impact | Power Loss Safety | SQLite Mode | Use Case |
|---|---|---|---|---|
| Full (default) | Baseline | ✅ Maximum | SYNCHRONOUS=FULL | Critical data, financial transactions |
| Normal | (untested) | ⚠️ Good | SYNCHRONOUS=NORMAL | Balanced performance, high-throughput caching |
| Off | (untested) | ❌ None | SYNCHRONOUS=OFF | Cache/ephemeral data only |
| Extra | (untested) | ✅✅ Paranoid | SYNCHRONOUS=EXTRA | Regulatory compliance |
Note: Batch operations have similar throughput across all durability levels since fsync cost is amortized. Run the throughput demo to measure actual performance differences on your system.
Table Name Validation
Table names must:
- Be non-empty and ≤ 128 characters
- Contain only alphanumeric characters and underscores
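For example, `job_queue_2024` is accepted while `jobs; DROP TABLE messages` is rejected. A standalone check that mirrors these rules (a sketch, not the crate's internal validator, assuming ASCII alphanumerics):

```rust
fn is_valid_table_name(name: &str) -> bool {
    !name.is_empty()
        && name.chars().count() <= 128
        && name.chars().all(|c| c.is_ascii_alphanumeric() || c == '_')
}

fn main() {
    assert!(is_valid_table_name("job_queue_2024"));
    assert!(!is_valid_table_name("jobs; DROP TABLE messages"));
}
```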
Error Handling
```rust
use disk_backed_queue::DiskQueueError;

match rx.recv().await {
    Ok(Some(msg)) => println!("got message: {msg:?}"),
    Ok(None) => println!("queue is empty"),
    Err(DiskQueueError::QueueClosed) => println!("all senders were dropped"),
    Err(e) => eprintln!("queue error: {e}"),
}
```
Error Types
- `Database(rusqlite::Error)` - SQLite database errors
- `Serialization(bincode::error::EncodeError)` - Failed to serialize message
- `Deserialization(bincode::error::DecodeError)` - Corrupted message (moved to DLQ)
- `InvalidTableName(String)` - Invalid table name provided
- `TaskJoin(String)` - Internal async task error
- `QueueClosed` - Queue was closed
- `UnexpectedRowCount(String)` - Database consistency error
- `QueueFull(usize)` - Queue reached size limit (only with max_size)
Performance
Benchmarks
Performance varies significantly based on your hardware (SSD vs HDD), operating system, and filesystem.
Key Insight: Single operations are limited by fsync overhead. Batch operations use a single transaction, achieving 50-500x speedup depending on your system.
To measure on your hardware, run `cargo run --example throughput_demo --release`. This shows actual throughput for single and batch operations at each durability level on your specific system.
Optimization Tips
- Use Batch Operations: For high throughput, accumulate messages and use `send_batch()` (see the sketch after this list)
- Tune Batch Size: Test different batch sizes (100-1000) for your workload
- WAL Mode: Enabled by default for better concurrency
- Message Size: Smaller messages = better throughput
- Concurrent Senders: Multiple senders work well due to WAL mode
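A common producer-side pattern is to buffer messages and flush them with `send_batch` when the buffer fills or a timer fires. The loop below is a sketch; `incoming` (an in-memory channel feeding the producer), the batch size, and the flush interval are illustrative choices, not crate APIs:

```rust
use std::time::Duration;

let batch_size = 500;
let mut buffer: Vec<Job> = Vec::with_capacity(batch_size);
let mut ticker = tokio::time::interval(Duration::from_millis(100));

loop {
    tokio::select! {
        // `incoming` is whatever in-memory channel feeds this producer.
        Some(job) = incoming.recv() => {
            buffer.push(job);
            if buffer.len() >= batch_size {
                tx.send_batch(std::mem::take(&mut buffer)).await?;
            }
        }
        _ = ticker.tick() => {
            if !buffer.is_empty() {
                tx.send_batch(std::mem::take(&mut buffer)).await?;
            }
        }
    }
}
```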
Testing
```bash
# Run tests
cargo test

# Run benchmarks
cargo bench

# Run throughput demo
cargo run --example throughput_demo --release
```
Thread Safety
- Sender: `Clone + Send + Sync` - Safe to clone and share across threads
- Receiver: `Send` but not `Sync` - Use from a single async task
- Internal: All SQLite operations use `spawn_blocking` to avoid blocking the async executor
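Since the sender is cheap to clone, fanning in from several tasks is straightforward. A sketch, reusing the illustrative `Job` type from the Quick Start:

```rust
for worker_id in 0..4u64 {
    let tx = tx.clone();
    tokio::spawn(async move {
        // Each task owns its own clone of the sender.
        tx.send(Job { id: worker_id, payload: format!("from worker {worker_id}") })
            .await
            .expect("send failed");
    });
}
```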
Requirements
- Rust: Edition 2024 (1.86+)
- Tokio: Async runtime required
- SQLite: Bundled via rusqlite (no external dependencies)
License
MIT
Contributing
Contributions welcome! Areas for improvement:
- Async-friendly DLQ recovery API
- Metrics/observability hooks
- Pluggable serialization backends
- Queue compaction/vacuuming
- Multi-receiver support