Skip to main content

ruststream_fred/
stream.rs

1//! Builder describing one Redis Streams subscription.
2//!
3//! A subscription always reads through a consumer group. Two read modes are selected by
4//! constructor, never by a runtime flag, because they return disjoint message sets:
5//!
6//! * [`RedisStream::new`] reads fresh entries off the tail (`XREADGROUP > ...`).
7//! * [`RedisStream::reclaim`] reads stale pending entries another consumer never acked
8//!   (`XAUTOCLAIM`, idle at least `min_idle`) - the crash-recovery path.
9//!
10//! Inferring the mode from a numeric parameter would be a footgun (a stray idle timeout could
11//! silently stop fresh delivery), so the mode is part of the constructor name. Recovery is a
12//! separate `reclaim` subscriber on the same group: "two handlers per group".
13
14use std::sync::atomic::{AtomicU64, Ordering};
15use std::time::Duration;
16
17use ruststream::SubscriptionSource;
18
19use crate::deadletter::PoisonPolicy;
20use crate::delay::{DelayConfig, DelayedRetry};
21use crate::{RedisBroker, error::RedisError, subscriber::RedisSubscriber};
22
/// Per-read entry limit applied when [`RedisStream::count`] was never called.
const DEFAULT_COUNT: u64 = 64;
/// Block / poll interval applied when [`RedisStream::block`] was never called.
const DEFAULT_BLOCK: Duration = Duration::from_secs(5);
25
/// Produces the fallback consumer name used when the caller did not choose one.
///
/// Names have the form `ruststream-<n>`, where `<n>` is drawn from a process-wide counter that
/// starts at zero. Each call therefore returns a different name, which keeps the pending lists of
/// in-process subscribers sharing one group apart. The counter lives in this process only.
fn auto_consumer() -> String {
    // Only the uniqueness of the fetched value matters, so relaxed ordering is enough.
    static NEXT_SUFFIX: AtomicU64 = AtomicU64::new(0);
    format!("ruststream-{}", NEXT_SUFFIX.fetch_add(1, Ordering::Relaxed))
}
33
/// Where a freshly created consumer group starts reading from. Only consulted when the group does
/// not yet exist; an existing group keeps its own cursor.
///
/// Values compare structurally: two `Id`s are equal only when their strings are identical, and
/// `Id("0")` is not considered equal to `Beginning` even though both render as the same Redis ID.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub enum StreamStart {
    /// Only entries added after the group is created (`$`). The default.
    #[default]
    New,
    /// Every entry currently in the stream (`0`).
    Beginning,
    /// A specific entry ID, exclusive.
    Id(String),
}
46
47impl StreamStart {
48    pub(crate) fn as_id(&self) -> &str {
49        match self {
50            Self::New => "$",
51            Self::Beginning => "0",
52            Self::Id(id) => id,
53        }
54    }
55}
56
/// How a subscription pulls entries from its stream. The value is fixed by whichever
/// `RedisStream` constructor built the subscription.
#[derive(Clone, Debug)]
pub(crate) enum ReadMode {
    /// Read the fresh tail of the stream (`XREADGROUP >`).
    Fresh,
    /// Take over entries via `XAUTOCLAIM` once they have been idle for at least `min_idle`.
    Reclaim { min_idle: Duration },
}
64
65/// Describes one Redis Streams subscription against [`crate::RedisBroker`].
66///
67/// # Examples
68///
69/// ```
70/// use std::time::Duration;
71/// use ruststream_fred::RedisStream;
72///
73/// // Fresh tail: a normal worker reading new entries.
74/// let fresh = RedisStream::new("orders").group("workers").count(128);
75///
76/// // Recovery: reclaim entries a crashed worker left pending for over 30s.
77/// let recover = RedisStream::reclaim("orders", Duration::from_secs(30)).group("workers");
78/// # let _ = (fresh, recover);
79/// ```
80#[derive(Debug, Clone)]
81#[must_use]
82pub struct RedisStream {
83    key: String,
84    group: Option<String>,
85    consumer: Option<String>,
86    count: Option<u64>,
87    block: Option<Duration>,
88    start: StreamStart,
89    mode: ReadMode,
90    dead_letter: Option<String>,
91    max_deliveries: Option<u64>,
92    delayed_retry: Option<DelayedRetry>,
93}
94
95impl RedisStream {
96    /// A fresh-tail subscription on `key`: reads new entries via `XREADGROUP >`.
97    ///
98    /// A consumer group is required; set it with [`group`](Self::group).
99    pub fn new(key: impl Into<String>) -> Self {
100        Self {
101            key: key.into(),
102            group: None,
103            consumer: None,
104            count: None,
105            block: None,
106            start: StreamStart::New,
107            mode: ReadMode::Fresh,
108            dead_letter: None,
109            max_deliveries: None,
110            delayed_retry: None,
111        }
112    }
113
114    /// A recovery subscription on `key`: reclaims pending entries idle at least `min_idle` via
115    /// `XAUTOCLAIM`. Run it alongside a [`new`](Self::new) subscriber on the same group to pick up
116    /// messages a consumer fetched but died before acking.
117    ///
118    /// `min_idle` has no default and must exceed the longest legitimate handler runtime: set it too
119    /// low and a healthy consumer's in-flight message gets reclaimed and processed twice.
120    pub fn reclaim(key: impl Into<String>, min_idle: Duration) -> Self {
121        Self {
122            key: key.into(),
123            group: None,
124            consumer: None,
125            count: None,
126            block: None,
127            start: StreamStart::New,
128            mode: ReadMode::Reclaim { min_idle },
129            dead_letter: None,
130            max_deliveries: None,
131            delayed_retry: None,
132        }
133    }
134
135    /// Sets the consumer group. Required for every subscription.
136    pub fn group(mut self, group: impl Into<String>) -> Self {
137        self.group = Some(group.into());
138        self
139    }
140
141    /// Sets this consumer's name within the group. Defaults to an auto-generated unique name.
142    pub fn consumer(mut self, consumer: impl Into<String>) -> Self {
143        self.consumer = Some(consumer.into());
144        self
145    }
146
147    /// Upper bound on entries fetched per read. Defaults to 64.
148    pub const fn count(mut self, count: u64) -> Self {
149        self.count = Some(count);
150        self
151    }
152
153    /// How long one read blocks waiting for entries. Defaults to 5 seconds. In fresh-tail mode this
154    /// is the `XREADGROUP` server-side block; in reclaim mode `XAUTOCLAIM` does not block, so this is
155    /// the poll interval slept between scans that find nothing to reclaim.
156    pub const fn block(mut self, block: Duration) -> Self {
157        self.block = Some(block);
158        self
159    }
160
161    /// Where a newly created group starts reading. Ignored if the group already exists. Only
162    /// meaningful for the fresh-tail [`new`](Self::new) mode.
163    pub fn start_id(mut self, start: StreamStart) -> Self {
164        self.start = start;
165        self
166    }
167
168    /// Routes dropped and poison messages to the named dead-letter stream instead of discarding
169    /// them. Off by default. The copy is tagged with
170    /// [`DEAD_LETTER_REASON_HEADER`](crate::DEAD_LETTER_REASON_HEADER). See [`crate::deadletter`].
171    pub fn dead_letter(mut self, key: impl Into<String>) -> Self {
172        self.dead_letter = Some(key.into());
173        self
174    }
175
176    /// Caps how many times a message may be delivered before it is treated as poison (dead-lettered
177    /// or, with no dead-letter stream, discarded). Off by default.
178    ///
179    /// The cap is checked against both the framework retry-count header (the `nack`/republish loop)
180    /// and the native stream delivery count (the reclaim loop), so a message poisoning either way is
181    /// caught.
182    pub const fn max_deliveries(mut self, max: u64) -> Self {
183        self.max_deliveries = Some(max);
184        self
185    }
186
187    /// Opts this subscription into durable, crash-safe delayed retry backed by a ZSET delay queue.
188    ///
189    /// Off by default: without it, `retry_after(delay)` / `nack_after(delay)` degrade to the
190    /// runtime's broker-agnostic deferred re-publish (at-most-once over the delay window). With it,
191    /// a delayed delivery is `ZADD`ed to the named ZSET and replayed from there once due, so the
192    /// retry survives a process crash. See [`DelayedRetry`] for the key and TTL requirements.
193    ///
194    /// The sweeper that replays due entries runs inside this subscription's read loop, so its
195    /// granularity is the read [`block`](Self::block) interval.
196    pub fn delayed_retry(mut self, retry: DelayedRetry) -> Self {
197        self.delayed_retry = Some(retry);
198        self
199    }
200
201    /// The stream key this subscription reads.
202    #[must_use]
203    pub fn key(&self) -> &str {
204        &self.key
205    }
206
207    pub(crate) fn group_or_err(&self) -> Result<&str, RedisError> {
208        self.group.as_deref().ok_or_else(|| {
209            RedisError::InvalidOptions(format!(
210                "stream subscription on `{}` requires a consumer group: call .group(name)",
211                self.key
212            ))
213        })
214    }
215
216    pub(crate) fn consumer_or_auto(&self) -> String {
217        self.consumer.clone().unwrap_or_else(auto_consumer)
218    }
219
220    pub(crate) fn count_or_default(&self) -> u64 {
221        self.count.unwrap_or(DEFAULT_COUNT)
222    }
223
224    pub(crate) fn block_or_default(&self) -> Duration {
225        self.block.unwrap_or(DEFAULT_BLOCK)
226    }
227
228    pub(crate) const fn start(&self) -> &StreamStart {
229        &self.start
230    }
231
232    pub(crate) fn mode(&self) -> ReadMode {
233        self.mode.clone()
234    }
235
236    pub(crate) fn poison_policy(&self) -> PoisonPolicy {
237        PoisonPolicy {
238            dead_letter: self.dead_letter.clone(),
239            max_deliveries: self.max_deliveries,
240        }
241    }
242
243    pub(crate) fn delay_config(&self) -> Option<DelayConfig> {
244        self.delayed_retry.as_ref().map(DelayConfig::from_retry)
245    }
246}
247
248impl SubscriptionSource<RedisBroker> for RedisStream {
249    type Subscriber = RedisSubscriber;
250
251    fn name(&self) -> &str {
252        self.key()
253    }
254
255    async fn subscribe(self, broker: &RedisBroker) -> Result<Self::Subscriber, RedisError> {
256        broker.subscribe(self).await
257    }
258}
259
#[cfg(test)]
mod tests {
    use super::*;

