qrusty 0.19.19

A trusty priority queue server built with Rust
# Qrusty 🦀

> The trusty priority queue server that never forgets

Qrusty is a high-performance priority queue server written in Rust, designed for
reliable message processing with filesystem persistence and acknowledgment support.

## Features

- 🚀 **Fast** - 50,000+ messages/second
- 💾 **Persistent** - Survives restarts with RocksDB storage (or run in-memory)
- 🎯 **Priority-based** - Numeric (u64) or text (lexicographic string) priorities
- **Configurable Ordering** - Min-first or max-first priority queues
- **Acknowledgments** - Messages requeue on timeout
- 🔄 **Multiple queues** - Run dozens of isolated queues
- 🌐 **Simple HTTP API** - Easy integration with any language
- 🐳 **Docker-ready** - Single container deployment

## Priority Ordering

Qrusty supports two priority ordering modes that can be configured per queue:

### Max-First (Default)

- Higher priority values are processed first
- Priority 100 comes before priority 50
- Traditional "high priority = urgent" model
- Perfect for urgent/critical task processing

### Min-First

- Lower priority values are processed first
- Priority 10 comes before priority 50
- Unix process priority model (lower = higher priority)
- Ideal for batch processing and background jobs
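
The difference between the two orderings can be pictured with a plain binary heap. This is an illustrative Python sketch, not Qrusty's implementation:

```python
import heapq

messages = [(100, "critical"), (10, "background"), (50, "normal")]

# Min-first: a standard min-heap pops the smallest priority first.
min_first = list(messages)
heapq.heapify(min_first)
print(heapq.heappop(min_first))  # (10, 'background')

# Max-first: negate the priority so the largest value pops first.
max_first = [(-p, payload) for p, payload in messages]
heapq.heapify(max_first)
p, payload = heapq.heappop(max_first)
print((-p, payload))  # (100, 'critical')
```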

### Priority Kinds

Each queue is configured with a `priority_kind` that determines which type of priority values it accepts:

- **Numeric** (default) — unsigned 64-bit integers (0 to 2^64-1)
- **Text** — arbitrary UTF-8 strings for lexicographic ordering
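
Note that lexicographic ordering compares strings character by character, not numerically, which can surprise you if your text priorities look like numbers. A quick Python illustration:

```python
# Text priorities sort lexicographically, not numerically:
print(sorted(["9", "10", "100"]))     # ['10', '100', '9']

# Zero-pad if you need numeric order out of text priorities:
print(sorted(["009", "010", "100"]))  # ['009', '010', '100']
```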

```bash
# Create a queue with text priorities
curl -X POST http://localhost:6784/create-queue \
  -H "Content-Type: application/json" \
  -d '{"name":"categories","config":{"ordering":"MinFirst","priority_kind":"Text"}}'

# Publish with a string priority
curl -X POST http://localhost:6784/publish \
  -H "Content-Type: application/json" \
  -d '{"queue":"categories","priority":"alpha","payload":"First category"}'
```

## Quick Start

### Create Queues with Different Orderings

```bash
# Create a max-first queue for urgent tasks
curl -X POST http://localhost:6784/create-queue \
  -H "Content-Type: application/json" \
  -d '{"name":"urgent","config":{"ordering":"MaxFirst"}}'

# Create a min-first queue for batch processing
curl -X POST http://localhost:6784/create-queue \
  -H "Content-Type: application/json" \
  -d '{"name":"batch","config":{"ordering":"MinFirst"}}'
```

### Publish Messages

```bash
# Publish to urgent queue (high priority = 100)
curl -X POST http://localhost:6784/publish \
  -H "Content-Type: application/json" \
  -d '{"queue":"urgent","priority":100,"payload":"Critical task"}'

# Publish to batch queue (low priority = 10)
curl -X POST http://localhost:6784/publish \
  -H "Content-Type: application/json" \
  -d '{"queue":"batch","priority":10,"payload":"Background job"}'
```

### Consume Messages

```bash
# Consume from urgent queue (gets highest priority first)
curl -X POST http://localhost:6784/consume/urgent \
  -H "Content-Type: application/json" \
  -d '{"consumer_id":"worker-1","timeout_seconds":30}'

# Consume from batch queue (gets lowest priority first)
curl -X POST http://localhost:6784/consume/batch \
  -H "Content-Type: application/json" \
  -d '{"consumer_id":"worker-2","timeout_seconds":30}'
```

### Lock Timeout

By default, consumed messages are locked for 30 seconds. For long-running
processing, you can specify a longer lock timeout at consume time (HTTP) or
subscribe time (WebSocket):

```bash
# HTTP: lock for 5 minutes
curl -X POST http://localhost:6784/consume/slow_queue \
  -H "Content-Type: application/json" \
  -d '{"consumer_id":"worker-1","timeout_seconds":300}'
```

```json
// WebSocket: subscribe with 5-minute lock
{"type": "subscribe", "queue": "slow_queue", "lock_timeout_secs": 300}

// WebSocket: renew with custom timeout
{"type": "renew", "queue": "slow_queue", "id": "msg-uuid", "lock_timeout_secs": 300}
```
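
A client can build these frames programmatically. A minimal Python sketch (the frame shapes are taken verbatim from the examples above; sending them over a WebSocket connection is left out):

```python
import json

def subscribe_frame(queue: str, lock_timeout_secs: int = 30) -> str:
    # Subscribe with a custom lock timeout (frame shape from the docs above).
    return json.dumps({"type": "subscribe", "queue": queue,
                       "lock_timeout_secs": lock_timeout_secs})

def renew_frame(queue: str, msg_id: str, lock_timeout_secs: int = 30) -> str:
    # Renew the lock on a message that is still being processed.
    return json.dumps({"type": "renew", "queue": queue, "id": msg_id,
                       "lock_timeout_secs": lock_timeout_secs})

print(subscribe_frame("slow_queue", 300))
```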

### Queue Management

```bash
# Get queue statistics
curl http://localhost:6784/stats

# Update queue configuration (change allow_duplicates)
curl -X POST http://localhost:6784/update-queue \
  -H "Content-Type: application/json" \
  -d '{"name":"urgent","config":{"allow_duplicates":false}}'

# Rename a queue and update allow_duplicates
curl -X POST http://localhost:6784/update-queue \
  -H "Content-Type: application/json" \
  -d '{"name":"urgent","config":{"name":"high_priority","allow_duplicates":false}}'

# Purge all messages from a queue (preserves configuration)
curl -X POST http://localhost:6784/purge-queue/urgent

# Delete a queue and all its messages
curl -X DELETE http://localhost:6784/delete-queue/urgent
```

## API Endpoints

- `POST /create-queue` - Create a new queue with specific configuration
- `POST /update-queue` - Update queue name and/or allow_duplicates setting (queue type is immutable)
- `POST /publish` - Publish a message to a queue
- `POST /consume/{queue}` - Consume a message from a queue
- `POST /ack/{queue}/{id}` - Acknowledge a message
- `POST /nack/{queue}/{id}` - Negative acknowledge (requeue)
- `GET /stats` - Get queue statistics
- `POST /purge-queue/{queue}` - Remove all messages (keep configuration)
- `DELETE /delete-queue/{queue}` - Delete queue and all messages
- `GET /health` - Health/readiness check. Always returns `200 OK`; the body is `{"status":"ok"}` when fully ready, or `{"status":"starting","message":"..."}` while the persistent storage cache is still being loaded in the background at startup

### Startup memory safety

Storage initialisation is designed to avoid OOM during startup on large
databases. Hot tiers are populated lazily on first pop (not pre-loaded),
and the integrity scan does not re-inline externalized payloads (which
would inflate the RocksDB write buffer). RocksDB memtables are flushed
between init phases to cap write-buffer memory.

### Startup mutation gate

While the server is initialising its persistent storage cache, **all mutation
endpoints** (create/update/delete queue, publish, consume, ack, nack, purge,
batch operations) return `503 Service Unavailable` with a `Retry-After: 2`
header and a JSON body:

```json
{
  "status": "starting",
  "message": "Qrusty is initialising. Retry shortly.",
  "retry_after_secs": 2
}
```

Read-only endpoints (`/health`, `/stats`, `/queues`, `/queue-stats/{queue}`,
`/operation-timings`, `/ws`) and static files remain accessible during
initialisation. This prevents bogus empty payloads caused by clients publishing
before the storage cache is fully reconstructed.
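
Clients can honour this gate with a small backoff helper. A sketch (the HTTP call itself is omitted; the `503` status and `Retry-After: 2` header are the ones documented above):

```python
def retry_delay(status: int, headers: dict, default: float = 2.0):
    """Return seconds to wait before retrying, or None if no retry is needed.

    Honours the Retry-After header Qrusty sends with its 503 responses
    while the startup mutation gate is active.
    """
    if status != 503:
        return None
    try:
        return float(headers.get("Retry-After", default))
    except ValueError:
        return default

# During startup the gate responds 503 with Retry-After: 2.
print(retry_delay(503, {"Retry-After": "2"}))  # 2.0
print(retry_delay(200, {}))                    # None
```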

### Broken payload recovery

When the payload store is enabled and segment files are missing or corrupted
(e.g. after toggling `QRUSTY_PAYLOAD_STORE=disabled` and back), messages whose
payloads cannot be recovered are automatically deleted:

- **At startup**: the integrity scan detects and removes broken messages
- **At consume time**: if a broken message is encountered, it is deleted and
  the next valid message is returned instead

Consumers will never receive a message with an empty payload caused by a
missing segment file.

### Payload externalization threshold

Payloads smaller than `QRUSTY_EXTERNALIZE_MIN_BYTES` (default: 4096) stay
inline in RocksDB rather than being written to the payload store. This avoids
segment file overhead for small payloads where the `PayloadRef` metadata is
comparable in size to the payload itself.
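
The decision rule can be pictured as follows (an illustrative sketch, not the actual storage code):

```python
EXTERNALIZE_MIN_BYTES = 4096  # default for QRUSTY_EXTERNALIZE_MIN_BYTES

def is_externalized(payload: bytes, threshold: int = EXTERNALIZE_MIN_BYTES) -> bool:
    # Payloads under the threshold stay inline in RocksDB; larger ones are
    # written to the payload store and referenced by PayloadRef metadata.
    return len(payload) >= threshold

print(is_externalized(b"x" * 100))   # False: stays inline
print(is_externalized(b"x" * 8192))  # True: goes to the payload store
```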

### Memory pressure reclamation

When memory usage exceeds configurable thresholds, Qrusty progressively
reclaims memory:

- **Warning** (80%): flushes RocksDB memtables, shrinks block cache to 25%,
  reclaims per-queue structures for empty queues (dedup sets, hot tier entries,
  locked index entries), calls `shrink_to_fit()` on all dedup HashSets and
  the locked index to release over-allocated capacity, and runs a full-range
  RocksDB compaction to reclaim tombstone space from acked/deleted messages.
- **Critical** (90%): all Warning-level actions, then shrinks block cache
  further to 1 MB, shrinks hot tiers to 10%, compacts the payload store
  (including cleanup of all segments when no live payloads remain), and
  rejects new publishes until pressure subsides.

## Examples and Integration

For comprehensive examples and integration guides, see:

- [Queue Management Guide](integrations/examples/queue_management_guide.md) - Complete cURL, Python, and JavaScript examples
- [Python Integration Examples](integrations/python/) - Production-ready Python integration
- [Node-RED Integration](integrations/node-red/) - For IoT and workflow automation
- [Priority Usage Examples](integrations/examples/priority_examples.md) - Understanding priority ordering
- [API Usage Examples](integrations/examples/api_usage_examples.md) - Comprehensive API documentation

Example consume with a longer timeout:

```bash
curl -X POST http://localhost:6784/consume/batch \
  -H "Content-Type: application/json" \
  -d '{"consumer_id":"worker-2","timeout_seconds":60}'
```

## Make Targets

Run these from the dev container, or an equivalent environment:

- `make all` - Build, generate docs, run machete, and test
- `make build` - Build the Rust project with cargo
- `make doc` - Generate documentation with cargo doc
- `make machete` - Run cargo machete (dependency analysis)
- `make test` - Run tests with cargo test
- `make smoke-test` - Run the live end-to-end smoke test (see below)
- `make smoke-test-memory` - Same smoke test using in-memory storage
- `make traceability` - Print traceability report to stdout
- `make traceability-report` - Save traceability report to `docs/traceability_report.md`

### Live Smoke Test

`make smoke-test` builds the binary, starts a real qrusty server on an
isolated port, runs concurrent publishers and consumers against three queues
for 30 seconds, polls `/stats` every 3 seconds to display a live throughput
table, and asserts that `publish_rate_per_sec` and `ack_rate_per_sec` are
non-zero — proving the server actually processes messages and reports accurate
statistics.

```bash
# Default run (30 s workload, port 17784)
make smoke-test

# Longer run on a custom port
make smoke-test SMOKE_ARGS="--duration 120 --port 19000"

# Point at a release build
python scripts/smoke_test.py --binary target/release/qrusty --duration 60
```

Requirements: SYS-0011, SYS-0012

Coverage targets (requires `cargo llvm-cov` in your environment):

- `make coverage` - Run the full Rust test suite with coverage and print a summary
- `make coverage-missing` - Generate `target/llvm-cov/missing.txt` with uncovered line numbers
- `make coverage-summary-json` - Generate `target/llvm-cov/summary.json` (machine-readable totals)
- `make coverage-html` - Generate an HTML report at `target/llvm-cov/html/index.html`
- `make coverage-clean` - Remove coverage artifacts generated by `cargo llvm-cov`

Run these outside the dev container, unless it has been altered to allow Docker-in-Docker:

- `make show-docs` - Build and open the docs in the default browser
- `make build-qrusty` - Build and push Docker image for ARM64 platform

## Requirements (Doorstop)

This repo uses [Doorstop](https://doorstop.info/) for Requirements-Based Verification (RBV).

### Document hierarchy

| Document                    | Prefix | Parent   | Purpose                                                |
| --------------------------- | ------ | -------- | ------------------------------------------------------ |
| `requirements/system/`      | SYS    | _(root)_ | System-level requirements (deployment, health, web UI) |
| `requirements/persistence/` | PER    | SYS      | Persistence and recovery                               |
| `requirements/scheduling/`  | SCH    | SYS      | Priority and scheduling                                |
| `requirements/delivery/`    | DLV    | SYS      | Delivery semantics (lock, ack, nack, retry)            |
| `requirements/api/`         | API    | SYS      | HTTP API endpoints                                     |
| `requirements/websocket/`   | WS     | SYS      | WebSocket API                                          |

Each requirement YAML item includes:

- **text** – "The system shall …" requirement statement.
- **verification** – IADT method: `inspection`, `analysis`, `demonstration`, or `test`.
- **links** – parent requirement UIDs (for traceability).
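
Put together, a requirement item might look like this (an illustrative shape only; the field names come from the list above, while the UID, text, and parent link are made up):

```yaml
# requirements/persistence/PER-0001.yml (illustrative example)
text: |
  The system shall persist published messages across server restarts.
verification: test
links:
  - SYS-0001
```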

### Run Doorstop in the dev container

The devcontainer includes a helper script that bootstraps a local venv at `.venv/` (if missing), installs Doorstop, and starts `doorstop-server`:

```bash
bash .devcontainer/start-doorstop.sh
```

Then open:

- <http://localhost:17867/>
- <http://localhost:17867/documents/SYS.html>

To stop the server:

```bash
pkill -f 'doorstop-server --host 0.0.0.0 --port 17867' || true
```

### Traceability

Requirements are linked to implementation code and tests via special comment tags:

**Implementation tagging** (Rust doc comments):

```rust
/// Requirements: PER-0001, PER-0002, SCH-0001
pub async fn push(&self, message: Message) -> Result<String, StorageError> {
```

**Test tagging** (Rust comments):

```rust
// Verifies: DLV-0001, DLV-0002, API-0003
#[tokio::test]
async fn test_push_and_pop_message() {
```

**Generate traceability report:**

```bash
# Print to stdout
make traceability

# Save to docs/traceability_report.md
make traceability-report
```

The report shows:

- Which requirements have implementation and test coverage
- Untraced requirements (gaps in coverage)
- Orphan tags (tags referencing non-existent requirements)
- Detailed links to source locations

See [docs/traceability_report.md](docs/traceability_report.md) for the current report.

## In-Memory Storage Mode

Qrusty supports an optional in-memory storage backend (`STORAGE_MODE=memory`)
that eliminates all disk I/O. This is useful when persistent storage is
unavailable or unreliable (e.g. a failing drive).

All functional behaviour (priority ordering, locking, ACK/NACK, dead-letter
queues, duplicate detection, timeout monitoring) is identical to RocksDB mode.

**All data is lost on restart.** A warning is logged at startup.

```bash
# Docker
docker run -p 6784:6784 -e STORAGE_MODE=memory greeng340or/qrusty

# Cargo
STORAGE_MODE=memory cargo run
```

To switch back, remove `STORAGE_MODE=memory` (or set it to `rocksdb`) and
ensure a volume is mounted at `/data`.

Requirements: SYS-0013, PER-0011

## Using the Docker Image

```bash
# Pull the image
docker pull greeng340or/qrusty

# Run with default settings (RocksDB persistence)
docker run -p 6784:6784 greeng340or/qrusty

# Run with custom data path and bind address
docker run -p 6784:6784 \
  -e DATA_PATH=/data \
  -e BIND_ADDR=0.0.0.0:6784 \
  -v qrusty-data:/data \
  greeng340or/qrusty

# Run with a custom RocksDB block cache size
docker run -p 6784:6784 -e ROCKSDB_CACHE_MB=128 greeng340or/qrusty

# Run with custom hot tier size (default 1000 messages per queue)
docker run -p 6784:6784 -e QRUSTY_HOT_TIER_SIZE=2000 -e QRUSTY_REFILL_THRESHOLD=500 greeng340or/qrusty

# Run with in-memory storage (no disk I/O)
docker run -p 6784:6784 -e STORAGE_MODE=memory greeng340or/qrusty

# Build locally (from a prompt that can run docker)
make build-qrusty
```

### Environment Variables

All settings have sensible defaults. See [`example.env`](example.env) for a template.

| Variable                                | Default             | Description                                               |
| --------------------------------------- | ------------------- | --------------------------------------------------------- |
| `DATA_PATH`                             | `/data`             | Directory for RocksDB and payload store data              |
| `BIND_ADDR`                             | `0.0.0.0:6784`      | Server bind address                                       |
| `STORAGE_MODE`                          | `rocksdb`           | Storage backend: `rocksdb` or `memory`                    |
| `MAX_CONNECTIONS`                       | `10000`             | Max concurrent HTTP/WS connections                        |
| `WEBUI_DIR`                             | `/opt/qrusty/webui` | Web UI static files directory                             |
| `ROCKSDB_CACHE_MB`                      | `128`               | Block cache size in MB (includes index/filter)            |
| `ROCKSDB_WRITE_BUFFER_MB`               | `16`                | RocksDB write buffer per memtable in MB                   |
| `ROCKSDB_MAX_OPEN_FILES`                | `128`               | Max open SST file handles                                 |
| `QRUSTY_HOT_TIER_SIZE`                  | `1000`              | Max available messages per queue in hot tier              |
| `QRUSTY_REFILL_THRESHOLD`               | `250`               | Refill hot tier when it drops to this count               |
| `QRUSTY_SEGMENT_MAX_MB`                 | `256`               | Payload store segment rotation size in MB                 |
| `QRUSTY_COMPACT_INTERVAL_SECS`          | `300`               | Payload compaction interval (seconds)                     |
| `QRUSTY_MEMORY_LIMIT_MB`                | auto                | Memory limit override (auto-detected from cgroup)         |
| `QRUSTY_MEMORY_PRESSURE_THRESHOLD`      | `0.80`              | Usage ratio that enters pressure (Warning level)          |
| `QRUSTY_MEMORY_PRESSURE_EXIT_THRESHOLD` | `0.70`              | Usage ratio that exits pressure (hysteresis)              |
| `QRUSTY_MEMORY_CRITICAL_THRESHOLD`      | `0.90`              | Usage ratio that triggers Critical level (shed publishes) |
| `QRUSTY_MAX_LOCKED_INDEX`               | `500000`            | Max locked message index entries                          |
| `WS_PING_INTERVAL_SECS`                 | `30`                | WebSocket ping interval                                   |
| `WS_PING_TIMEOUT_SECS`                  | `10`                | WebSocket ping timeout                                    |