Redis Streams consumer queue with concurrency control, graceful shutdown, and dead-letter handling.
§Architecture
Redis Stream ──XREADGROUP──► main consumer loop ──► worker(task)
                                  │                      │
                                  │ semaphore            ▼
                                  │ backpressure    XACK on success
                                  │
PEL (stuck) ──XAUTOCLAIM──► claimer loop ──► worker(task) or DLQ
                                  │
                             delivery count
                             via XPENDING

- Queue: the main consumer. Built via QueueBuilder, started with Queue::run(), which returns a QueueHandle for graceful shutdown.
- Claimer: optional reclaimer for stuck messages. Attached to a Queue via QueueBuilder::claimer. Uses XAUTOCLAIM + XPENDING to track delivery counts, and routes exhausted messages to an optional dead-letter callback.
- Task: a wrapper pairing a stream message ID with a deserialized payload.
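The semaphore backpressure in the diagram can be illustrated with a minimal, standard-library-only sketch. It is not this crate's implementation: the `Semaphore` type and the `run_bounded` function are hypothetical, threads stand in for async tasks, and a short sleep stands in for the worker. The point is only that the dispatching loop blocks on a permit before handing out more work, so the number of in-flight tasks never exceeds the configured limit.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Minimal counting semaphore (hypothetical; not a type from this crate).
struct Semaphore {
    permits: Mutex<usize>,
    available: Condvar,
}

impl Semaphore {
    fn new(permits: usize) -> Self {
        Self { permits: Mutex::new(permits), available: Condvar::new() }
    }

    /// Block until a permit is free, then take it.
    fn acquire(&self) {
        let mut permits = self.permits.lock().unwrap();
        while *permits == 0 {
            permits = self.available.wait(permits).unwrap();
        }
        *permits -= 1;
    }

    /// Return a permit and wake one waiter.
    fn release(&self) {
        *self.permits.lock().unwrap() += 1;
        self.available.notify_one();
    }
}

/// Dispatch `tasks` jobs with at most `max_concurrent` in flight and
/// return the highest concurrency that was actually observed.
fn run_bounded(tasks: usize, max_concurrent: usize) -> usize {
    let sem = Arc::new(Semaphore::new(max_concurrent));
    let in_flight = Arc::new(AtomicUsize::new(0));
    let peak = Arc::new(AtomicUsize::new(0));
    let mut handles = Vec::new();

    for _ in 0..tasks {
        // Backpressure: the dispatching loop waits here while every permit
        // is taken, so it stops pulling new work until a worker finishes.
        sem.acquire();
        let (sem, in_flight, peak) = (sem.clone(), in_flight.clone(), peak.clone());
        handles.push(thread::spawn(move || {
            let now = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
            peak.fetch_max(now, Ordering::SeqCst);
            thread::sleep(Duration::from_millis(5)); // stand-in for the worker
            in_flight.fetch_sub(1, Ordering::SeqCst);
            sem.release(); // free the slot for the next message
        }));
    }

    for handle in handles {
        handle.join().unwrap();
    }
    peak.load(Ordering::SeqCst)
}
```

Calling `run_bounded(20, 3)` dispatches twenty jobs and returns a peak of at most 3, which mirrors the role max_concurrent_tasks plays in the configuration.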
§Example
use flowd::task::*;
use std::sync::Arc;

let queue = Queue::new(QueueBuilder {
    name: "emails".into(),
    consumer_group: "senders".into(),
    consumer_id: "sender-1".into(),
    block_timeout: 5000,
    max_concurrent_tasks: 20,
    worker: Arc::new(|email: &Email| async move {
        send_email(email).await
    }),
    // Optional: reclaim stuck messages from the PEL.
    claimer: Some(Claimer {
        min_idle_time: 30_000,
        block_timeout: 10_000,
        max_concurrent_tasks: 5,
        max_retries: 3,
        // Called for messages that have exhausted their retries.
        dlq_worker: Some(Arc::new(|email: &Email, attempts: usize| async move {
            log::error!("dead-lettered after {attempts} attempts: {:?}", email);
            Ok::<(), String>(())
        })),
        _marker: Default::default(),
    }),
    // `conn` is a Redis connection created elsewhere (not shown).
    conn,
    _marker: Default::default(),
});

queue.init(None).await?;

// Start consuming; returns a QueueHandle.
let handle = queue.run();

// On SIGTERM:
handle.shutdown().await;

Structs§
- Claimer: Configuration for the claimer worker that reclaims stuck messages from the pending entry list (PEL) via XAUTOCLAIM.
- ClaimerBuilder: Builder for constructing a Claimer.
- Queue: A Redis Streams consumer queue.
- QueueBuilder: Builder struct for constructing a Queue.
- QueueHandle: Handle returned by Queue::run().
- Task: A single unit of work to be enqueued into a Redis Stream.
Type Aliases§
- NoDlqError: Placeholder DLQ error type used when no claimer is configured.
- NoDlqFn: Placeholder DLQ function pointer used when no claimer is configured.
- NoDlqFut: Placeholder DLQ future type used when no claimer is configured.
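
Placeholder aliases of this kind are typically needed because a struct that is generic over an optional callback still needs a concrete type for that parameter when the callback is absent. The sketch below illustrates the general pattern only. Every name in it (`PlaceholderError`, `PlaceholderFut`, `PlaceholderFn`, `Config`, `without_dlq`, `log_dead`) is hypothetical, and the real definitions of the aliases above may differ.

```rust
use std::convert::Infallible;
use std::future::{ready, Ready};

// Hypothetical placeholder types; deliberately not named like the crate's aliases.
type PlaceholderError = Infallible;
type PlaceholderFut = Ready<Result<(), PlaceholderError>>;
type PlaceholderFn = fn(&str, usize) -> PlaceholderFut;

/// Hypothetical configuration, generic over an optional dead-letter callback.
struct Config<F> {
    max_retries: usize,
    dlq: Option<F>,
}

impl<F> Config<F> {
    /// Whether a dead-letter callback was supplied.
    fn has_dlq(&self) -> bool {
        self.dlq.is_some()
    }
}

/// With no callback, `F` cannot be inferred from `None` alone, so the
/// placeholder function-pointer type fills the generic slot.
fn without_dlq(max_retries: usize) -> Config<PlaceholderFn> {
    Config { max_retries, dlq: None }
}

/// Example callback with the placeholder signature: it does nothing and
/// returns an already-completed future.
fn log_dead(_payload: &str, _attempts: usize) -> PlaceholderFut {
    ready(Ok(()))
}
```

With this shape, `without_dlq(3)` type-checks without the caller naming any callback type, while `Config { max_retries: 3, dlq: Some(log_dead as PlaceholderFn) }` supplies one explicitly.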