Skip to main content

ruststream_fred/
subscriber.rs

1//! Redis Streams subscriber driving `XREADGROUP` (fresh tail) or `XAUTOCLAIM` (reclaim).
2
3use std::collections::{HashMap, VecDeque};
4use std::fmt::{Debug, Formatter};
5use std::time::Duration;
6
7use fred::clients::Pool;
8use fred::interfaces::StreamsInterface;
9use fred::types::streams::XReadValue;
10use futures::Stream;
11use futures::stream::unfold;
12use ruststream::{BatchSubscriber, Subscriber};
13
14use crate::convert::{HEADER_PREFIX, parts_from_fields};
15use crate::deadletter::{
16    self, DELIVERY_COUNT_HEADER, IDLE_MS_HEADER, PoisonPolicy, REASON_MAX_DELIVERIES,
17};
18use crate::delay::{self, DelayConfig};
19use crate::{error::RedisError, message::RedisMessage, stream::ReadMode};
20
21/// One decoded stream entry: its ID and field map.
22type Entry = (String, HashMap<String, Vec<u8>>);
23
24/// `XREADGROUP` reply shape parsed as nested arrays rather than maps: the RESP2 reply is an array of
25/// `[key, [[id, [field, value, ...]], ...]]`, which does not convert to fred's map-based
26/// `XReadResponse` (the outer array is not a flat key/value list). Pairing into tuples does work, so
27/// we collect the entry fields into a map ourselves.
28type RawStreams = Vec<(String, Vec<(String, Vec<(String, Vec<u8>)>)>)>;
29
30/// Cursor a fresh reclaim scan starts from (the whole pending list).
31const RECLAIM_START: &str = "0-0";
32
33fn duration_to_millis(d: Duration) -> u64 {
34    u64::try_from(d.as_millis()).unwrap_or(u64::MAX)
35}
36
37/// A Redis Streams subscription bound to a consumer group.
38///
39/// Constructed by [`crate::RedisBroker::subscribe`] from a [`crate::RedisStream`] descriptor. The
40/// read mode (fresh tail vs reclaim) is fixed at construction.
41pub struct RedisSubscriber {
42    pool: Pool,
43    key: String,
44    group: String,
45    consumer: String,
46    count: u64,
47    block: Duration,
48    mode: ReadMode,
49    policy: PoisonPolicy,
50    /// Set when the subscription opted into a durable ZSET delay queue; drives native `nack_after`
51    /// on each delivery and the due-entry sweep on each fetch.
52    delay: Option<DelayConfig>,
53    /// Reclaim cursor; advances across `XAUTOCLAIM` calls, unused in fresh mode.
54    cursor: String,
55    /// Entries fetched but not yet yielded.
56    buffer: VecDeque<Entry>,
57}
58
59impl Debug for RedisSubscriber {
60    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
61        f.debug_struct("RedisSubscriber")
62            .field("key", &self.key)
63            .field("group", &self.group)
64            .field("consumer", &self.consumer)
65            .field("mode", &self.mode)
66            .finish_non_exhaustive()
67    }
68}
69
70impl RedisSubscriber {
71    #[allow(
72        clippy::too_many_arguments,
73        reason = "internal constructor mirroring the descriptor"
74    )]
75    pub(crate) fn new(
76        pool: Pool,
77        key: String,
78        group: String,
79        consumer: String,
80        count: u64,
81        block: Duration,
82        mode: ReadMode,
83        policy: PoisonPolicy,
84        delay: Option<DelayConfig>,
85    ) -> Self {
86        Self {
87            pool,
88            key,
89            group,
90            consumer,
91            count,
92            block,
93            mode,
94            policy,
95            delay,
96            cursor: RECLAIM_START.to_owned(),
97            buffer: VecDeque::new(),
98        }
99    }
100
101    fn message(&self, id: String, fields: HashMap<String, Vec<u8>>) -> RedisMessage {
102        let (payload, headers) = parts_from_fields(fields);
103        RedisMessage::new(
104            self.pool.clone(),
105            self.key.clone(),
106            self.group.clone(),
107            id,
108            payload,
109            headers,
110            self.policy.clone(),
111            self.delay.clone(),
112        )
113    }
114
115    /// Fetches the next batch of entries into the buffer. A read that timed out with nothing
116    /// pending leaves the buffer empty (the caller loops and reads again).
117    async fn fetch(&mut self) -> Result<(), RedisError> {
118        // Replay any due delayed-retry entries before reading, so they re-enter the stream and get
119        // delivered through the normal read path. Granularity is the read block interval.
120        if let Some(cfg) = &self.delay {
121            delay::sweep_due(&self.pool, cfg, &self.key).await?;
122        }
123        let entries = match self.mode.clone() {
124            ReadMode::Fresh => self.fetch_fresh().await?,
125            ReadMode::Reclaim { min_idle } => self.fetch_reclaim(min_idle).await?,
126        };
127        self.buffer.extend(entries);
128        Ok(())
129    }
130
131    async fn fetch_fresh(&self) -> Result<Vec<Entry>, RedisError> {
132        let resp: RawStreams = self
133            .pool
134            .xreadgroup(
135                self.group.as_str(),
136                self.consumer.as_str(),
137                Some(self.count),
138                Some(duration_to_millis(self.block)),
139                false,
140                self.key.as_str(),
141                ">",
142            )
143            .await
144            .map_err(RedisError::stream)?;
145        let entries = resp
146            .into_iter()
147            .find(|(key, _)| key == &self.key)
148            .map(|(_, entries)| entries)
149            .unwrap_or_default();
150        Ok(entries
151            .into_iter()
152            .map(|(id, fields)| (id, fields.into_iter().collect()))
153            .collect())
154    }
155
156    async fn fetch_reclaim(&mut self, min_idle: Duration) -> Result<Vec<Entry>, RedisError> {
157        let (cursor, entries): (String, Vec<XReadValue<String, String, Vec<u8>>>) = self
158            .pool
159            .xautoclaim_values(
160                self.key.as_str(),
161                self.group.as_str(),
162                self.consumer.as_str(),
163                duration_to_millis(min_idle),
164                self.cursor.as_str(),
165                Some(self.count),
166                false,
167            )
168            .await
169            .map_err(RedisError::stream)?;
170        self.cursor = cursor;
171        // Nothing left to reclaim this pass: avoid a hot loop until more entries go stale.
172        if entries.is_empty() {
173            tokio::time::sleep(self.block).await;
174            return Ok(entries);
175        }
176        // Plain reclaim with no poison policy: skip the extra XPENDING and deliver as-is.
177        if !self.policy.is_active() {
178            return Ok(entries);
179        }
180        self.enrich_reclaimed(entries).await
181    }
182
183    /// Annotates reclaimed entries with their native delivery count and idle time, and dead-letters
184    /// (or drops) any that have exceeded `max_deliveries` instead of redelivering them.
185    async fn enrich_reclaimed(&self, entries: Vec<Entry>) -> Result<Vec<Entry>, RedisError> {
186        let meta = self.pending_meta().await?;
187        let mut out = Vec::with_capacity(entries.len());
188        for (id, mut fields) in entries {
189            let (idle, count) = meta.get(&id).copied().unwrap_or((0, 0));
190            if self.policy.is_poison(count) {
191                self.dead_letter_reclaimed(&id, &fields).await?;
192                continue;
193            }
194            insert_meta_header(&mut fields, DELIVERY_COUNT_HEADER, count);
195            insert_meta_header(&mut fields, IDLE_MS_HEADER, idle);
196            out.push((id, fields));
197        }
198        Ok(out)
199    }
200
201    /// Maps each of this consumer's pending entry IDs to its `(idle_ms, delivery_count)` via
202    /// extended `XPENDING`, which - unlike `XAUTOCLAIM` - reports the native delivery count.
203    async fn pending_meta(&self) -> Result<HashMap<String, (u64, u64)>, RedisError> {
204        let rows: Vec<(String, String, u64, u64)> = self
205            .pool
206            .xpending(
207                self.key.as_str(),
208                self.group.as_str(),
209                (0_u64, "-", "+", self.count, self.consumer.as_str()),
210            )
211            .await
212            .map_err(RedisError::stream)?;
213        Ok(rows
214            .into_iter()
215            .map(|(id, _consumer, idle, count)| (id, (idle, count)))
216            .collect())
217    }
218
219    /// Routes a poison reclaimed entry to its dead-letter stream (or discards it when none is set),
220    /// then `XACK`s it so it leaves the pending list.
221    async fn dead_letter_reclaimed(
222        &self,
223        id: &str,
224        fields: &HashMap<String, Vec<u8>>,
225    ) -> Result<(), RedisError> {
226        let (payload, headers) = parts_from_fields(fields.clone());
227        deadletter::settle_poison_stream(
228            &self.pool,
229            &self.policy,
230            &payload,
231            &headers,
232            REASON_MAX_DELIVERIES,
233        )
234        .await
235        .map_err(RedisError::stream)?;
236        let _: i64 = self
237            .pool
238            .xack(self.key.as_str(), self.group.as_str(), id)
239            .await
240            .map_err(RedisError::stream)?;
241        Ok(())
242    }
243}
244
245/// Injects a `u64`-valued well-known header into an entry's raw field map (under the `h:` prefix),
246/// so it surfaces as a [`Headers`](ruststream::Headers) entry on the delivered message.
247fn insert_meta_header(fields: &mut HashMap<String, Vec<u8>>, name: &str, value: u64) {
248    fields.insert(
249        format!("{HEADER_PREFIX}{name}"),
250        value.to_string().into_bytes(),
251    );
252}
253
254impl Subscriber for RedisSubscriber {
255    type Message = RedisMessage;
256    type Error = RedisError;
257
258    /// Yields one message per entry, refilling from Redis when the local buffer drains.
259    ///
260    /// # Cancel safety
261    ///
262    /// Dropping the returned stream between items is safe. Dropping it while a read is in flight
263    /// drops the read future; entries already delivered to this consumer but not yet acked stay in
264    /// the group's pending list and are redelivered (fresh mode) or reclaimable (reclaim mode).
265    fn stream(&mut self) -> impl Stream<Item = Result<Self::Message, Self::Error>> + Send + '_ {
266        unfold(self, |s| async move {
267            loop {
268                if let Some((id, fields)) = s.buffer.pop_front() {
269                    return Some((Ok(s.message(id, fields)), s));
270                }
271                // An empty fetch (a blocking read that timed out) just loops and reads again.
272                if let Err(err) = s.fetch().await {
273                    return Some((Err(err), s));
274                }
275            }
276        })
277    }
278}
279
280impl BatchSubscriber for RedisSubscriber {
281    type Batch = Vec<RedisMessage>;
282
283    /// Yields one batch per non-empty read (`XREADGROUP COUNT` / `XAUTOCLAIM`), up to
284    /// [`RedisStream::count`](crate::RedisStream::count) entries. Never yields an empty batch.
285    ///
286    /// # Cancel safety
287    ///
288    /// Same as [`Subscriber::stream`]: dropping the stream mid-read leaves fetched-but-unacked
289    /// entries in the pending list.
290    fn batches(&mut self) -> impl Stream<Item = Result<Self::Batch, Self::Error>> + Send + '_ {
291        unfold(self, |s| async move {
292            loop {
293                if !s.buffer.is_empty() {
294                    // Move the buffer out first so `s.message` can borrow `s` without overlapping
295                    // a live mutable borrow of `s.buffer`.
296                    let entries = std::mem::take(&mut s.buffer);
297                    let batch = entries
298                        .into_iter()
299                        .map(|(id, fields)| s.message(id, fields))
300                        .collect::<Vec<_>>();
301                    return Some((Ok(batch), s));
302                }
303                if let Err(err) = s.fetch().await {
304                    return Some((Err(err), s));
305                }
306            }
307        })
308    }
309}