flowd
A Redis Streams-backed task queue for Rust, with a derive macro for automatic
struct ↔ redis::Value mapping. Runs on either tokio or smol.
- Consumer groups with XREADGROUP, semaphore-based concurrency control, and graceful shutdown.
- Optional claimer that reclaims stuck messages via XAUTOCLAIM, tracks delivery counts via XPENDING, and routes exhausted messages to a dead-letter callback.
- Batch enqueue via Redis pipelining (enqueue_bulk).
- #[derive(Job)] generates all the glue between your struct and Redis Stream fields, with no manual serialization.
Installation
[]
= "0.1" # tokio (default)
= "1"
For smol:
[]
= { = "0.1", = false, = ["smol"] }
= "1"
Exactly one runtime feature must be enabled. tokio and smol are mutually
exclusive; tokio is on by default.
Quick start
use *;
use ;
async
Architecture
Redis Stream ──XREADGROUP──► main consumer loop ──► worker(task)
│ │
│ semaphore ▼
│ backpressure XACK on success
│
PEL (stuck) ──XAUTOCLAIM──► claimer loop ──► worker(task) or DLQ
│
delivery count
via XPENDING
- Queue: the main consumer. Built with QueueBuilder, started with Queue::run(), which returns a QueueHandle for graceful shutdown.
- Claimer: optional reclaimer. Attached via QueueBuilder::claimer. Messages whose delivery count exceeds max_retries are routed to the optional dlq_worker callback and acknowledged.
- Task: a stream message ID paired with a typed payload.
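The semaphore backpressure in the diagram can be pictured as a fixed pool of permits: the consumer loop takes one before handing a task to a worker, and the permit returns to the pool when the worker finishes. The sketch below is an illustration using only the standard library. Permits and Permit are hypothetical names, not flowd types; the crate itself relies on the runtime's async semaphore.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical counting semaphore; a synchronous stand-in for the
/// async semaphore a real consumer loop would use.
pub struct Permits {
    available: Mutex<usize>,
}

/// RAII guard: the permit goes back to the pool when the guard is dropped,
/// which models a worker finishing its task.
pub struct Permit {
    pool: Arc<Permits>,
}

impl Permits {
    pub fn new(max_in_flight: usize) -> Arc<Self> {
        Arc::new(Self {
            available: Mutex::new(max_in_flight),
        })
    }

    /// Non-blocking acquire. `None` means every worker slot is busy, so the
    /// consumer loop should hold off on reading more messages.
    pub fn try_acquire(self: &Arc<Self>) -> Option<Permit> {
        let mut available = self.available.lock().unwrap();
        if *available == 0 {
            return None;
        }
        *available -= 1;
        Some(Permit {
            pool: Arc::clone(self),
        })
    }

    /// Number of free worker slots right now.
    pub fn available(&self) -> usize {
        *self.available.lock().unwrap()
    }
}

impl Drop for Permit {
    fn drop(&mut self) {
        *self.pool.available.lock().unwrap() += 1;
    }
}
```

With a pool of two permits, a third try_acquire returns None until one of the earlier guards is dropped. An async semaphore behaves the same way, except that the acquire awaits instead of returning None.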
Enqueueing jobs
// One-shot
producer.enqueue.await?;
// Batch — single round-trip via pipelining
producer.enqueue_bulk.await?;
Dead-letter handling
use ;
let claimer = new
.min_idle_time
.max_retries
.dlq_worker;
let queue = new;
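The routing rule the claimer applies (a message whose delivery count exceeds max_retries goes to the dead-letter path, anything else is handed back to the worker) can be sketched as a pure function. Disposition and route are hypothetical names chosen for this illustration and are not part of flowd's API.

```rust
/// What happens to a reclaimed message (hypothetical enum, for illustration).
#[derive(Debug, PartialEq, Eq)]
pub enum Disposition {
    /// Hand the message to the regular worker again.
    Retry,
    /// Hand the message to the dead-letter callback, then acknowledge it.
    DeadLetter,
}

/// Assumed policy: dead-letter only when the delivery count strictly
/// exceeds `max_retries`, matching the wording "exceeds max_retries".
pub fn route(delivery_count: u64, max_retries: u64) -> Disposition {
    if delivery_count > max_retries {
        Disposition::DeadLetter
    } else {
        Disposition::Retry
    }
}
```

Under this reading, with max_retries set to 3 a message on its third delivery is still retried and only the fourth delivery is dead-lettered.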
The Job derive
use *;
- Required fields need Display + FromRedisValue (or a custom #[mapper]).
- Option<T> fields are skipped on None and become None on missing keys.
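To make the two rules above concrete, here is a hand-written approximation of the kind of mapping a derive like this could produce. It is a sketch under simplifying assumptions: stream fields are modeled as plain strings rather than redis::Value, and SendEmail, to_fields, and from_fields are hypothetical names, not the code the macro actually emits.

```rust
use std::collections::HashMap;

/// Hypothetical job type used only for this illustration.
#[derive(Debug, PartialEq)]
pub struct SendEmail {
    pub to: String,
    pub attempts: u32,
    pub reply_to: Option<String>,
}

impl SendEmail {
    /// Struct -> stream fields. Required fields are written through `Display`
    /// (`to_string`); an `Option` field that is `None` is left out entirely.
    pub fn to_fields(&self) -> Vec<(String, String)> {
        let mut fields = vec![
            ("to".to_string(), self.to.to_string()),
            ("attempts".to_string(), self.attempts.to_string()),
        ];
        if let Some(reply_to) = &self.reply_to {
            fields.push(("reply_to".to_string(), reply_to.to_string()));
        }
        fields
    }

    /// Stream fields -> struct. A missing required key is an error; a missing
    /// optional key becomes `None`.
    pub fn from_fields(fields: &HashMap<String, String>) -> Result<Self, String> {
        let required = |key: &str| {
            fields
                .get(key)
                .cloned()
                .ok_or_else(|| format!("missing field `{key}`"))
        };
        Ok(Self {
            to: required("to")?,
            attempts: required("attempts")?
                .parse::<u32>()
                .map_err(|e| format!("bad `attempts`: {e}"))?,
            reply_to: fields.get("reply_to").cloned(),
        })
    }
}
```

A job with reply_to set to None serializes to two fields and round-trips back to an equal value, while an empty field map fails on the first required key.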
Connection handling
QueueBuilder::new takes two MultiplexedConnections:
- conn: for all non-blocking ops (XACK, XADD, XAUTOCLAIM, XPENDING). Safe to clone and share across queues.
- read_conn: used exclusively for the blocking XREADGROUP.
Important. conn and read_conn must be independent connections obtained from separate calls to redis::Client::get_multiplexed_async_connection. Do not pass a clone of the same handle. A MultiplexedConnection pipelines all commands over a single socket, so a blocking XREADGROUP on a shared handle stalls every other command queued behind it, including this queue's own XACKs, which will then time out.
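The stall can be reasoned about with a toy model of a single FIFO connection: each command finishes only after everything queued before it has finished. The sketch below is a simplification for illustration, not a measurement. Command, completion_times, and the 5000 ms block duration are all hypothetical.

```rust
/// One command queued on a connection: (name, time until the server answers, in ms).
pub type Command = (&'static str, u64);

/// Completion time of each command when all of them share one FIFO socket:
/// a command cannot complete before the commands queued ahead of it.
pub fn completion_times(queue: &[Command]) -> Vec<(&'static str, u64)> {
    let mut clock = 0;
    queue
        .iter()
        .map(|&(name, cost)| {
            clock += cost;
            (name, clock)
        })
        .collect()
}
```

In this model, an XACK that takes 1 ms on its own completes at 5001 ms when it is queued behind a 5000 ms blocking XREADGROUP on the same connection, and at 1 ms when it has a connection to itself.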
Runtime features
| Feature | Enables |
|---|---|
| tokio | tokio::sync::Semaphore, tokio::task::JoinSet, tokio-comp |
| smol | mea::semaphore::Semaphore, FuturesUnordered, smol-comp |
The two are mutually exclusive; enabling neither or both is a compile error.
MSRV
Rust 1.85 (2024 edition).
License
Dual-licensed under either of
- Apache License, Version 2.0 (LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0)
- MIT license (LICENSE-MIT or http://opensource.org/licenses/MIT)
at your option.
Contribution
Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual-licensed as above, without any additional terms or conditions.