    /// A subscription without `.group(..)` is rejected with an error that mentions the group.
    #[test]
    fn group_is_required() {
        let outcome = RedisStream::new("orders").group_or_err();
        assert!(matches!(
            outcome,
            Err(RedisError::InvalidOptions(ref msg)) if msg.contains("consumer group")
        ));
    }

    /// Once a group is set it is returned unchanged.
    #[test]
    fn group_set_resolves() {
        let stream = RedisStream::new("orders").group("workers");
        assert!(matches!(stream.group_or_err(), Ok("workers")));
    }

    /// Each start position renders as the expected Redis ID string.
    #[test]
    fn start_maps_to_redis_ids() {
        let cases = [
            (StreamStart::New, "$"),
            (StreamStart::Beginning, "0"),
            (StreamStart::Id("5-0".into()), "5-0"),
        ];
        for (start, expected) in &cases {
            assert_eq!(start.as_id(), *expected);
        }
    }

    /// The reclaim constructor stores the idle threshold in the read mode.
    #[test]
    fn reclaim_carries_min_idle() {
        let stream = RedisStream::reclaim("orders", Duration::from_secs(30)).group("g");
        let carried = match stream.mode() {
            ReadMode::Reclaim { min_idle } => Some(min_idle.as_secs()),
            ReadMode::Fresh => None,
        };
        assert_eq!(carried, Some(30));
    }
}