Skip to main content

ruststream_fred/
stream.rs

1//! Builder describing one Redis Streams subscription.
2//!
3//! A subscription always reads through a consumer group. Two read modes are selected by
4//! constructor, never by a runtime flag, because they return disjoint message sets:
5//!
6//! * [`RedisStream::new`] reads fresh entries off the tail (`XREADGROUP > ...`).
7//! * [`RedisStream::reclaim`] reads stale pending entries another consumer never acked
8//!   (`XAUTOCLAIM`, idle at least `min_idle`) - the crash-recovery path.
9//!
10//! Inferring the mode from a numeric parameter would be a footgun (a stray idle timeout could
11//! silently stop fresh delivery), so the mode is part of the constructor name. Recovery is a
12//! separate `reclaim` subscriber on the same group: "two handlers per group".
13
14use std::sync::atomic::{AtomicU64, Ordering};
15use std::time::Duration;
16
17use ruststream::SubscriptionSource;
18
19use crate::{RedisBroker, error::RedisError, subscriber::RedisSubscriber};
20
21const DEFAULT_COUNT: u64 = 64;
22const DEFAULT_BLOCK: Duration = Duration::from_secs(5);
23
24/// Generates an automatic consumer name when the caller does not set one. Distinct names keep
25/// each in-process subscriber's pending list separate within a shared group.
26fn auto_consumer() -> String {
27    static COUNTER: AtomicU64 = AtomicU64::new(0);
28    let n = COUNTER.fetch_add(1, Ordering::Relaxed);
29    format!("ruststream-{n}")
30}
31
32/// Where a freshly created consumer group starts reading from. Only consulted when the group does
33/// not yet exist; an existing group keeps its own cursor.
34#[derive(Debug, Clone, Default)]
35pub enum StreamStart {
36    /// Only entries added after the group is created (`$`). The default.
37    #[default]
38    New,
39    /// Every entry currently in the stream (`0`).
40    Beginning,
41    /// A specific entry ID, exclusive.
42    Id(String),
43}
44
45impl StreamStart {
46    pub(crate) fn as_id(&self) -> &str {
47        match self {
48            Self::New => "$",
49            Self::Beginning => "0",
50            Self::Id(id) => id,
51        }
52    }
53}
54
55#[derive(Debug, Clone)]
56pub(crate) enum ReadMode {
57    /// `XREADGROUP >` - fresh tail.
58    Fresh,
59    /// `XAUTOCLAIM` of entries idle at least this long.
60    Reclaim { min_idle: Duration },
61}
62
63/// Describes one Redis Streams subscription against [`crate::RedisBroker`].
64///
65/// # Examples
66///
67/// ```
68/// use std::time::Duration;
69/// use ruststream_fred::RedisStream;
70///
71/// // Fresh tail: a normal worker reading new entries.
72/// let fresh = RedisStream::new("orders").group("workers").count(128);
73///
74/// // Recovery: reclaim entries a crashed worker left pending for over 30s.
75/// let recover = RedisStream::reclaim("orders", Duration::from_secs(30)).group("workers");
76/// # let _ = (fresh, recover);
77/// ```
78#[derive(Debug, Clone)]
79#[must_use]
80pub struct RedisStream {
81    key: String,
82    group: Option<String>,
83    consumer: Option<String>,
84    count: Option<u64>,
85    block: Option<Duration>,
86    start: StreamStart,
87    mode: ReadMode,
88}
89
90impl RedisStream {
91    /// A fresh-tail subscription on `key`: reads new entries via `XREADGROUP >`.
92    ///
93    /// A consumer group is required; set it with [`group`](Self::group).
94    pub fn new(key: impl Into<String>) -> Self {
95        Self {
96            key: key.into(),
97            group: None,
98            consumer: None,
99            count: None,
100            block: None,
101            start: StreamStart::New,
102            mode: ReadMode::Fresh,
103        }
104    }
105
106    /// A recovery subscription on `key`: reclaims pending entries idle at least `min_idle` via
107    /// `XAUTOCLAIM`. Run it alongside a [`new`](Self::new) subscriber on the same group to pick up
108    /// messages a consumer fetched but died before acking.
109    ///
110    /// `min_idle` has no default and must exceed the longest legitimate handler runtime: set it too
111    /// low and a healthy consumer's in-flight message gets reclaimed and processed twice.
112    pub fn reclaim(key: impl Into<String>, min_idle: Duration) -> Self {
113        Self {
114            key: key.into(),
115            group: None,
116            consumer: None,
117            count: None,
118            block: None,
119            start: StreamStart::New,
120            mode: ReadMode::Reclaim { min_idle },
121        }
122    }
123
124    /// Sets the consumer group. Required for every subscription.
125    pub fn group(mut self, group: impl Into<String>) -> Self {
126        self.group = Some(group.into());
127        self
128    }
129
130    /// Sets this consumer's name within the group. Defaults to an auto-generated unique name.
131    pub fn consumer(mut self, consumer: impl Into<String>) -> Self {
132        self.consumer = Some(consumer.into());
133        self
134    }
135
136    /// Upper bound on entries fetched per read. Defaults to 64.
137    pub const fn count(mut self, count: u64) -> Self {
138        self.count = Some(count);
139        self
140    }
141
142    /// How long one read blocks waiting for entries. Defaults to 5 seconds. In fresh-tail mode this
143    /// is the `XREADGROUP` server-side block; in reclaim mode `XAUTOCLAIM` does not block, so this is
144    /// the poll interval slept between scans that find nothing to reclaim.
145    pub const fn block(mut self, block: Duration) -> Self {
146        self.block = Some(block);
147        self
148    }
149
150    /// Where a newly created group starts reading. Ignored if the group already exists. Only
151    /// meaningful for the fresh-tail [`new`](Self::new) mode.
152    pub fn start_id(mut self, start: StreamStart) -> Self {
153        self.start = start;
154        self
155    }
156
157    /// The stream key this subscription reads.
158    #[must_use]
159    pub fn key(&self) -> &str {
160        &self.key
161    }
162
163    pub(crate) fn group_or_err(&self) -> Result<&str, RedisError> {
164        self.group.as_deref().ok_or_else(|| {
165            RedisError::InvalidOptions(format!(
166                "stream subscription on `{}` requires a consumer group: call .group(name)",
167                self.key
168            ))
169        })
170    }
171
172    pub(crate) fn consumer_or_auto(&self) -> String {
173        self.consumer.clone().unwrap_or_else(auto_consumer)
174    }
175
176    pub(crate) fn count_or_default(&self) -> u64 {
177        self.count.unwrap_or(DEFAULT_COUNT)
178    }
179
180    pub(crate) fn block_or_default(&self) -> Duration {
181        self.block.unwrap_or(DEFAULT_BLOCK)
182    }
183
184    pub(crate) const fn start(&self) -> &StreamStart {
185        &self.start
186    }
187
188    pub(crate) fn mode(&self) -> ReadMode {
189        self.mode.clone()
190    }
191}
192
193impl SubscriptionSource<RedisBroker> for RedisStream {
194    type Subscriber = RedisSubscriber;
195
196    fn name(&self) -> &str {
197        self.key()
198    }
199
200    async fn subscribe(self, broker: &RedisBroker) -> Result<Self::Subscriber, RedisError> {
201        broker.subscribe(self).await
202    }
203}
204
205#[cfg(test)]
206mod tests {
207    use super::*;
208
209    #[test]
210    fn group_is_required() {
211        let err = RedisStream::new("orders").group_or_err().unwrap_err();
212        assert!(matches!(err, RedisError::InvalidOptions(msg) if msg.contains("consumer group")));
213    }
214
215    #[test]
216    fn group_set_resolves() {
217        let s = RedisStream::new("orders").group("workers");
218        assert_eq!(s.group_or_err().expect("group set"), "workers");
219    }
220
221    #[test]
222    fn start_maps_to_redis_ids() {
223        assert_eq!(StreamStart::New.as_id(), "$");
224        assert_eq!(StreamStart::Beginning.as_id(), "0");
225        assert_eq!(StreamStart::Id("5-0".into()).as_id(), "5-0");
226    }
227
228    #[test]
229    fn reclaim_carries_min_idle() {
230        let s = RedisStream::reclaim("orders", Duration::from_secs(30)).group("g");
231        assert!(matches!(s.mode(), ReadMode::Reclaim { min_idle } if min_idle.as_secs() == 30));
232    }
233}