qrusty 0.19.3

A trusty priority queue server built with Rust
# Qrusty 🦀

> The trusty priority queue server that never forgets

Qrusty is a high-performance priority queue server written in Rust, designed for
reliable message processing with filesystem persistence and acknowledgment support.

## Features

- 🚀 **Fast** - 50,000+ messages/second
- 💾 **Persistent** - Survives restarts with RocksDB storage (or run in-memory)
- 🎯 **Priority-based** - Numeric (u64) or text (lexicographic string) priorities
- ⚙️ **Configurable ordering** - Min-first or max-first priority queues
- ✅ **Acknowledgments** - Messages requeue on timeout
- 🔄 **Multiple queues** - Run dozens of isolated queues
- 🌐 **Simple HTTP API** - Easy integration with any language
- 🐳 **Docker-ready** - Single container deployment

## Priority Ordering

Qrusty supports two priority ordering modes that can be configured per queue:

### Max-First (Default)

- Higher priority values are processed first
- Priority 100 comes before priority 50
- Traditional "high priority = urgent" model
- Perfect for urgent/critical task processing

### Min-First

- Lower priority values are processed first
- Priority 10 comes before priority 50
- Unix process priority model (lower = higher priority)
- Ideal for batch processing and background jobs
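
The two modes can be simulated with a binary heap. The Python sketch below is illustrative only (it is not Qrusty's implementation); it uses key negation to turn a min-heap into max-first ordering:

```python
import heapq

def drain(priorities, ordering="MaxFirst"):
    """Return priorities in the order a queue with the given ordering serves them."""
    # A min-heap pops the smallest key first; negate keys to get max-first.
    sign = -1 if ordering == "MaxFirst" else 1
    heap = [sign * p for p in priorities]
    heapq.heapify(heap)
    return [sign * heapq.heappop(heap) for _ in range(len(heap))]

print(drain([50, 100, 10], "MaxFirst"))  # [100, 50, 10]
print(drain([50, 100, 10], "MinFirst"))  # [10, 50, 100]
```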

### Priority Kinds

Each queue is configured with a `priority_kind` that determines which type of priority values it accepts:

- **Numeric** (default) — unsigned 64-bit integers (0 to 2^64-1)
- **Text** — arbitrary UTF-8 strings for lexicographic ordering

```bash
# Create a queue with text priorities
curl -X POST http://localhost:6784/create-queue \
  -H "Content-Type: application/json" \
  -d '{"name":"categories","config":{"ordering":"MinFirst","priority_kind":"Text"}}'

# Publish with a string priority
curl -X POST http://localhost:6784/publish \
  -H "Content-Type: application/json" \
  -d '{"queue":"categories","priority":"alpha","payload":"First category"}'
```
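
With `priority_kind: "Text"`, messages are ordered by string comparison rather than numeric value, which matters when priorities look like numbers. A quick Python illustration of the difference (using Python's default string sort as an approximation of lexicographic ordering):

```python
# Lexicographic ordering compares character by character, so "10" < "9"
# (because "1" < "9"), while numeric ordering compares magnitudes.
print(sorted(["alpha", "beta", "gamma"]))  # ['alpha', 'beta', 'gamma']
print(sorted(["9", "10", "100"]))          # ['10', '100', '9']
print(sorted([9, 10, 100]))                # [9, 10, 100]
```

In a MinFirst text queue, `"alpha"` is therefore consumed before `"beta"`, and `"10"` before `"9"`.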

## Quick Start

### Create Queues with Different Orderings

```bash
# Create a max-first queue for urgent tasks
curl -X POST http://localhost:6784/create-queue \
  -H "Content-Type: application/json" \
  -d '{"name":"urgent","config":{"ordering":"MaxFirst"}}'

# Create a min-first queue for batch processing
curl -X POST http://localhost:6784/create-queue \
  -H "Content-Type: application/json" \
  -d '{"name":"batch","config":{"ordering":"MinFirst"}}'
```

### Publish Messages

```bash
# Publish to urgent queue (high priority = 100)
curl -X POST http://localhost:6784/publish \
  -H "Content-Type: application/json" \
  -d '{"queue":"urgent","priority":100,"payload":"Critical task"}'

# Publish to batch queue (low priority = 10)
curl -X POST http://localhost:6784/publish \
  -H "Content-Type: application/json" \
  -d '{"queue":"batch","priority":10,"payload":"Background job"}'
```

### Consume Messages

```bash
# Consume from urgent queue (gets highest priority first)
curl -X POST http://localhost:6784/consume/urgent \
  -H "Content-Type: application/json" \
  -d '{"consumer_id":"worker-1","timeout_seconds":30}'

# Consume from batch queue (gets lowest priority first)
curl -X POST http://localhost:6784/consume/batch \
  -H "Content-Type: application/json" \
  -d '{"consumer_id":"worker-2","timeout_seconds":30}'
```

### Queue Management

```bash
# Get queue statistics
curl http://localhost:6784/stats

# Update queue configuration (change allow_duplicates)
curl -X POST http://localhost:6784/update-queue \
  -H "Content-Type: application/json" \
  -d '{"name":"urgent","config":{"allow_duplicates":false}}'

# Rename a queue and update allow_duplicates
curl -X POST http://localhost:6784/update-queue \
  -H "Content-Type: application/json" \
  -d '{"name":"urgent","config":{"name":"high_priority","allow_duplicates":false}}'

# Purge all messages from a queue (preserves configuration)
curl -X POST http://localhost:6784/purge-queue/urgent

# Delete a queue and all its messages
curl -X DELETE http://localhost:6784/delete-queue/urgent
```

## API Endpoints

- `POST /create-queue` - Create a new queue with specific configuration
- `POST /update-queue` - Update queue name and/or allow_duplicates setting (queue type is immutable)
- `POST /publish` - Publish a message to a queue
- `POST /consume/{queue}` - Consume a message from a queue
- `POST /ack/{queue}/{id}` - Acknowledge a message
- `POST /nack/{queue}/{id}` - Negative acknowledge (requeue)
- `GET /stats` - Get queue statistics
- `POST /purge-queue/{queue}` - Remove all messages (keep configuration)
- `DELETE /delete-queue/{queue}` - Delete queue and all messages
- `GET /health` - Health/readiness check. Always returns `200 OK`; the body is `{"status":"ok"}` when the server is fully ready, or `{"status":"starting","message":"..."}` while the persistent storage cache is still being loaded in the background at startup
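
The consume/ack/nack endpoints above compose into a simple worker lifecycle. This Python sketch only builds the paths and payloads (the base URL comes from the examples above; the helper names are ours, not part of Qrusty):

```python
import json

BASE = "http://localhost:6784"

def consume_request(queue, consumer_id, timeout_seconds=30):
    """Endpoint path and JSON body for POST /consume/{queue}."""
    body = json.dumps({"consumer_id": consumer_id,
                       "timeout_seconds": timeout_seconds})
    return f"{BASE}/consume/{queue}", body

def ack_path(queue, message_id):
    """Endpoint path for POST /ack/{queue}/{id}."""
    return f"{BASE}/ack/{queue}/{message_id}"

def nack_path(queue, message_id):
    """Endpoint path for POST /nack/{queue}/{id} (requeues the message)."""
    return f"{BASE}/nack/{queue}/{message_id}"

print(ack_path("urgent", "msg-123"))
# http://localhost:6784/ack/urgent/msg-123
```

A worker would POST the consume request, process the message, then POST the ack path on success or the nack path to requeue on failure.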

## Examples and Integration

For comprehensive examples and integration guides, see:

- [Queue Management Guide](integrations/examples/queue_management_guide.md) - Complete cURL, Python, and JavaScript examples
- [Python Integration Examples](integrations/python/) - Production-ready Python integration
- [Node-RED Integration](integrations/node-red/) - For IoT and workflow automation
- [Priority Usage Examples](integrations/examples/priority_examples.md) - Understanding priority ordering
- [API Usage Examples](integrations/examples/api_usage_examples.md) - Comprehensive API documentation

Example consume with a longer timeout:

```bash
curl -X POST http://localhost:6784/consume/batch \
  -H "Content-Type: application/json" \
  -d '{"consumer_id":"worker-2","timeout_seconds":60}'
```

## Make Targets

Run these from the dev container, or an equivalent environment:

- `make all` - Build, generate docs, run machete, and test
- `make build` - Build the Rust project with cargo
- `make doc` - Generate documentation with cargo doc
- `make machete` - Run cargo machete (dependency analysis)
- `make test` - Run tests with cargo test
- `make smoke-test` - Run the live end-to-end smoke test (see below)
- `make smoke-test-memory` - Same smoke test using in-memory storage
- `make traceability` - Print traceability report to stdout
- `make traceability-report` - Save traceability report to `docs/traceability_report.md`

### Live Smoke Test

`make smoke-test` builds the binary, starts a real qrusty server on an
isolated port, runs concurrent publishers and consumers against three queues
for 30 seconds, polls `/stats` every 3 seconds to display a live throughput
table, and asserts that `publish_rate_per_sec` and `ack_rate_per_sec` are
non-zero — proving the server actually processes messages and reports accurate
statistics.

```bash
# Default run (30 s workload, port 17784)
make smoke-test

# Longer run on a custom port
make smoke-test SMOKE_ARGS="--duration 120 --port 19000"

# Point at a release build
python scripts/smoke_test.py --binary target/release/qrusty --duration 60
```
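
The non-zero rate assertion amounts to simple arithmetic over successive cumulative counters from `/stats`. A rough Python sketch (the snapshot field names here are assumptions, not the actual `/stats` schema):

```python
def rates(prev, curr, interval_secs):
    """Derive per-second publish/ack rates from two cumulative counter snapshots."""
    return {
        "publish_rate_per_sec": (curr["published"] - prev["published"]) / interval_secs,
        "ack_rate_per_sec": (curr["acked"] - prev["acked"]) / interval_secs,
    }

prev = {"published": 0, "acked": 0}
curr = {"published": 150_000, "acked": 149_400}
r = rates(prev, curr, 3)
assert r["publish_rate_per_sec"] > 0 and r["ack_rate_per_sec"] > 0  # smoke-test criterion
print(r)  # {'publish_rate_per_sec': 50000.0, 'ack_rate_per_sec': 49800.0}
```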

Requirements: SYS-0011, SYS-0012

Coverage targets (requires `cargo llvm-cov` in your environment):

- `make coverage` - Run the full Rust test suite with coverage and print a summary
- `make coverage-missing` - Generate `target/llvm-cov/missing.txt` with uncovered line numbers
- `make coverage-summary-json` - Generate `target/llvm-cov/summary.json` (machine-readable totals)
- `make coverage-html` - Generate an HTML report at `target/llvm-cov/html/index.html`
- `make coverage-clean` - Remove coverage artifacts generated by `cargo llvm-cov`

Run these outside the dev container, unless it has been altered to allow Docker-in-Docker:

- `make show-docs` - Build and open the docs in the default browser
- `make build-qrusty` - Build and push Docker image for ARM64 platform

## Requirements (Doorstop)

This repo uses [Doorstop](https://doorstop.info/) for Requirements-Based Verification (RBV).

### Document hierarchy

| Document                    | Prefix | Parent   | Purpose                                                |
| --------------------------- | ------ | -------- | ------------------------------------------------------ |
| `requirements/system/`      | SYS    | _(root)_ | System-level requirements (deployment, health, web UI) |
| `requirements/persistence/` | PER    | SYS      | Persistence and recovery                               |
| `requirements/scheduling/`  | SCH    | SYS      | Priority and scheduling                                |
| `requirements/delivery/`    | DLV    | SYS      | Delivery semantics (lock, ack, nack, retry)            |
| `requirements/api/`         | API    | SYS      | HTTP API endpoints                                     |
| `requirements/websocket/`   | WS     | SYS      | WebSocket API                                          |

Each requirement YAML item includes:

- **text** – "The system shall …" requirement statement.
- **verification** – IADT method: `inspection`, `analysis`, `demonstration`, or `test`.
- **links** – parent requirement UIDs (for traceability).

### Run Doorstop in the dev container

The devcontainer includes a helper script that bootstraps a local venv at `.venv/` (if missing), installs Doorstop, and starts `doorstop-server`:

```bash
bash .devcontainer/start-doorstop.sh
```

Then open:

- <http://localhost:17867/>
- <http://localhost:17867/documents/SYS.html>

To stop the server:

```bash
pkill -f 'doorstop-server --host 0.0.0.0 --port 17867' || true
```

### Traceability

Requirements are linked to implementation code and tests via special comment tags:

**Implementation tagging** (Rust doc comments):

```rust
/// Requirements: PER-0001, PER-0002, SCH-0001
pub async fn push(&self, message: Message) -> Result<String, StorageError> {
```

**Test tagging** (Rust comments):

```rust
// Verifies: DLV-0001, DLV-0002, API-0003
#[tokio::test]
async fn test_push_and_pop_message() {
```

**Generate traceability report:**

```bash
# Print to stdout
make traceability

# Save to docs/traceability_report.md
make traceability-report
```

The report shows:

- Which requirements have implementation and test coverage
- Untraced requirements (gaps in coverage)
- Orphan tags (tags referencing non-existent requirements)
- Detailed links to source locations

See [docs/traceability_report.md](docs/traceability_report.md) for the current report.
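
As an illustration only (not the project's actual traceability tooling), a tag scanner along these lines can be written in a few lines of Python:

```python
import re

# Matches "// Requirements: PER-0001, ..." and "/// Verifies: DLV-0001" comments.
TAG_RE = re.compile(
    r"//+\s*(?:Requirements|Verifies):\s*([A-Z]+-\d+(?:\s*,\s*[A-Z]+-\d+)*)"
)

def extract_tags(source):
    """Return all requirement UIDs referenced by traceability comments."""
    uids = []
    for match in TAG_RE.finditer(source):
        uids.extend(uid.strip() for uid in match.group(1).split(","))
    return uids

sample = """
/// Requirements: PER-0001, PER-0002, SCH-0001
pub async fn push() {}
// Verifies: DLV-0001
"""
print(extract_tags(sample))
# ['PER-0001', 'PER-0002', 'SCH-0001', 'DLV-0001']
```

Cross-referencing the extracted UIDs against the Doorstop items is what yields the untraced-requirement and orphan-tag lists above.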

## In-Memory Storage Mode

Qrusty supports an optional in-memory storage backend (`STORAGE_MODE=memory`)
that eliminates all disk I/O. This is useful when persistent storage is
unavailable or unreliable (e.g. a failing drive).

All functional behaviour (priority ordering, locking, ACK/NACK, dead-letter
queues, duplicate detection, timeout monitoring) is identical to RocksDB mode.

**All data is lost on restart.** A warning is logged at startup.

```bash
# Docker
docker run -p 6784:6784 -e STORAGE_MODE=memory greeng340or/qrusty

# Cargo
STORAGE_MODE=memory cargo run
```

To switch back, remove `STORAGE_MODE=memory` (or set it to `rocksdb`) and
ensure a volume is mounted at `/data`.

Requirements: SYS-0013, PER-0011

## Using the Docker Image

```bash
# Pull the image
docker pull greeng340or/qrusty

# Run with default settings (RocksDB persistence)
docker run -p 6784:6784 greeng340or/qrusty

# Run with custom data path and bind address
docker run -p 6784:6784 \
  -e DATA_PATH=/data \
  -e BIND_ADDR=0.0.0.0:6784 \
  -v qrusty-data:/data \
  greeng340or/qrusty

# Run with a custom RocksDB block cache size (see ROCKSDB_CACHE_MB below)
docker run -p 6784:6784 -e ROCKSDB_CACHE_MB=128 greeng340or/qrusty

# Run with custom hot tier size (default 1000 messages per queue)
docker run -p 6784:6784 -e QRUSTY_HOT_TIER_SIZE=2000 -e QRUSTY_REFILL_THRESHOLD=500 greeng340or/qrusty

# Run with in-memory storage (no disk I/O)
docker run -p 6784:6784 -e STORAGE_MODE=memory greeng340or/qrusty

# Build locally (from a prompt that can run docker)
make build-qrusty
```

### Environment Variables

All settings have sensible defaults. See [`example.env`](example.env) for a template.

| Variable                           | Default             | Description                                       |
| ---------------------------------- | ------------------- | ------------------------------------------------- |
| `DATA_PATH`                        | `/data`             | Directory for RocksDB and payload store data      |
| `BIND_ADDR`                        | `0.0.0.0:6784`      | Server bind address                               |
| `STORAGE_MODE`                     | `rocksdb`           | Storage backend: `rocksdb` or `memory`            |
| `MAX_CONNECTIONS`                  | `10000`             | Max concurrent HTTP/WS connections                |
| `WEBUI_DIR`                        | `/opt/qrusty/webui` | Web UI static files directory                     |
| `ROCKSDB_CACHE_MB`                 | `256`               | Block cache size in MB (includes index/filter)    |
| `ROCKSDB_WRITE_BUFFER_MB`          | `16`                | RocksDB write buffer per memtable in MB           |
| `ROCKSDB_MAX_OPEN_FILES`           | `256`               | Max open SST file handles                         |
| `QRUSTY_HOT_TIER_SIZE`             | `1000`              | Max available messages per queue in hot tier      |
| `QRUSTY_REFILL_THRESHOLD`          | `250`               | Refill hot tier when it drops to this count       |
| `QRUSTY_SEGMENT_MAX_MB`            | `256`               | Payload store segment rotation size in MB         |
| `QRUSTY_COMPACT_INTERVAL_SECS`     | `300`               | Payload compaction interval (seconds)             |
| `QRUSTY_MEMORY_LIMIT_MB`           | auto                | Memory limit override (auto-detected from cgroup) |
| `QRUSTY_MEMORY_PRESSURE_THRESHOLD` | `0.80`              | Usage ratio that triggers pressure response       |
| `QRUSTY_MAX_LOCKED_INDEX`          | `500000`            | Max locked message index entries                  |
| `WS_PING_INTERVAL_SECS`            | `30`                | WebSocket ping interval                           |
| `WS_PING_TIMEOUT_SECS`             | `10`                | WebSocket ping timeout                            |
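
Since these are plain environment variables, the default-with-override behaviour can be mirrored in scripts and test harnesses. A small Python sketch (the `env_int` helper is ours, not part of Qrusty):

```python
import os

def env_int(name, default):
    """Read an integer setting, falling back to its documented default."""
    return int(os.environ.get(name, default))

os.environ["ROCKSDB_CACHE_MB"] = "128"          # override one setting
print(env_int("ROCKSDB_CACHE_MB", 256))         # 128 (overridden)
print(env_int("QRUSTY_HOT_TIER_SIZE", 1000))    # 1000 (default)
```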