# Qrusty 🦀
> The trusty priority queue server that never forgets
Qrusty is a high-performance priority queue server written in Rust, designed for
reliable message processing with filesystem persistence and acknowledgment support.
## Features
- 🚀 **Fast** - 50,000+ messages/second
- 💾 **Persistent** - Survives restarts with RocksDB storage (or run in-memory)
- 🎯 **Priority-based** - Numeric (u64) or text (lexicographic string) priorities
- ⚡ **Configurable Ordering** - Min-first or max-first priority queues
- ✅ **Acknowledgments** - Messages requeue on timeout
- 🔄 **Multiple queues** - Run dozens of isolated queues
- 🌐 **Simple HTTP API** - Easy integration with any language
- 🐳 **Docker-ready** - Single container deployment
## Priority Ordering
Qrusty supports two priority ordering modes that can be configured per queue:
### Max-First (Default)
- Higher priority values are processed first
- Priority 100 comes before priority 50
- Traditional "high priority = urgent" model
- Perfect for urgent/critical task processing
### Min-First
- Lower priority values are processed first
- Priority 10 comes before priority 50
- Unix process priority model (lower = higher priority)
- Ideal for batch processing and background jobs
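The difference between the two modes can be sketched in a few lines of Python. This is only a model of the ordering rule described above, not Qrusty code: `drain_order` is a hypothetical helper, and the mode strings mirror the `MaxFirst` / `MinFirst` values used in the queue configuration.

```python
def drain_order(priorities, ordering="MaxFirst"):
    """Return numeric priorities in the order a queue would serve them."""
    if ordering not in ("MaxFirst", "MinFirst"):
        raise ValueError(f"unknown ordering: {ordering}")
    # MaxFirst serves the largest value first, MinFirst the smallest.
    return sorted(priorities, reverse=(ordering == "MaxFirst"))


pending = [50, 100, 10]
print(drain_order(pending, "MaxFirst"))  # [100, 50, 10]
print(drain_order(pending, "MinFirst"))  # [10, 50, 100]
```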
### Priority Kinds
Each queue is configured with a `priority_kind` that determines which type of priority values it accepts:
- **Numeric** (default) — unsigned 64-bit integers (0 to 2^64-1)
- **Text** — arbitrary UTF-8 strings for lexicographic ordering
```bash
# Create a queue with text priorities
curl -X POST http://localhost:6784/create-queue \
-H "Content-Type: application/json" \
-d '{"name":"categories","config":{"ordering":"MinFirst","priority_kind":"Text"}}'
# Publish with a string priority
curl -X POST http://localhost:6784/publish \
-H "Content-Type: application/json" \
-d '{"queue":"categories","priority":"alpha","payload":"First category"}'
```
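Lexicographic ordering can surprise when text priorities look like numbers. The Python sketch below assumes a plain code-point comparison with no locale-aware collation (an assumption, since this README does not say how Qrusty compares strings internally); `text_drain_order` is a hypothetical helper.

```python
def text_drain_order(priorities, ordering="MinFirst"):
    """Return text priorities in serving order under plain lexicographic comparison."""
    return sorted(priorities, reverse=(ordering == "MaxFirst"))


print(text_drain_order(["beta", "alpha", "gamma"]))  # ['alpha', 'beta', 'gamma']
# Digits compare character by character, so "10" sorts before "2".
print(text_drain_order(["9", "10", "2"]))            # ['10', '2', '9']
# Zero-padding to a fixed width restores the numeric order.
print(text_drain_order(["09", "10", "02"]))          # ['02', '09', '10']
```

If numeric order matters, a `Numeric` queue is likely the simpler choice.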
## Quick Start
### Create Queues with Different Orderings
```bash
# Create a max-first queue for urgent tasks
curl -X POST http://localhost:6784/create-queue \
-H "Content-Type: application/json" \
-d '{"name":"urgent","config":{"ordering":"MaxFirst"}}'
# Create a min-first queue for batch processing
curl -X POST http://localhost:6784/create-queue \
-H "Content-Type: application/json" \
-d '{"name":"batch","config":{"ordering":"MinFirst"}}'
```
### Publish Messages
```bash
# Publish to urgent queue (high priority = 100)
curl -X POST http://localhost:6784/publish \
-H "Content-Type: application/json" \
-d '{"queue":"urgent","priority":100,"payload":"Critical task"}'
# Publish to batch queue (low priority = 10)
curl -X POST http://localhost:6784/publish \
-H "Content-Type: application/json" \
-d '{"queue":"batch","priority":10,"payload":"Background job"}'
```
### Consume Messages
```bash
# Consume from urgent queue (gets highest priority first)
curl -X POST http://localhost:6784/consume/urgent \
-H "Content-Type: application/json" \
-d '{"consumer_id":"worker-1","timeout_seconds":30}'
# Consume from batch queue (gets lowest priority first)
curl -X POST http://localhost:6784/consume/batch \
-H "Content-Type: application/json" \
-d '{"consumer_id":"worker-2","timeout_seconds":30}'
```
### Queue Management
```bash
# Get queue statistics
curl http://localhost:6784/stats
# Update queue configuration (change allow_duplicates)
curl -X POST http://localhost:6784/update-queue \
-H "Content-Type: application/json" \
-d '{"name":"urgent","config":{"allow_duplicates":false}}'
# Rename a queue and update allow_duplicates
curl -X POST http://localhost:6784/update-queue \
-H "Content-Type: application/json" \
-d '{"name":"urgent","config":{"name":"high_priority","allow_duplicates":false}}'
# Purge all messages from a queue (preserves configuration)
curl -X POST http://localhost:6784/purge-queue/urgent
# Delete a queue and all its messages
curl -X DELETE http://localhost:6784/delete-queue/urgent
```
## API Endpoints
- `POST /create-queue` - Create a new queue with specific configuration
- `POST /update-queue` - Update queue name and/or allow_duplicates setting (queue type is immutable)
- `POST /publish` - Publish a message to a queue
- `POST /consume/{queue}` - Consume a message from a queue
- `POST /ack/{queue}/{id}` - Acknowledge a message
- `POST /nack/{queue}/{id}` - Negative acknowledge (requeue)
- `GET /stats` - Get queue statistics
- `POST /purge-queue/{queue}` - Remove all messages (keep configuration)
- `DELETE /delete-queue/{queue}` - Delete queue and all messages
- `GET /health` - Health/readiness check. Always returns `200 OK`; the body is `{"status":"ok"}` when fully ready, or `{"status":"starting","message":"..."}` while the persistent storage cache is still loading in the background at startup
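Because `/health` always answers `200 OK`, a client has to inspect the response body to tell "starting" from "ready". A minimal Python sketch of a readiness poll follows. `is_ready` and `wait_until_ready` are hypothetical helpers, and the HTTP call is passed in as a function so the example runs without a server.

```python
import json
import time


def is_ready(body):
    """True when a /health response body reports {"status": "ok"}."""
    return json.loads(body).get("status") == "ok"


def wait_until_ready(fetch_health, attempts=10, delay_seconds=1.0):
    """Poll fetch_health() until it reports ready; return False if it never does."""
    for attempt in range(attempts):
        if is_ready(fetch_health()):
            return True
        if attempt < attempts - 1:
            time.sleep(delay_seconds)
    return False


# Stand-in for GET /health: two "starting" replies, then "ok".
replies = iter([
    '{"status":"starting","message":"..."}',
    '{"status":"starting","message":"..."}',
    '{"status":"ok"}',
])
print(wait_until_ready(lambda: next(replies), delay_seconds=0))  # True
```

Against a live server, `fetch_health` could be something like `lambda: urllib.request.urlopen("http://localhost:6784/health").read()`.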
## Examples and Integration
For comprehensive examples and integration guides, see:
- [Queue Management Guide](integrations/examples/queue_management_guide.md) - Complete cURL, Python, and JavaScript examples
- [Python Integration Examples](integrations/python/) - Production-ready Python integration
- [Node-RED Integration](integrations/node-red/) - For IoT and workflow automation
- [Priority Usage Examples](integrations/examples/priority_examples.md) - Understanding priority ordering
- [API Usage Examples](integrations/examples/api_usage_examples.md) - Comprehensive API documentation
Example consume with a longer timeout:
```bash
curl -X POST http://localhost:6784/consume/batch \
-H "Content-Type: application/json" \
-d '{"consumer_id":"worker-2","timeout_seconds":60}'
```
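The same consume call, followed by an acknowledgment, could be built from Python's standard library as sketched below. The helper names are hypothetical, and two details are assumptions because this README does not document them: the shape of the consume response (so `message_id` is taken as given), and whether `ack` / `nack` expect a request body (the sketch sends none).

```python
import json
import urllib.parse
import urllib.request

BASE_URL = "http://localhost:6784"  # address used by the examples in this README


def consume_request(queue, consumer_id, timeout_seconds=30):
    """Build (but do not send) a POST /consume/{queue} request."""
    body = json.dumps({"consumer_id": consumer_id, "timeout_seconds": timeout_seconds})
    return urllib.request.Request(
        f"{BASE_URL}/consume/{urllib.parse.quote(queue, safe='')}",
        data=body.encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def settle_request(queue, message_id, success):
    """Build a POST /ack/{queue}/{id} (success) or /nack/{queue}/{id} (failure) request."""
    action = "ack" if success else "nack"
    path = "/".join(urllib.parse.quote(part, safe="") for part in (queue, message_id))
    return urllib.request.Request(f"{BASE_URL}/{action}/{path}", method="POST")


req = consume_request("batch", "worker-2", timeout_seconds=60)
print(req.get_method(), req.full_url)  # POST http://localhost:6784/consume/batch
# With a server running, urllib.request.urlopen(req) would send the request.
```

A worker that neither acks nor nacks within `timeout_seconds` leaves the message to be requeued, as described under Features.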
## Make Targets
Run these from the dev container, or an equivalent environment:
- `make all` - Build, generate docs, run machete, and test
- `make build` - Build the Rust project with cargo
- `make doc` - Generate documentation with cargo doc
- `make machete` - Run cargo machete (dependency analysis)
- `make test` - Run tests with cargo test
- `make smoke-test` - Run the live end-to-end smoke test (see below)
- `make smoke-test-memory` - Same smoke test using in-memory storage
- `make traceability` - Print traceability report to stdout
- `make traceability-report` - Save traceability report to `docs/traceability_report.md`
### Live Smoke Test
`make smoke-test` builds the binary, starts a real qrusty server on an
isolated port, runs concurrent publishers and consumers against three queues
for 30 seconds, polls `/stats` every 3 seconds to display a live throughput
table, and asserts that `publish_rate_per_sec` and `ack_rate_per_sec` are
non-zero — proving the server actually processes messages and reports accurate
statistics.
```bash
# Default run (30 s workload, port 17784)
make smoke-test
# Longer run on a custom port
make smoke-test SMOKE_ARGS="--duration 120 --port 19000"
# Point at a release build
python scripts/smoke_test.py --binary target/release/qrusty --duration 60
```
Requirements: SYS-0011, SYS-0012
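The final assertion of the smoke test can be pictured with the Python sketch below. It is not the code in `scripts/smoke_test.py`: `rates_are_nonzero` is a hypothetical helper, and it assumes each polled `/stats` sample has already been reduced to a flat dict holding the two rate fields, which may not match the real response layout.

```python
RATE_FIELDS = ("publish_rate_per_sec", "ack_rate_per_sec")


def rates_are_nonzero(samples, fields=RATE_FIELDS):
    """True if every rate field was above zero in at least one polled sample."""
    return all(any(sample.get(field, 0) > 0 for sample in samples) for field in fields)


# Illustrative numbers only, not measured results.
samples = [
    {"publish_rate_per_sec": 0.0, "ack_rate_per_sec": 0.0},   # before the workload ramps up
    {"publish_rate_per_sec": 120.5, "ack_rate_per_sec": 98.0},
]
print(rates_are_nonzero(samples))      # True
print(rates_are_nonzero(samples[:1]))  # False
```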
Coverage targets (requires `cargo llvm-cov` in your environment):
- `make coverage` - Run the full Rust test suite with coverage and print a summary
- `make coverage-missing` - Generate `target/llvm-cov/missing.txt` with uncovered line numbers
- `make coverage-summary-json` - Generate `target/llvm-cov/summary.json` (machine-readable totals)
- `make coverage-html` - Generate an HTML report at `target/llvm-cov/html/index.html`
- `make coverage-clean` - Remove coverage artifacts generated by `cargo llvm-cov`
Run these outside the dev container, unless it has been altered to allow Docker-in-Docker:
- `make show-docs` - Build and open the docs in the default browser
- `make build-qrusty` - Build and push Docker image for ARM64 platform
## Requirements (Doorstop)
This repo uses [Doorstop](https://doorstop.info/) for Requirements-Based Verification (RBV).
### Document hierarchy
| Directory | Prefix | Parent | Description |
| --- | --- | --- | --- |
| `requirements/system/` | SYS | _(root)_ | System-level requirements (deployment, health, web UI) |
| `requirements/persistence/` | PER | SYS | Persistence and recovery |
| `requirements/scheduling/` | SCH | SYS | Priority and scheduling |
| `requirements/delivery/` | DLV | SYS | Delivery semantics (lock, ack, nack, retry) |
| `requirements/api/` | API | SYS | HTTP API endpoints |
| `requirements/websocket/` | WS | SYS | WebSocket API |
Each requirement YAML item includes:
- **text** – "The system shall …" requirement statement.
- **verification** – IADT method: `inspection`, `analysis`, `demonstration`, or `test`.
- **links** – parent requirement UIDs (for traceability).
### Run Doorstop in the dev container
The devcontainer includes a helper script that bootstraps a local venv at `.venv/` (if missing), installs Doorstop, and starts `doorstop-server`:
```bash
bash .devcontainer/start-doorstop.sh
```
Then open:
- <http://localhost:17867/>
- <http://localhost:17867/documents/SYS.html>
To stop the server, terminate the `doorstop-server` process.
### Traceability
Requirements are linked to implementation code and tests via special comment tags:
**Implementation tagging** (Rust doc comments):
```rust
/// Requirements: PER-0001, PER-0002, SCH-0001
pub async fn push(&self, message: Message) -> Result<String, StorageError> {
```
**Test tagging** (Rust comments):
```rust
// Verifies: DLV-0001, DLV-0002, API-0003
#[tokio::test]
async fn test_push_and_pop_message() {
```
**Generate traceability report:**
```bash
# Print to stdout
make traceability
# Save to docs/traceability_report.md
make traceability-report
```
The report shows:
- Which requirements have implementation and test coverage
- Untraced requirements (gaps in coverage)
- Orphan tags (tags referencing non-existent requirements)
- Detailed links to source locations
See [docs/traceability_report.md](docs/traceability_report.md) for the current report.
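To make the untraced/orphan distinction concrete, here is a small Python sketch of tag scanning. It is not the tool behind `make traceability`; `collect_tags` and `summarize` are hypothetical helpers, and the set of known requirement UIDs is hard-coded for illustration instead of being read from the Doorstop YAML files.

```python
import re

TAG_RE = re.compile(r"^\s*//[/!]?\s*(Requirements|Verifies):\s*(.+)$")
UID_RE = re.compile(r"[A-Z]+-\d+")


def collect_tags(source):
    """Return (implemented, verified) sets of requirement UIDs tagged in Rust source."""
    implemented, verified = set(), set()
    for line in source.splitlines():
        match = TAG_RE.match(line)
        if not match:
            continue
        uids = UID_RE.findall(match.group(2))
        (implemented if match.group(1) == "Requirements" else verified).update(uids)
    return implemented, verified


def summarize(known, implemented, verified):
    """Untraced = known but never tagged; orphans = tagged but not known."""
    tagged = implemented | verified
    return {"untraced": sorted(known - tagged), "orphans": sorted(tagged - known)}


source = """
/// Requirements: PER-0001, PER-0002, SCH-0001
pub async fn push() {}
// Verifies: DLV-0001, PER-0001
#[tokio::test]
async fn test_push() {}
"""
implemented, verified = collect_tags(source)
known = {"PER-0001", "PER-0002", "DLV-0001", "API-0001"}
print(summarize(known, implemented, verified))
# {'untraced': ['API-0001'], 'orphans': ['SCH-0001']}
```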
## In-Memory Storage Mode
Qrusty supports an optional in-memory storage backend (`STORAGE_MODE=memory`)
that eliminates all disk I/O. This is useful when persistent storage is
unavailable or unreliable (e.g. a failing drive).
All functional behaviour (priority ordering, locking, ACK/NACK, dead-letter
queues, duplicate detection, timeout monitoring) is identical to RocksDB mode.
**All data is lost on restart.** A warning is logged at startup.
```bash
# Docker
docker run -p 6784:6784 -e STORAGE_MODE=memory greeng340or/qrusty
# Cargo
STORAGE_MODE=memory cargo run
```
To switch back, remove `STORAGE_MODE=memory` (or set it to `rocksdb`) and
ensure a volume is mounted at `/data`.
Requirements: SYS-0013, PER-0011
## Using the Docker Image
```bash
# Pull the image
docker pull greeng340or/qrusty
# Run with default settings (RocksDB persistence)
docker run -p 6784:6784 greeng340or/qrusty
# Run with custom data path and bind address
docker run -p 6784:6784 \
-e DATA_PATH=/data \
-e BIND_ADDR=0.0.0.0:6784 \
-v qrusty-data:/data \
greeng340or/qrusty
# Run with a custom RocksDB block cache size in MB (see Environment Variables for the default)
docker run -p 6784:6784 -e ROCKSDB_CACHE_MB=128 greeng340or/qrusty
# Run with custom hot tier size (default 1000 messages per queue)
docker run -p 6784:6784 -e QRUSTY_HOT_TIER_SIZE=2000 -e QRUSTY_REFILL_THRESHOLD=500 greeng340or/qrusty
# Run with in-memory storage (no disk I/O)
docker run -p 6784:6784 -e STORAGE_MODE=memory greeng340or/qrusty
# Build locally (from a prompt that can run docker)
make build-qrusty
```
### Environment Variables
All settings have sensible defaults. See [`example.env`](example.env) for a template.

| Variable | Default | Description |
| --- | --- | --- |
| `DATA_PATH` | `/data` | Directory for RocksDB and payload store data |
| `BIND_ADDR` | `0.0.0.0:6784` | Server bind address |
| `STORAGE_MODE` | `rocksdb` | Storage backend: `rocksdb` or `memory` |
| `MAX_CONNECTIONS` | `10000` | Max concurrent HTTP/WS connections |
| `WEBUI_DIR` | `/opt/qrusty/webui` | Web UI static files directory |
| `ROCKSDB_CACHE_MB` | `256` | Block cache size in MB (includes index/filter) |
| `ROCKSDB_WRITE_BUFFER_MB` | `16` | RocksDB write buffer per memtable in MB |
| `ROCKSDB_MAX_OPEN_FILES` | `256` | Max open SST file handles |
| `QRUSTY_HOT_TIER_SIZE` | `1000` | Max available messages per queue in hot tier |
| `QRUSTY_REFILL_THRESHOLD` | `250` | Refill hot tier when it drops to this count |
| `QRUSTY_SEGMENT_MAX_MB` | `256` | Payload store segment rotation size in MB |
| `QRUSTY_COMPACT_INTERVAL_SECS` | `300` | Payload compaction interval (seconds) |
| `QRUSTY_MEMORY_LIMIT_MB` | auto | Memory limit override (auto-detected from cgroup) |
| `QRUSTY_MEMORY_PRESSURE_THRESHOLD` | `0.80` | Usage ratio that triggers pressure response |
| `QRUSTY_MAX_LOCKED_INDEX` | `500000` | Max locked message index entries |
| `WS_PING_INTERVAL_SECS` | `30` | WebSocket ping interval |
| `WS_PING_TIMEOUT_SECS` | `10` | WebSocket ping timeout |
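As a rough illustration of how `QRUSTY_MEMORY_PRESSURE_THRESHOLD` relates to `QRUSTY_MEMORY_LIMIT_MB`, the Python sketch below computes the usage ratio and compares it to the threshold. `under_memory_pressure` is a hypothetical helper; whether Qrusty treats the boundary as inclusive, and what the pressure response does, are not specified here, so the `>=` is an assumption.

```python
def under_memory_pressure(used_mb, limit_mb, threshold=0.80):
    """True when used/limit reaches the threshold (inclusive by assumption)."""
    if limit_mb <= 0:
        raise ValueError("limit_mb must be positive")
    return used_mb / limit_mb >= threshold


# With a 512 MB limit, the default 0.80 threshold is crossed at 409.6 MB.
print(under_memory_pressure(400, 512))  # False (ratio 0.78125)
print(under_memory_pressure(410, 512))  # True  (ratio 0.80078125)
```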