ruststream_fred/stream.rs
1//! Builder describing one Redis Streams subscription.
2//!
3//! A subscription always reads through a consumer group. Two read modes are selected by
4//! constructor, never by a runtime flag, because they return disjoint message sets:
5//!
6//! * [`RedisStream::new`] reads fresh entries off the tail (`XREADGROUP > ...`).
7//! * [`RedisStream::reclaim`] reads stale pending entries another consumer never acked
8//! (`XAUTOCLAIM`, idle at least `min_idle`) - the crash-recovery path.
9//!
10//! Inferring the mode from a numeric parameter would be a footgun (a stray idle timeout could
11//! silently stop fresh delivery), so the mode is part of the constructor name. Recovery is a
12//! separate `reclaim` subscriber on the same group: "two handlers per group".
13
14use std::sync::atomic::{AtomicU64, Ordering};
15use std::time::Duration;
16
17use ruststream::SubscriptionSource;
18
19use crate::deadletter::PoisonPolicy;
20use crate::delay::{DelayConfig, DelayedRetry};
21use crate::{RedisBroker, error::RedisError, subscriber::RedisSubscriber};
22
23const DEFAULT_COUNT: u64 = 64;
24const DEFAULT_BLOCK: Duration = Duration::from_secs(5);
25
26/// Generates an automatic consumer name when the caller does not set one. Distinct names keep
27/// each in-process subscriber's pending list separate within a shared group.
28fn auto_consumer() -> String {
29 static COUNTER: AtomicU64 = AtomicU64::new(0);
30 let n = COUNTER.fetch_add(1, Ordering::Relaxed);
31 format!("ruststream-{n}")
32}
33
34/// Where a freshly created consumer group starts reading from. Only consulted when the group does
35/// not yet exist; an existing group keeps its own cursor.
36#[derive(Debug, Clone, Default)]
37pub enum StreamStart {
38 /// Only entries added after the group is created (`$`). The default.
39 #[default]
40 New,
41 /// Every entry currently in the stream (`0`).
42 Beginning,
43 /// A specific entry ID, exclusive.
44 Id(String),
45}
46
47impl StreamStart {
48 pub(crate) fn as_id(&self) -> &str {
49 match self {
50 Self::New => "$",
51 Self::Beginning => "0",
52 Self::Id(id) => id,
53 }
54 }
55}
56
57#[derive(Debug, Clone)]
58pub(crate) enum ReadMode {
59 /// `XREADGROUP >` - fresh tail.
60 Fresh,
61 /// `XAUTOCLAIM` of entries idle at least this long.
62 Reclaim { min_idle: Duration },
63}
64
65/// Describes one Redis Streams subscription against [`crate::RedisBroker`].
66///
67/// # Examples
68///
69/// ```
70/// use std::time::Duration;
71/// use ruststream_fred::RedisStream;
72///
73/// // Fresh tail: a normal worker reading new entries.
74/// let fresh = RedisStream::new("orders").group("workers").count(128);
75///
76/// // Recovery: reclaim entries a crashed worker left pending for over 30s.
77/// let recover = RedisStream::reclaim("orders", Duration::from_secs(30)).group("workers");
78/// # let _ = (fresh, recover);
79/// ```
80#[derive(Debug, Clone)]
81#[must_use]
82pub struct RedisStream {
83 key: String,
84 group: Option<String>,
85 consumer: Option<String>,
86 count: Option<u64>,
87 block: Option<Duration>,
88 start: StreamStart,
89 mode: ReadMode,
90 dead_letter: Option<String>,
91 max_deliveries: Option<u64>,
92 delayed_retry: Option<DelayedRetry>,
93}
94
95impl RedisStream {
96 /// A fresh-tail subscription on `key`: reads new entries via `XREADGROUP >`.
97 ///
98 /// A consumer group is required; set it with [`group`](Self::group).
99 pub fn new(key: impl Into<String>) -> Self {
100 Self {
101 key: key.into(),
102 group: None,
103 consumer: None,
104 count: None,
105 block: None,
106 start: StreamStart::New,
107 mode: ReadMode::Fresh,
108 dead_letter: None,
109 max_deliveries: None,
110 delayed_retry: None,
111 }
112 }
113
114 /// A recovery subscription on `key`: reclaims pending entries idle at least `min_idle` via
115 /// `XAUTOCLAIM`. Run it alongside a [`new`](Self::new) subscriber on the same group to pick up
116 /// messages a consumer fetched but died before acking.
117 ///
118 /// `min_idle` has no default and must exceed the longest legitimate handler runtime: set it too
119 /// low and a healthy consumer's in-flight message gets reclaimed and processed twice.
120 pub fn reclaim(key: impl Into<String>, min_idle: Duration) -> Self {
121 Self {
122 key: key.into(),
123 group: None,
124 consumer: None,
125 count: None,
126 block: None,
127 start: StreamStart::New,
128 mode: ReadMode::Reclaim { min_idle },
129 dead_letter: None,
130 max_deliveries: None,
131 delayed_retry: None,
132 }
133 }
134
135 /// Sets the consumer group. Required for every subscription.
136 pub fn group(mut self, group: impl Into<String>) -> Self {
137 self.group = Some(group.into());
138 self
139 }
140
141 /// Sets this consumer's name within the group. Defaults to an auto-generated unique name.
142 pub fn consumer(mut self, consumer: impl Into<String>) -> Self {
143 self.consumer = Some(consumer.into());
144 self
145 }
146
147 /// Upper bound on entries fetched per read. Defaults to 64.
148 pub const fn count(mut self, count: u64) -> Self {
149 self.count = Some(count);
150 self
151 }
152
153 /// How long one read blocks waiting for entries. Defaults to 5 seconds. In fresh-tail mode this
154 /// is the `XREADGROUP` server-side block; in reclaim mode `XAUTOCLAIM` does not block, so this is
155 /// the poll interval slept between scans that find nothing to reclaim.
156 pub const fn block(mut self, block: Duration) -> Self {
157 self.block = Some(block);
158 self
159 }
160
161 /// Where a newly created group starts reading. Ignored if the group already exists. Only
162 /// meaningful for the fresh-tail [`new`](Self::new) mode.
163 pub fn start_id(mut self, start: StreamStart) -> Self {
164 self.start = start;
165 self
166 }
167
168 /// Routes dropped and poison messages to the named dead-letter stream instead of discarding
169 /// them. Off by default. The copy is tagged with
170 /// [`DEAD_LETTER_REASON_HEADER`](crate::DEAD_LETTER_REASON_HEADER). See [`crate::deadletter`].
171 pub fn dead_letter(mut self, key: impl Into<String>) -> Self {
172 self.dead_letter = Some(key.into());
173 self
174 }
175
176 /// Caps how many times a message may be delivered before it is treated as poison (dead-lettered
177 /// or, with no dead-letter stream, discarded). Off by default.
178 ///
179 /// The cap is checked against both the framework retry-count header (the `nack`/republish loop)
180 /// and the native stream delivery count (the reclaim loop), so a message poisoning either way is
181 /// caught.
182 pub const fn max_deliveries(mut self, max: u64) -> Self {
183 self.max_deliveries = Some(max);
184 self
185 }
186
187 /// Opts this subscription into durable, crash-safe delayed retry backed by a ZSET delay queue.
188 ///
189 /// Off by default: without it, `retry_after(delay)` / `nack_after(delay)` degrade to the
190 /// runtime's broker-agnostic deferred re-publish (at-most-once over the delay window). With it,
191 /// a delayed delivery is `ZADD`ed to the named ZSET and replayed from there once due, so the
192 /// retry survives a process crash. See [`DelayedRetry`] for the key and TTL requirements.
193 ///
194 /// The sweeper that replays due entries runs inside this subscription's read loop, so its
195 /// granularity is the read [`block`](Self::block) interval.
196 pub fn delayed_retry(mut self, retry: DelayedRetry) -> Self {
197 self.delayed_retry = Some(retry);
198 self
199 }
200
201 /// The stream key this subscription reads.
202 #[must_use]
203 pub fn key(&self) -> &str {
204 &self.key
205 }
206
207 pub(crate) fn group_or_err(&self) -> Result<&str, RedisError> {
208 self.group.as_deref().ok_or_else(|| {
209 RedisError::InvalidOptions(format!(
210 "stream subscription on `{}` requires a consumer group: call .group(name)",
211 self.key
212 ))
213 })
214 }
215
216 pub(crate) fn consumer_or_auto(&self) -> String {
217 self.consumer.clone().unwrap_or_else(auto_consumer)
218 }
219
220 pub(crate) fn count_or_default(&self) -> u64 {
221 self.count.unwrap_or(DEFAULT_COUNT)
222 }
223
224 pub(crate) fn block_or_default(&self) -> Duration {
225 self.block.unwrap_or(DEFAULT_BLOCK)
226 }
227
228 pub(crate) const fn start(&self) -> &StreamStart {
229 &self.start
230 }
231
232 pub(crate) fn mode(&self) -> ReadMode {
233 self.mode.clone()
234 }
235
236 pub(crate) fn poison_policy(&self) -> PoisonPolicy {
237 PoisonPolicy {
238 dead_letter: self.dead_letter.clone(),
239 max_deliveries: self.max_deliveries,
240 }
241 }
242
243 pub(crate) fn delay_config(&self) -> Option<DelayConfig> {
244 self.delayed_retry.as_ref().map(DelayConfig::from_retry)
245 }
246}
247
248impl SubscriptionSource<RedisBroker> for RedisStream {
249 type Subscriber = RedisSubscriber;
250
251 fn name(&self) -> &str {
252 self.key()
253 }
254
255 async fn subscribe(self, broker: &RedisBroker) -> Result<Self::Subscriber, RedisError> {
256 broker.subscribe(self).await
257 }
258}
259
260#[cfg(test)]
261mod tests {
262 use super::*;
263
264 #[test]
265 fn group_is_required() {
266 let err = RedisStream::new("orders").group_or_err().unwrap_err();
267 assert!(matches!(err, RedisError::InvalidOptions(msg) if msg.contains("consumer group")));
268 }
269
270 #[test]
271 fn group_set_resolves() {
272 let s = RedisStream::new("orders").group("workers");
273 assert_eq!(s.group_or_err().expect("group set"), "workers");
274 }
275
276 #[test]
277 fn start_maps_to_redis_ids() {
278 assert_eq!(StreamStart::New.as_id(), "$");
279 assert_eq!(StreamStart::Beginning.as_id(), "0");
280 assert_eq!(StreamStart::Id("5-0".into()).as_id(), "5-0");
281 }
282
283 #[test]
284 fn reclaim_carries_min_idle() {
285 let s = RedisStream::reclaim("orders", Duration::from_secs(30)).group("g");
286 assert!(matches!(s.mode(), ReadMode::Reclaim { min_idle } if min_idle.as_secs() == 30));
287 }
288}