//! Redis Streams consumer queue with concurrency control, graceful shutdown,
//! and dead-letter handling.
//!
//! # Architecture
//!
//! ```text
//! Redis Stream ──XREADGROUP──► main consumer loop ──► worker(task)
//!      │                                                   │
//!      │                           semaphore               ▼
//!      │                           backpressure     XACK on success
//!      │
//! PEL (stuck) ──XAUTOCLAIM──► claimer loop ──► worker(task) or DLQ
//!                                  │
//!                            delivery count
//!                            via XPENDING
//! ```
//!
//! - **[`Queue`]**: the main consumer. Built via [`QueueBuilder`], started
//!   with [`Queue::run()`], which returns a [`QueueHandle`] for graceful
//!   shutdown.
//! - **[`Claimer`]**: optional reclaimer for stuck messages. Attached to a
//!   `Queue` via `QueueBuilder::claimer`. Uses `XAUTOCLAIM` to take over
//!   messages that have sat idle in the PEL and `XPENDING` to read their
//!   delivery counts, and routes messages that have exhausted their retries
//!   to an optional dead-letter callback.
//! - **[`Task`]**: a wrapper pairing a stream message ID with a deserialized
//!   payload.
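//!
//! The claimer's retry-or-dead-letter decision can be pictured as a small
//! pure function. This is only a sketch under stated assumptions: `Route` and
//! `route` are hypothetical names that do not exist in this module, and the
//! exact comparison the claimer applies (`>` versus `>=`) is assumed rather
//! than taken from its implementation.
//!
//! ```rust
//! /// Hypothetical outcome of inspecting one reclaimed message.
//! #[derive(Debug, PartialEq)]
//! enum Route {
//!     /// Hand the message to the worker for another attempt.
//!     Retry,
//!     /// Retries are exhausted: hand the message to the dead-letter callback.
//!     DeadLetter,
//! }
//!
//! /// Assumption: a message counts as exhausted once its delivery count
//! /// (as reported by `XPENDING`) exceeds `max_retries`.
//! fn route(delivery_count: usize, max_retries: usize) -> Route {
//!     if delivery_count > max_retries {
//!         Route::DeadLetter
//!     } else {
//!         Route::Retry
//!     }
//! }
//! ```
//!
//! Under that assumption, with `max_retries = 3` a message on its third
//! delivery is still retried, and one on its fourth is dead-lettered.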
//!
//! # Example
//!
//! ```rust,ignore
//! use flowd::task::*;
//! use std::sync::Arc;
//!
//! let queue = Queue::new(QueueBuilder {
//!     name: "emails".into(),
//!     consumer_group: "senders".into(),
//!     consumer_id: "sender-1".into(),
//!     block_timeout: 5000,
//!     max_concurrent_tasks: 20,
//!     worker: Arc::new(|email: &Email| async move {
//!         send_email(email).await
//!     }),
//!     claimer: Some(Claimer {
//!         min_idle_time: 30_000,
//!         block_timeout: 10_000,
//!         max_concurrent_tasks: 5,
//!         max_retries: 3,
//!         dlq_worker: Some(Arc::new(|email: &Email, attempts: usize| async move {
//!             log::error!("dead-lettered after {attempts} attempts: {:?}", email);
//!             Ok::<(), String>(())
//!         })),
//!         _marker: Default::default(),
//!     }),
//!     conn,
//!     _marker: Default::default(),
//! });
//!
//! queue.init(None).await?;
//! let handle = queue.run();
//!
//! // On SIGTERM:
//! handle.shutdown().await;
//! ```
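//!
//! The semaphore backpressure shown in the architecture diagram can be
//! illustrated with the standard library alone. The sketch below swaps in OS
//! threads and a `Mutex`/`Condvar` counting semaphore for whatever async
//! primitive this module actually uses; `Semaphore` and `run_bounded` are
//! hypothetical names, not part of this crate.
//!
//! ```rust
//! use std::sync::atomic::{AtomicUsize, Ordering};
//! use std::sync::{Arc, Condvar, Mutex};
//! use std::thread;
//!
//! /// Hypothetical blocking counting semaphore (not this module's type).
//! struct Semaphore {
//!     permits: Mutex<usize>,
//!     available: Condvar,
//! }
//!
//! impl Semaphore {
//!     fn new(permits: usize) -> Self {
//!         Semaphore { permits: Mutex::new(permits), available: Condvar::new() }
//!     }
//!
//!     /// Blocks until a permit is free, then takes it.
//!     fn acquire(&self) {
//!         let mut permits = self.permits.lock().unwrap();
//!         while *permits == 0 {
//!             permits = self.available.wait(permits).unwrap();
//!         }
//!         *permits -= 1;
//!     }
//!
//!     /// Returns a permit and wakes one waiter.
//!     fn release(&self) {
//!         *self.permits.lock().unwrap() += 1;
//!         self.available.notify_one();
//!     }
//! }
//!
//! /// Runs `tasks` jobs with at most `limit` in flight (`limit` must be > 0)
//! /// and returns the peak number of jobs observed running at once.
//! fn run_bounded(tasks: usize, limit: usize) -> usize {
//!     let sem = Arc::new(Semaphore::new(limit));
//!     let running = Arc::new(AtomicUsize::new(0));
//!     let peak = Arc::new(AtomicUsize::new(0));
//!     let mut handles = Vec::new();
//!
//!     for _ in 0..tasks {
//!         // The dispatching loop blocks here once every permit is taken,
//!         // so it stops handing out work until a worker finishes.
//!         sem.acquire();
//!         let (sem, running, peak) = (sem.clone(), running.clone(), peak.clone());
//!         handles.push(thread::spawn(move || {
//!             let now = running.fetch_add(1, Ordering::SeqCst) + 1;
//!             peak.fetch_max(now, Ordering::SeqCst);
//!             thread::yield_now(); // stand-in for real work
//!             running.fetch_sub(1, Ordering::SeqCst);
//!             sem.release();
//!         }));
//!     }
//!     for handle in handles {
//!         handle.join().unwrap();
//!     }
//!     peak.load(Ordering::SeqCst)
//! }
//! ```
//!
//! Calling `run_bounded(20, 3)` returns a peak between 1 and 3: the permit is
//! taken before a job starts and returned only after it finishes, so the
//! number of jobs in flight never exceeds the limit.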
pub use ;
pub use ;