1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
//! Redis Streams consumer queue with concurrency control, graceful shutdown,
//! and dead-letter handling.
//!
//! # Architecture
//!
//! ```text
//! Redis Stream ──XREADGROUP──► main consumer loop ──► worker(task)
//! │ │
//! │ semaphore ▼
//! │ backpressure XACK on success
//! │
//! PEL (stuck) ──XAUTOCLAIM──► claimer loop ──► worker(task) or DLQ
//! │
//! delivery count
//! via XPENDING
//! ```
//!
//! - **[`Queue`]** — the main consumer. Built via [`QueueBuilder`], started
//! with [`Queue::run()`], which returns a [`QueueHandle`] for graceful
//! shutdown.
//! - **[`Claimer`]** — optional reclaimer for stuck messages. Attached to a
//! `Queue` via `QueueBuilder::claimer`. Uses `XAUTOCLAIM` + `XPENDING` to
//! track delivery counts, and routes exhausted messages to an optional
//! dead-letter callback.
//! - **[`Task`]** — a wrapper pairing a stream message ID with a deserialized
//! payload.
//!
//! # Example
//!
//! ```rust,ignore
//! use flowd::prelude::*;
//! use flowd::task::{ClaimerBuilder, Queue, QueueBuilder};
//!
//! let client = redis::Client::open("redis://127.0.0.1:6379")?;
//! let conn = client.get_multiplexed_async_connection().await?;
//! let read_conn = client.get_multiplexed_async_connection().await?;
//!
//! let claimer = ClaimerBuilder::<Email, _, _, _>::new()
//! .min_idle_time(30_000)
//! .max_retries(3)
//! .dlq_worker(|email: Email, attempts: usize| async move {
//! log::error!("dead-lettered after {attempts} attempts: {:?}", email);
//! Ok::<(), String>(())
//! });
//!
//! let queue = Queue::new(
//! QueueBuilder::new(
//! "emails", "senders", "sender-1",
//! |email: Email| async move { send_email(email).await },
//! conn,
//! read_conn,
//! )
//! .block_timeout(5000)
//! .max_concurrent_tasks(20)
//! .claimer(claimer),
//! );
//!
//! queue.init().await?;
//! let handle = queue.run();
//!
//! // On SIGTERM:
//! handle.shutdown().await;
//! ```
pub use ;
pub use ;
pub use ;
pub use Task;