qrusty 0.19.15


Qrusty 🦀

The trusty priority queue server that never forgets

Qrusty is a high-performance priority queue server written in Rust, designed for reliable message processing with filesystem persistence and acknowledgment support.

Features

  • 🚀 Fast - 50,000+ messages/second
  • 💾 Persistent - Survives restarts with RocksDB storage (or run in-memory)
  • 🎯 Priority-based - Numeric (u64) or text (lexicographic string) priorities
  • ⚙️ Configurable Ordering - Min-first or max-first priority queues
  • ✅ Acknowledgments - Unacknowledged messages requeue on timeout
  • 🔄 Multiple queues - Run dozens of isolated queues
  • 🌐 Simple HTTP API - Easy integration with any language
  • 🐳 Docker-ready - Single container deployment

Priority Ordering

Qrusty supports two priority ordering modes that can be configured per queue:

Max-First (Default)

  • Higher priority values are processed first
  • Priority 100 comes before priority 50
  • Traditional "high priority = urgent" model
  • Perfect for urgent/critical task processing

Min-First

  • Lower priority values are processed first
  • Priority 10 comes before priority 50
  • Unix process priority model (lower = higher priority)
  • Ideal for batch processing and background jobs
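The difference between the two modes can be sketched with Python's heapq (illustrative only; Qrusty is written in Rust, and this is not its actual implementation):

```python
import heapq

def pop_order(priorities, ordering="MaxFirst"):
    """Return the order in which messages with the given priorities
    would be popped under the chosen ordering."""
    # heapq is a min-heap; negating keys simulates max-first ordering.
    sign = -1 if ordering == "MaxFirst" else 1
    heap = [sign * p for p in priorities]
    heapq.heapify(heap)
    return [sign * heapq.heappop(heap) for _ in range(len(heap))]

print(pop_order([50, 100, 10], "MaxFirst"))  # [100, 50, 10]
print(pop_order([50, 100, 10], "MinFirst"))  # [10, 50, 100]
```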

Priority Kinds

Each queue is configured with a priority_kind that determines which type of priority values it accepts:

  • Numeric (default) — unsigned 64-bit integers (0 to 2^64-1)
  • Text — arbitrary UTF-8 strings for lexicographic ordering
# Create a queue with text priorities
curl -X POST http://localhost:6784/create-queue \
  -H "Content-Type: application/json" \
  -d '{"name":"categories","config":{"ordering":"MinFirst","priority_kind":"Text"}}'

# Publish with a string priority
curl -X POST http://localhost:6784/publish \
  -H "Content-Type: application/json" \
  -d '{"queue":"categories","priority":"alpha","payload":"First category"}'
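Text priorities sort by plain string comparison. A quick illustration in Python, which compares by Unicode code point (note that uppercase letters sort before lowercase ones; verify Qrusty's exact collation behaviour for non-ASCII input):

```python
# Lexicographic ordering: plain string comparison, as with sorted().
priorities = ["beta", "alpha", "alpha2", "Alpha"]
min_first = sorted(priorities)
print(min_first)  # ['Alpha', 'alpha', 'alpha2', 'beta']
```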

Quick Start

Create Queues with Different Orderings

# Create a max-first queue for urgent tasks
curl -X POST http://localhost:6784/create-queue \
  -H "Content-Type: application/json" \
  -d '{"name":"urgent","config":{"ordering":"MaxFirst"}}'

# Create a min-first queue for batch processing
curl -X POST http://localhost:6784/create-queue \
  -H "Content-Type: application/json" \
  -d '{"name":"batch","config":{"ordering":"MinFirst"}}'

Publish Messages

# Publish to urgent queue (high priority = 100)
curl -X POST http://localhost:6784/publish \
  -H "Content-Type: application/json" \
  -d '{"queue":"urgent","priority":100,"payload":"Critical task"}'

# Publish to batch queue (low priority = 10)
curl -X POST http://localhost:6784/publish \
  -H "Content-Type: application/json" \
  -d '{"queue":"batch","priority":10,"payload":"Background job"}'

Consume Messages

# Consume from urgent queue (gets highest priority first)
curl -X POST http://localhost:6784/consume/urgent \
  -H "Content-Type: application/json" \
  -d '{"consumer_id":"worker-1","timeout_seconds":30}'

# Consume from batch queue (gets lowest priority first)
curl -X POST http://localhost:6784/consume/batch \
  -H "Content-Type: application/json" \
  -d '{"consumer_id":"worker-2","timeout_seconds":30}'

Lock Timeout

By default, consumed messages are locked for 30 seconds. For long-running processing, you can specify a longer lock timeout at consume time (HTTP) or subscribe time (WebSocket):

# HTTP: lock for 5 minutes
curl -X POST http://localhost:6784/consume/slow_queue \
  -H "Content-Type: application/json" \
  -d '{"consumer_id":"worker-1","timeout_seconds":300}'

// WebSocket: subscribe with 5-minute lock
{"type": "subscribe", "queue": "slow_queue", "lock_timeout_secs": 300}

// WebSocket: renew with custom timeout
{"type": "renew", "queue": "slow_queue", "id": "msg-uuid", "lock_timeout_secs": 300}
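The lock lifecycle (lock on consume, renew to extend, requeue on expiry) can be modelled with a toy sketch; LockTable and its methods are hypothetical names, not Qrusty's API:

```python
import time

class LockTable:
    """Toy model of consume-time locks, not Qrusty's implementation."""
    def __init__(self):
        self.locked = {}  # message id -> lock expiry timestamp

    def lock(self, msg_id, timeout_secs, now=None):
        now = time.time() if now is None else now
        self.locked[msg_id] = now + timeout_secs

    def renew(self, msg_id, timeout_secs, now=None):
        now = time.time() if now is None else now
        if msg_id in self.locked:
            self.locked[msg_id] = now + timeout_secs

    def expired(self, now=None):
        """Messages whose lock has lapsed; these would be requeued."""
        now = time.time() if now is None else now
        return [m for m, deadline in self.locked.items() if deadline <= now]

locks = LockTable()
locks.lock("msg-1", 30, now=0)
locks.lock("msg-2", 30, now=0)
locks.renew("msg-2", 300, now=20)   # worker keeps its lock alive
print(locks.expired(now=40))        # ['msg-1'] would be requeued
```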

Queue Management

# Get queue statistics
curl http://localhost:6784/stats

# Update queue configuration (change allow_duplicates)
curl -X POST http://localhost:6784/update-queue \
  -H "Content-Type: application/json" \
  -d '{"name":"urgent","config":{"allow_duplicates":false}}'

# Rename a queue and update allow_duplicates
curl -X POST http://localhost:6784/update-queue \
  -H "Content-Type: application/json" \
  -d '{"name":"urgent","config":{"name":"high_priority","allow_duplicates":false}}'

# Purge all messages from a queue (preserves configuration)
curl -X POST http://localhost:6784/purge-queue/urgent

# Delete a queue and all its messages
curl -X DELETE http://localhost:6784/delete-queue/urgent

API Endpoints

  • POST /create-queue - Create a new queue with specific configuration
  • POST /update-queue - Update queue name and/or allow_duplicates setting (queue type is immutable)
  • POST /publish - Publish a message to a queue
  • POST /consume/{queue} - Consume a message from a queue
  • POST /ack/{queue}/{id} - Acknowledge a message
  • POST /nack/{queue}/{id} - Negative acknowledge (requeue)
  • GET /stats - Get queue statistics
  • POST /purge-queue/{queue} - Remove all messages (keep configuration)
  • DELETE /delete-queue/{queue} - Delete queue and all messages
  • GET /health - Health/readiness check. Always returns 200 OK; the body is {"status":"ok"} when fully ready, or {"status":"starting","message":"..."} while the persistent storage cache is still being loaded in the background at startup

Startup memory safety

Storage initialisation is designed to avoid OOM during startup on large databases. Hot tiers are populated lazily on first pop (not pre-loaded), and the integrity scan does not re-inline externalized payloads (which would inflate the RocksDB write buffer). RocksDB memtables are flushed between init phases to cap write-buffer memory.
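As a rough illustration of the lazy hot-tier idea, here is a toy sketch (HotTier is a hypothetical name; the real implementation iterates RocksDB, not a Python list):

```python
class HotTier:
    """Sketch of a lazily populated hot tier (illustrative only)."""
    def __init__(self, backing, size=1000):
        self.backing = backing   # stand-in for RocksDB iteration
        self.size = size
        self.tier = None         # nothing loaded at startup

    def pop(self):
        if self.tier is None:    # populate on first pop, not at init
            self.tier = sorted(self.backing)[: self.size]
        return self.tier.pop(0) if self.tier else None

q = HotTier(backing=[5, 1, 9], size=2)
print(q.tier is None)  # True: startup cost deferred
print(q.pop())         # 1: populated on first pop
```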

Startup mutation gate

While the server is initialising its persistent storage cache, all mutation endpoints (create/update/delete queue, publish, consume, ack, nack, purge, batch operations) return 503 Service Unavailable with a Retry-After: 2 header and a JSON body:

{
  "status": "starting",
  "message": "Qrusty is initialising. Retry shortly.",
  "retry_after_secs": 2
}

Read-only endpoints (/health, /stats, /queues, /queue-stats/{queue}, /operation-timings, /ws) and static files remain accessible during initialisation. This prevents bogus empty payloads caused by clients publishing before the storage cache is fully reconstructed.
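The gate's behaviour can be approximated with a small sketch (handle and the abbreviated read-only set are illustrative, not the server's actual router):

```python
def handle(path, method, ready):
    """Toy dispatch of the startup gate: mutations get 503 until ready."""
    read_only = {"/health", "/stats", "/queues", "/ws"}  # abbreviated set
    if not ready and path not in read_only and method in {"POST", "DELETE"}:
        return 503, {"status": "starting",
                     "message": "Qrusty is initialising. Retry shortly.",
                     "retry_after_secs": 2}
    return 200, {"status": "ok"}

print(handle("/publish", "POST", ready=False))  # 503 while initialising
print(handle("/stats", "GET", ready=False))     # reads stay available
print(handle("/publish", "POST", ready=True))   # 200 once ready
```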

Broken payload recovery

When the payload store is enabled and segment files are missing or corrupted (e.g. after toggling QRUSTY_PAYLOAD_STORE=disabled and back), messages whose payloads cannot be recovered are automatically deleted:

  • At startup: the integrity scan detects and removes broken messages
  • At consume time: if a broken message is encountered, it is deleted and the next valid message is returned instead

Consumers will never receive a message with an empty payload caused by a missing segment file.
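Consume-time recovery amounts to a skip-and-delete loop; a minimal model of that logic (not Qrusty's code):

```python
def consume(queue, payloads):
    """Sketch: skip and delete messages whose payload can't be resolved."""
    while queue:
        msg_id = queue.pop(0)
        payload = payloads.get(msg_id)   # None models a missing segment
        if payload is None:
            continue                     # broken message is dropped
        return msg_id, payload
    return None

queue = ["a", "b", "c"]
payloads = {"b": "hello", "c": "world"}  # "a" lost its segment file
print(consume(queue, payloads))          # ('b', 'hello')
```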

Payload externalization threshold

Payloads smaller than QRUSTY_EXTERNALIZE_MIN_BYTES (default: 4096) stay inline in RocksDB rather than being written to the payload store. This avoids segment file overhead for small payloads where the PayloadRef metadata is comparable in size to the payload itself.
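The placement decision is a simple size check, sketched here with the documented default:

```python
EXTERNALIZE_MIN_BYTES = 4096  # QRUSTY_EXTERNALIZE_MIN_BYTES default

def placement(payload: bytes) -> str:
    """Small payloads stay inline in RocksDB; large ones go to the store."""
    return "inline" if len(payload) < EXTERNALIZE_MIN_BYTES else "external"

print(placement(b"x" * 100))   # inline
print(placement(b"x" * 8192))  # external
```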

Memory pressure reclamation

When memory usage exceeds configurable thresholds, Qrusty progressively reclaims memory:

  • Warning (80%): flushes RocksDB memtables, shrinks block cache to 25%, reclaims per-queue structures for empty queues (dedup sets, hot tier entries, locked index entries), calls shrink_to_fit() on all dedup HashSets and the locked index to release over-allocated capacity, and runs a full-range RocksDB compaction to reclaim tombstone space from acked/deleted messages.
  • Critical (90%): all Warning-level actions, then shrinks block cache further to 1 MB, shrinks hot tiers to 10%, compacts the payload store (including cleanup of all segments when no live payloads remain), and rejects new publishes until pressure subsides.
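The enter/exit thresholds form a small state machine with hysteresis; an illustrative sketch using the default ratios (not the server's actual reclamation code):

```python
ENTER, EXIT, CRITICAL = 0.80, 0.70, 0.90  # default thresholds

def next_level(current, usage_ratio):
    """Sketch of the pressure state machine with exit hysteresis."""
    if usage_ratio >= CRITICAL:
        return "Critical"
    if usage_ratio >= ENTER:
        return "Warning"
    if current != "Normal" and usage_ratio > EXIT:
        return current  # stay under pressure until usage drops below EXIT
    return "Normal"

level = next_level("Normal", 0.85)  # enters Warning
level = next_level(level, 0.75)     # still Warning (hysteresis)
level = next_level(level, 0.65)     # back to Normal
print(level)
```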

Examples and Integration

For comprehensive examples and integration guides, see the project documentation.

Example consume with a longer timeout:

curl -X POST http://localhost:6784/consume/batch \
  -H "Content-Type: application/json" \
  -d '{"consumer_id":"worker-2","timeout_seconds":60}'

Make Targets

Run these from the dev container, or an equivalent environment:

  • make all - Build, generate docs, run machete, and test
  • make build - Build the Rust project with cargo
  • make doc - Generate documentation with cargo doc
  • make machete - Run cargo machete (dependency analysis)
  • make test - Run tests with cargo test
  • make smoke-test - Run the live end-to-end smoke test (see below)
  • make smoke-test-memory - Same smoke test using in-memory storage
  • make traceability - Print traceability report to stdout
  • make traceability-report - Save traceability report to docs/traceability_report.md

Live Smoke Test

make smoke-test builds the binary, starts a real qrusty server on an isolated port, runs concurrent publishers and consumers against three queues for 30 seconds, polls /stats every 3 seconds to display a live throughput table, and asserts that publish_rate_per_sec and ack_rate_per_sec are non-zero — proving the server actually processes messages and reports accurate statistics.

# Default run (30 s workload, port 17784)
make smoke-test

# Longer run on a custom port
make smoke-test SMOKE_ARGS="--duration 120 --port 19000"

# Point at a release build
python scripts/smoke_test.py --binary target/release/qrusty --duration 60

Requirements: SYS-0011, SYS-0012

Coverage targets (requires cargo llvm-cov in your environment):

  • make coverage - Run the full Rust test suite with coverage and print a summary
  • make coverage-missing - Generate target/llvm-cov/missing.txt with uncovered line numbers
  • make coverage-summary-json - Generate target/llvm-cov/summary.json (machine-readable totals)
  • make coverage-html - Generate an HTML report at target/llvm-cov/html/index.html
  • make coverage-clean - Remove coverage artifacts generated by cargo llvm-cov

Run this outside the dev container, unless it's been altered to allow docker-in-docker:

  • make show-docs - Build and open the docs in the default browser
  • make build-qrusty - Build and push Docker image for ARM64 platform

Requirements (Doorstop)

This repo uses Doorstop for Requirements-Based Verification (RBV).

Document hierarchy

| Document | Prefix | Parent | Purpose |
|---|---|---|---|
| requirements/system/ | SYS | (root) | System-level requirements (deployment, health, web UI) |
| requirements/persistence/ | PER | SYS | Persistence and recovery |
| requirements/scheduling/ | SCH | SYS | Priority and scheduling |
| requirements/delivery/ | DLV | SYS | Delivery semantics (lock, ack, nack, retry) |
| requirements/api/ | API | SYS | HTTP API endpoints |
| requirements/websocket/ | WS | SYS | WebSocket API |

Each requirement YAML item includes:

  • text – "The system shall …" requirement statement.
  • verification – IADT method: inspection, analysis, demonstration, or test.
  • links – parent requirement UIDs (for traceability).

Run Doorstop in the dev container

The devcontainer includes a helper script that bootstraps a local venv at .venv/ (if missing), installs Doorstop, and starts doorstop-server:

bash .devcontainer/start-doorstop.sh

Then open http://localhost:17867 in your browser.

To stop the server:

pkill -f 'doorstop-server --host 0.0.0.0 --port 17867' || true

Traceability

Requirements are linked to implementation code and tests via special comment tags:

Implementation tagging (Rust doc comments):

/// Requirements: PER-0001, PER-0002, SCH-0001
pub async fn push(&self, message: Message) -> Result<String, StorageError> {

Test tagging (Rust comments):

// Verifies: DLV-0001, DLV-0002, API-0003
#[tokio::test]
async fn test_push_and_pop_message() {
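
A report generator only needs to scan sources for these two tag styles; a hypothetical Python scanner for illustration (the real make traceability tooling may differ):

```python
import re

# Matches the "Requirements:" and "Verifies:" tag styles shown above.
REQ_TAG = re.compile(
    r"(?:Requirements|Verifies):\s*([A-Z]+-\d{4}(?:,\s*[A-Z]+-\d{4})*)"
)

def extract_tags(source: str):
    """Collect all requirement UIDs referenced by tags in source text."""
    tags = []
    for match in REQ_TAG.finditer(source):
        tags.extend(uid.strip() for uid in match.group(1).split(","))
    return tags

src = """
/// Requirements: PER-0001, SCH-0001
// Verifies: DLV-0001
"""
print(extract_tags(src))  # ['PER-0001', 'SCH-0001', 'DLV-0001']
```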

Generate traceability report:

# Print to stdout
make traceability

# Save to docs/traceability_report.md
make traceability-report

The report shows:

  • Which requirements have implementation and test coverage
  • Untraced requirements (gaps in coverage)
  • Orphan tags (tags referencing non-existent requirements)
  • Detailed links to source locations

See docs/traceability_report.md for the current report.

In-Memory Storage Mode

Qrusty supports an optional in-memory storage backend (STORAGE_MODE=memory) that eliminates all disk I/O. This is useful when persistent storage is unavailable or unreliable (e.g. a failing drive).

All functional behaviour (priority ordering, locking, ACK/NACK, dead-letter queues, duplicate detection, timeout monitoring) is identical to RocksDB mode.

All data is lost on restart. A warning is logged at startup.

# Docker
docker run -p 6784:6784 -e STORAGE_MODE=memory greeng340or/qrusty

# Cargo
STORAGE_MODE=memory cargo run

To switch back, remove STORAGE_MODE=memory (or set it to rocksdb) and ensure a volume is mounted at /data.

Requirements: SYS-0013, PER-0011

Using the Docker Image

# Pull the image
docker pull greeng340or/qrusty

# Run with default settings (RocksDB persistence)
docker run -p 6784:6784 greeng340or/qrusty

# Run with custom data path and bind address
docker run -p 6784:6784 \
  -e DATA_PATH=/data \
  -e BIND_ADDR=0.0.0.0:6784 \
  -v qrusty-data:/data \
  greeng340or/qrusty

# Run with custom RocksDB block cache (default 128 MB)
docker run -p 6784:6784 -e ROCKSDB_CACHE_MB=128 greeng340or/qrusty

# Run with custom hot tier size (default 1000 messages per queue)
docker run -p 6784:6784 -e QRUSTY_HOT_TIER_SIZE=2000 -e QRUSTY_REFILL_THRESHOLD=500 greeng340or/qrusty

# Run with in-memory storage (no disk I/O)
docker run -p 6784:6784 -e STORAGE_MODE=memory greeng340or/qrusty

# Build locally (from a prompt that can run docker)
make build-qrusty

Environment Variables

All settings have sensible defaults. See example.env for a template.

| Variable | Default | Description |
|---|---|---|
| DATA_PATH | /data | Directory for RocksDB and payload store data |
| BIND_ADDR | 0.0.0.0:6784 | Server bind address |
| STORAGE_MODE | rocksdb | Storage backend: rocksdb or memory |
| MAX_CONNECTIONS | 10000 | Max concurrent HTTP/WS connections |
| WEBUI_DIR | /opt/qrusty/webui | Web UI static files directory |
| ROCKSDB_CACHE_MB | 128 | Block cache size in MB (includes index/filter) |
| ROCKSDB_WRITE_BUFFER_MB | 16 | RocksDB write buffer per memtable in MB |
| ROCKSDB_MAX_OPEN_FILES | 128 | Max open SST file handles |
| QRUSTY_HOT_TIER_SIZE | 1000 | Max available messages per queue in hot tier |
| QRUSTY_REFILL_THRESHOLD | 250 | Refill hot tier when it drops to this count |
| QRUSTY_SEGMENT_MAX_MB | 256 | Payload store segment rotation size in MB |
| QRUSTY_COMPACT_INTERVAL_SECS | 300 | Payload compaction interval (seconds) |
| QRUSTY_MEMORY_LIMIT_MB | auto | Memory limit override (auto-detected from cgroup) |
| QRUSTY_MEMORY_PRESSURE_THRESHOLD | 0.80 | Usage ratio that enters pressure (Warning level) |
| QRUSTY_MEMORY_PRESSURE_EXIT_THRESHOLD | 0.70 | Usage ratio that exits pressure (hysteresis) |
| QRUSTY_MEMORY_CRITICAL_THRESHOLD | 0.90 | Usage ratio that triggers Critical level (sheds publishes) |
| QRUSTY_MAX_LOCKED_INDEX | 500000 | Max locked message index entries |
| WS_PING_INTERVAL_SECS | 30 | WebSocket ping interval |
| WS_PING_TIMEOUT_SECS | 10 | WebSocket ping timeout |
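Scripts that wrap or monitor Qrusty can read these variables with the documented defaults; a small example (env_int is a hypothetical helper, not part of Qrusty):

```python
import os

def env_int(name, default):
    """Read an integer setting, falling back to the documented default."""
    return int(os.environ.get(name, default))

hot_tier = env_int("QRUSTY_HOT_TIER_SIZE", 1000)
refill = env_int("QRUSTY_REFILL_THRESHOLD", 250)
# Sanity check: the refill mark should not exceed the tier size.
assert refill <= hot_tier, "refill threshold should not exceed tier size"
print(hot_tier, refill)
```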