# Qrusty 🦀

The trusty priority queue server that never forgets.
Qrusty is a high-performance priority queue server written in Rust, designed for reliable message processing with filesystem persistence and acknowledgment support.
## Features
- 🚀 Fast - 50,000+ messages/second
- 💾 Persistent - Survives restarts with RocksDB storage (or run in-memory)
- 🎯 Priority-based - Numeric (u64) or text (lexicographic string) priorities
- ⚡ Configurable Ordering - Min-first or max-first priority queues
- ✅ Acknowledgments - Messages requeue on timeout
- 🔄 Multiple queues - Run dozens of isolated queues
- 🌐 Simple HTTP API - Easy integration with any language
- 🐳 Docker-ready - Single container deployment
## Priority Ordering
Qrusty supports two priority ordering modes that can be configured per queue:
### Max-First (Default)
- Higher priority values are processed first
- Priority 100 comes before priority 50
- Traditional "high priority = urgent" model
- Perfect for urgent/critical task processing
### Min-First
- Lower priority values are processed first
- Priority 10 comes before priority 50
- Unix process priority model (lower = higher priority)
- Ideal for batch processing and background jobs
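The difference between the two modes can be sketched in Python with a min-heap. This is an illustration of the ordering semantics only, not Qrusty's implementation:

```python
import heapq

def drain(messages, max_first):
    """Return payloads in the order a queue with this mode would serve them."""
    # heapq is a min-heap, so max-first mode negates priorities on push.
    heap = [((-p if max_first else p), m) for p, m in messages]
    heapq.heapify(heap)
    out = []
    while heap:
        _, m = heapq.heappop(heap)
        out.append(m)
    return out

jobs = [(50, "routine"), (100, "urgent"), (10, "cleanup")]
print(drain(jobs, max_first=True))   # urgent (100) is served first
print(drain(jobs, max_first=False))  # cleanup (10) is served first
```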
## Priority Kinds

Each queue is configured with a `priority_kind` that determines which type of priority values it accepts:
- Numeric (default) — unsigned 64-bit integers (0 to 2^64-1)
- Text — arbitrary UTF-8 strings for lexicographic ordering
```bash
# Create a queue with text priorities (JSON field names are illustrative)
curl -X POST http://localhost:6784/create-queue \
  -d '{"name": "alerts", "priority_kind": "text"}'

# Publish with a string priority
curl -X POST http://localhost:6784/publish \
  -d '{"queue": "alerts", "priority": "critical", "payload": "disk full"}'
```
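One caveat worth illustrating: text priorities compare lexicographically, so numeric-looking strings do not sort numerically. A small Python sketch:

```python
def text_priority_order(priorities):
    """Order in which a min-first text-priority queue would serve these."""
    return sorted(priorities)  # lexicographic, byte-wise on UTF-8

# "10" sorts before "9" because '1' < '9' byte-wise.
print(text_priority_order(["critical", "high", "10", "9"]))
# Zero-padding keeps numeric-looking strings in numeric order:
print(text_priority_order(["09", "10"]))
```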
## Quick Start
### Create Queues with Different Orderings
```bash
# Create a max-first queue for urgent tasks (field names are illustrative)
curl -X POST http://localhost:6784/create-queue \
  -d '{"name": "urgent", "ordering": "max-first"}'

# Create a min-first queue for batch processing
curl -X POST http://localhost:6784/create-queue \
  -d '{"name": "batch", "ordering": "min-first"}'
```
### Publish Messages
```bash
# Publish to urgent queue (high priority = 100; field names are illustrative)
curl -X POST http://localhost:6784/publish \
  -d '{"queue": "urgent", "priority": 100, "payload": "restart service"}'

# Publish to batch queue (low priority = 10)
curl -X POST http://localhost:6784/publish \
  -d '{"queue": "batch", "priority": 10, "payload": "nightly report"}'
```
### Consume Messages
```bash
# Consume from urgent queue (gets highest priority first)
curl -X POST http://localhost:6784/consume/urgent

# Consume from batch queue (gets lowest priority first)
curl -X POST http://localhost:6784/consume/batch
```
## Lock Timeout
By default, consumed messages are locked for 30 seconds. For long-running processing, you can specify a longer lock timeout at consume time (HTTP) or subscribe time (WebSocket):
```bash
# HTTP: lock for 5 minutes (parameter name is illustrative)
curl -X POST 'http://localhost:6784/consume/tasks?lock_timeout_secs=300'
```

```javascript
// WebSocket: subscribe with 5-minute lock (message shapes are illustrative)
ws.send(JSON.stringify({ action: "subscribe", queue: "tasks", lock_timeout_secs: 300 }));

// WebSocket: renew with custom timeout
ws.send(JSON.stringify({ action: "renew", id: messageId, lock_timeout_secs: 300 }));
```
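The requeue-on-timeout behaviour behind the lock can be simulated in a few lines of Python. This sketches the semantics only; it is not Qrusty's code:

```python
class LockSimulator:
    """A consumed message stays locked until acked; if its lock expires
    first, it becomes available to other consumers again."""

    def __init__(self, messages):
        self.available = list(messages)
        self.locked = {}  # message -> lock deadline

    def consume(self, now, lock_timeout_secs=30):
        self._requeue_expired(now)
        if not self.available:
            return None
        msg = self.available.pop(0)
        self.locked[msg] = now + lock_timeout_secs
        return msg

    def ack(self, msg):
        self.locked.pop(msg, None)  # acked before the deadline: gone for good

    def _requeue_expired(self, now):
        for msg, deadline in list(self.locked.items()):
            if now >= deadline:
                del self.locked[msg]
                self.available.append(msg)  # lock expired: requeue

q = LockSimulator(["job-1"])
print(q.consume(now=0, lock_timeout_secs=300))  # 'job-1', locked until t=300
print(q.consume(now=100))                       # None: still locked
print(q.consume(now=300))                       # 'job-1' again: lock expired
```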
## Queue Management
```bash
# Get queue statistics
curl http://localhost:6784/stats

# Update queue configuration (change allow_duplicates; field names are illustrative)
curl -X POST http://localhost:6784/update-queue \
  -d '{"queue": "tasks", "allow_duplicates": false}'

# Rename a queue and update allow_duplicates
curl -X POST http://localhost:6784/update-queue \
  -d '{"queue": "tasks", "new_name": "jobs", "allow_duplicates": true}'

# Purge all messages from a queue (preserves configuration)
curl -X POST http://localhost:6784/purge-queue/jobs

# Delete a queue and all its messages
curl -X DELETE http://localhost:6784/delete-queue/jobs
```
## API Endpoints

- `POST /create-queue` - Create a new queue with specific configuration
- `POST /update-queue` - Update queue name and/or allow_duplicates setting (queue type is immutable)
- `POST /publish` - Publish a message to a queue
- `POST /consume/{queue}` - Consume a message from a queue
- `POST /ack/{queue}/{id}` - Acknowledge a message
- `POST /nack/{queue}/{id}` - Negative acknowledge (requeue)
- `GET /stats` - Get queue statistics
- `POST /purge-queue/{queue}` - Remove all messages (keep configuration)
- `DELETE /delete-queue/{queue}` - Delete queue and all messages
- `GET /health` - Health / readiness check; always `200 OK`. The body is `{"status":"ok"}` when fully ready, or `{"status":"starting","message":"..."}` while the persistent storage cache is being loaded in the background on startup
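A thin client can be built around these endpoints in any language. The Python sketch below only constructs requests from the paths listed above; the base URL and the JSON field names for publish are illustrative assumptions:

```python
import json

class QrustyRequests:
    """Builds (method, url, body) tuples for the endpoints listed above."""

    def __init__(self, base="http://localhost:6784"):
        self.base = base

    def publish(self, queue, payload, priority):
        body = json.dumps({"queue": queue, "payload": payload, "priority": priority})
        return ("POST", f"{self.base}/publish", body)

    def consume(self, queue):
        return ("POST", f"{self.base}/consume/{queue}", None)

    def ack(self, queue, msg_id):
        return ("POST", f"{self.base}/ack/{queue}/{msg_id}", None)

    def nack(self, queue, msg_id):
        return ("POST", f"{self.base}/nack/{queue}/{msg_id}", None)

api = QrustyRequests()
print(api.consume("jobs"))  # ('POST', 'http://localhost:6784/consume/jobs', None)
```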
## Startup memory safety
Storage initialisation is designed to avoid OOM during startup on large databases. Hot tiers are populated lazily on first pop (not pre-loaded), and the integrity scan does not re-inline externalized payloads (which would inflate the RocksDB write buffer). RocksDB memtables are flushed between init phases to cap write-buffer memory.
## Startup mutation gate

While the server is initialising its persistent storage cache, all mutation endpoints (create/update/delete queue, publish, consume, ack, nack, purge, batch operations) return `503 Service Unavailable` with a `Retry-After: 2` header and a JSON body describing the startup state.

Read-only endpoints (`/health`, `/stats`, `/queues`, `/queue-stats/{queue}`, `/operation-timings`, `/ws`) and static files remain accessible during initialisation. This prevents bogus empty payloads caused by clients publishing before the storage cache is fully reconstructed.
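Clients should therefore retry mutations during startup, honouring Retry-After. A Python sketch of the retry loop, with the HTTP calls stubbed out as a list of (status, retry_after) pairs:

```python
def wait_until_ready(responses, sleep=lambda secs: None):
    """Retry a mutation while the server returns 503 during startup.

    `responses` stands in for successive HTTP attempts; a real client would
    re-send the request and read the Retry-After header each time.
    Returns the number of attempts made.
    """
    attempts = 0
    for status, retry_after in responses:
        attempts += 1
        if status != 503:
            return attempts
        sleep(retry_after)  # honour Retry-After before the next attempt
    raise RuntimeError("server never became ready")

# Two 503s while the storage cache loads, then the publish succeeds:
print(wait_until_ready([(503, 2), (503, 2), (200, None)]))  # 3
```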
## Broken payload recovery
When the payload store is enabled and segment files are missing or corrupted (e.g. after toggling `QRUSTY_PAYLOAD_STORE=disabled` and back), messages whose payloads cannot be recovered are automatically deleted:
- At startup: the integrity scan detects and removes broken messages
- At consume time: if a broken message is encountered, it is deleted and the next valid message is returned instead
Consumers will never receive a message with an empty payload caused by a missing segment file.
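The consume-time behaviour amounts to "skip and delete until a valid message is found". A Python sketch of that loop, with the payload store modelled as a dict:

```python
def consume_skipping_broken(queue, payloads):
    """Pop until a message with a recoverable payload is found; broken
    messages (missing segment data) are deleted rather than delivered."""
    while queue:
        msg_id = queue.pop(0)
        payload = payloads.get(msg_id)
        if payload:
            return msg_id, payload  # recoverable payload: deliver it
        # missing/corrupted payload: drop the message and keep going
    return None

q = ["m1", "m2", "m3"]
store = {"m2": b"hello", "m3": b"world"}   # m1's segment is gone
print(consume_skipping_broken(q, store))   # ('m2', b'hello')
```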
## Payload externalization threshold

Payloads smaller than `QRUSTY_EXTERNALIZE_MIN_BYTES` (default: 4096) stay inline in RocksDB rather than being written to the payload store. This avoids segment file overhead for small payloads, where the `PayloadRef` metadata is comparable in size to the payload itself.
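The decision is a simple size comparison. A Python sketch, using the documented default of 4096 bytes:

```python
EXTERNALIZE_MIN_BYTES = 4096  # default for QRUSTY_EXTERNALIZE_MIN_BYTES

def stays_inline(payload: bytes, threshold: int = EXTERNALIZE_MIN_BYTES) -> bool:
    """True if the payload would be stored inline in RocksDB rather than
    in the payload store's segment files."""
    return len(payload) < threshold

print(stays_inline(b"x" * 100))    # True: small payloads stay inline
print(stays_inline(b"x" * 8192))   # False: externalized to a segment file
```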
## Memory pressure reclamation

When memory usage exceeds configurable thresholds, Qrusty progressively reclaims memory:

- **Warning (80%):** flushes RocksDB memtables, shrinks the block cache to 25%, reclaims per-queue structures for empty queues (dedup sets, hot tier entries, locked index entries), calls `shrink_to_fit()` on all dedup HashSets and the locked index to release over-allocated capacity, and runs a full-range RocksDB compaction to reclaim tombstone space from acked/deleted messages.
- **Critical (90%):** all Warning-level actions, then shrinks the block cache further to 1 MB, shrinks hot tiers to 10%, compacts the payload store (including cleanup of all segments when no live payloads remain), and rejects new publishes until pressure subsides.
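The enter/exit thresholds form a hysteresis band: pressure starts at 0.80 but only clears below 0.70, so the level does not flap around a single boundary. A Python sketch of the level transitions (illustrative, not Qrusty's code):

```python
def pressure_level(ratio, current, enter=0.80, exit_below=0.70, critical=0.90):
    """Next memory-pressure level for a usage ratio, with hysteresis:
    Warning starts at `enter` but only clears below `exit_below`."""
    if ratio >= critical:
        return "critical"
    if ratio >= enter:
        return "warning"
    if ratio >= exit_below and current in ("warning", "critical"):
        return "warning"  # inside the hysteresis band: stay under pressure
    return "normal"

print(pressure_level(0.85, "normal"))   # warning
print(pressure_level(0.75, "warning"))  # warning (hysteresis keeps it)
print(pressure_level(0.65, "warning"))  # normal
```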
## Examples and Integration
For comprehensive examples and integration guides, see:
- Queue Management Guide - Complete cURL, Python, and JavaScript examples
- Python Integration Examples - Production-ready Python integration
- Node-RED Integration - For IoT and workflow automation
- Priority Usage Examples - Understanding priority ordering
- API Usage Examples - Comprehensive API documentation
Example consume with a longer timeout:

```bash
# Parameter name is illustrative
curl -X POST 'http://localhost:6784/consume/tasks?lock_timeout_secs=300'
```
## Make Targets

Run these from the dev container, or an equivalent environment:

- `make all` - Build, generate docs, run machete, and test
- `make build` - Build the Rust project with cargo
- `make doc` - Generate documentation with cargo doc
- `make machete` - Run cargo machete (dependency analysis)
- `make test` - Run tests with cargo test
- `make smoke-test` - Run the live end-to-end smoke test (see below)
- `make smoke-test-memory` - Same smoke test using in-memory storage
- `make traceability` - Print traceability report to stdout
- `make traceability-report` - Save traceability report to `docs/traceability_report.md`
### Live Smoke Test

`make smoke-test` builds the binary, starts a real qrusty server on an isolated port, runs concurrent publishers and consumers against three queues for 30 seconds, polls `/stats` every 3 seconds to display a live throughput table, and asserts that `publish_rate_per_sec` and `ack_rate_per_sec` are non-zero, proving the server actually processes messages and reports accurate statistics.
```bash
# Default run (30 s workload, port 17784)
make smoke-test

# Longer run on a custom port (variable names are illustrative)
SMOKE_DURATION_SECS=120 SMOKE_PORT=18000 make smoke-test

# Point at a release build (variable name is illustrative)
SMOKE_BIN=target/release/qrusty make smoke-test
```
Requirements: SYS-0011, SYS-0012
Coverage targets (requires `cargo llvm-cov` in your environment):

- `make coverage` - Run the full Rust test suite with coverage and print a summary
- `make coverage-missing` - Generate `target/llvm-cov/missing.txt` with uncovered line numbers
- `make coverage-summary-json` - Generate `target/llvm-cov/summary.json` (machine-readable totals)
- `make coverage-html` - Generate an HTML report at `target/llvm-cov/html/index.html`
- `make coverage-clean` - Remove coverage artifacts generated by `cargo llvm-cov`
Run these outside the dev container, unless it's been altered to allow docker-in-docker:

- `make show-docs` - Build and open the docs in the default browser
- `make build-qrusty` - Build and push Docker image for ARM64 platform
## Requirements (Doorstop)
This repo uses Doorstop for Requirements-Based Verification (RBV).
### Document hierarchy

| Document | Prefix | Parent | Purpose |
|---|---|---|---|
| `requirements/system/` | SYS | (root) | System-level requirements (deployment, health, web UI) |
| `requirements/persistence/` | PER | SYS | Persistence and recovery |
| `requirements/scheduling/` | SCH | SYS | Priority and scheduling |
| `requirements/delivery/` | DLV | SYS | Delivery semantics (lock, ack, nack, retry) |
| `requirements/api/` | API | SYS | HTTP API endpoints |
| `requirements/websocket/` | WS | SYS | WebSocket API |
Each requirement YAML item includes:

- `text` – "The system shall …" requirement statement.
- `verification` – IADT method: `inspection`, `analysis`, `demonstration`, or `test`.
- `links` – parent requirement UIDs (for traceability).
### Run Doorstop in the dev container

The devcontainer includes a helper script that bootstraps a local venv at `.venv/` (if missing), installs Doorstop, and starts `doorstop-server`. Open the address it serves in a browser, and stop the server when finished.
### Traceability
Requirements are linked to implementation code and tests via special comment tags:
Implementation tagging (Rust doc comments):

```rust
/// Requirements: PER-0001, PER-0002, SCH-0001
pub async fn consume_message(/* illustrative signature */) { /* … */ }
```
Test tagging (Rust comments):

```rust
// Verifies: DLV-0001, DLV-0002, API-0003
async fn lock_timeout_requeues() { /* illustrative test name */ }
```
Generate traceability report:
```bash
# Print to stdout
make traceability

# Save to docs/traceability_report.md
make traceability-report
```
The report shows:
- Which requirements have implementation and test coverage
- Untraced requirements (gaps in coverage)
- Orphan tags (tags referencing non-existent requirements)
- Detailed links to source locations
See `docs/traceability_report.md` for the current report.
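A scanner of the kind such a report relies on can be sketched in Python with a regular expression over the comment tags (illustrative; the real tooling may differ):

```python
import re

# Matches "/// Requirements: PER-0001, SCH-0001" and "// Verifies: DLV-0002".
TAG_RE = re.compile(r"(Requirements|Verifies):\s*([A-Z]+-\d+(?:\s*,\s*[A-Z]+-\d+)*)")

def extract_tags(source: str):
    """Collect requirement UIDs referenced by traceability comment tags."""
    uids = []
    for _kind, ids in TAG_RE.findall(source):
        uids.extend(uid.strip() for uid in ids.split(","))
    return uids

src = """
/// Requirements: PER-0001, SCH-0001
pub async fn pop() {}
// Verifies: DLV-0002
"""
print(extract_tags(src))  # ['PER-0001', 'SCH-0001', 'DLV-0002']
```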
## In-Memory Storage Mode
Qrusty supports an optional in-memory storage backend (`STORAGE_MODE=memory`) that eliminates all disk I/O. This is useful when persistent storage is unavailable or unreliable (e.g. a failing drive).
All functional behaviour (priority ordering, locking, ACK/NACK, dead-letter queues, duplicate detection, timeout monitoring) is identical to RocksDB mode.
All data is lost on restart. A warning is logged at startup.
```bash
# Docker (image name is illustrative)
docker run -p 6784:6784 -e STORAGE_MODE=memory qrusty:latest

# Cargo
STORAGE_MODE=memory cargo run --release
```
To switch back, remove `STORAGE_MODE=memory` (or set it to `rocksdb`) and ensure a volume is mounted at `/data`.
Requirements: SYS-0013, PER-0011
## Using the Docker Image
```bash
# Pull the image (image name is illustrative)
docker pull qrusty:latest

# Run with default settings (RocksDB persistence)
docker run -d -p 6784:6784 -v qrusty-data:/data qrusty:latest

# Run with custom data path and bind address
docker run -d -p 7000:7000 -v qrusty-data:/queue-data \
  -e DATA_PATH=/queue-data -e BIND_ADDR=0.0.0.0:7000 qrusty:latest

# Run with custom RocksDB block cache (default 64 MB)
docker run -d -p 6784:6784 -v qrusty-data:/data -e ROCKSDB_CACHE_MB=256 qrusty:latest

# Run with custom hot tier size (default 1000 messages per queue)
docker run -d -p 6784:6784 -v qrusty-data:/data -e QRUSTY_HOT_TIER_SIZE=5000 qrusty:latest

# Run with in-memory storage (no disk I/O)
docker run -d -p 6784:6784 -e STORAGE_MODE=memory qrusty:latest

# Build locally (from a prompt that can run docker)
docker build -t qrusty:latest .
```
## Environment Variables

All settings have sensible defaults. See `example.env` for a template.

| Variable | Default | Description |
|---|---|---|
| `DATA_PATH` | `/data` | Directory for RocksDB and payload store data |
| `BIND_ADDR` | `0.0.0.0:6784` | Server bind address |
| `STORAGE_MODE` | `rocksdb` | Storage backend: `rocksdb` or `memory` |
| `MAX_CONNECTIONS` | `10000` | Max concurrent HTTP/WS connections |
| `WEBUI_DIR` | `/opt/qrusty/webui` | Web UI static files directory |
| `ROCKSDB_CACHE_MB` | `128` | Block cache size in MB (includes index/filter) |
| `ROCKSDB_WRITE_BUFFER_MB` | `16` | RocksDB write buffer per memtable in MB |
| `ROCKSDB_MAX_OPEN_FILES` | `128` | Max open SST file handles |
| `QRUSTY_HOT_TIER_SIZE` | `1000` | Max available messages per queue in hot tier |
| `QRUSTY_REFILL_THRESHOLD` | `250` | Refill hot tier when it drops to this count |
| `QRUSTY_SEGMENT_MAX_MB` | `256` | Payload store segment rotation size in MB |
| `QRUSTY_COMPACT_INTERVAL_SECS` | `300` | Payload compaction interval (seconds) |
| `QRUSTY_MEMORY_LIMIT_MB` | auto | Memory limit override (auto-detected from cgroup) |
| `QRUSTY_MEMORY_PRESSURE_THRESHOLD` | `0.80` | Usage ratio that enters pressure (Warning level) |
| `QRUSTY_MEMORY_PRESSURE_EXIT_THRESHOLD` | `0.70` | Usage ratio that exits pressure (hysteresis) |
| `QRUSTY_MEMORY_CRITICAL_THRESHOLD` | `0.90` | Usage ratio that triggers Critical level (shed publishes) |
| `QRUSTY_MAX_LOCKED_INDEX` | `500000` | Max locked message index entries |
| `WS_PING_INTERVAL_SECS` | `30` | WebSocket ping interval |
| `WS_PING_TIMEOUT_SECS` | `10` | WebSocket ping timeout |
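For applications that mirror these settings client-side, resolving them with the same defaults is straightforward. A Python sketch covering a few of the variables above:

```python
import os

# A few of the documented defaults (see the table above).
DEFAULTS = {
    "BIND_ADDR": "0.0.0.0:6784",
    "STORAGE_MODE": "rocksdb",
    "ROCKSDB_CACHE_MB": "128",
    "QRUSTY_HOT_TIER_SIZE": "1000",
}

def setting(name, env=None):
    """Resolve a setting from the environment, falling back to its default."""
    env = os.environ if env is None else env
    return env.get(name, DEFAULTS[name])

print(setting("STORAGE_MODE", env={}))                          # rocksdb
print(setting("STORAGE_MODE", env={"STORAGE_MODE": "memory"}))  # memory
```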