Skip to main content

ruststream_fred/
list.rs

1//! Redis list transport: a competing-consumers work queue.
2//!
3//! A producer `LPUSH`es onto the list; consumers pop from the right (`BRPOP`), so delivery is FIFO
4//! and each entry goes to exactly one consumer (no fan-out, no replay, no groups). Two modes:
5//!
6//! * Simple (default) - `BRPOP`, at-most-once. `ack` / `nack` report [`AckError::Unsupported`]: once
7//!   popped, the entry is gone, so a crash mid-handler loses it.
8//! * Reliable ([`RedisList::reliable`]) - `LMOVE` the entry to a per-consumer processing list, then
9//!   `LREM` it on `ack` (at-least-once). `nack(requeue = true)` returns it to the main list;
10//!   `nack(requeue = false)` removes it.
11//!
12//! Reliable mode has no native idle/pending tracking, so a consumer that dies after `LMOVE` but
13//! before settling leaves its entry stranded on the processing list. Recovering those (a ZSET
14//! watchdog) is not implemented in 0.4; the durable, recoverable path is Redis Streams
15//! ([`crate::RedisStream`]).
16//!
17//! Headers travel in a frame around the payload (see [`crate::envelope`]): a lossless binary frame
18//! by default, or a readable codec-serialized envelope when a codec is set with
19//! [`RedisList::codec`] / [`RedisListPublisher::codec`].
20
21use std::fmt::{Debug, Formatter};
22use std::sync::Arc;
23use std::time::Duration;
24
25use bytes::Bytes;
26use fred::clients::Pool;
27use fred::interfaces::ListInterface;
28use fred::types::lists::LMoveDirection;
29use futures::Stream;
30use futures::stream::unfold;
31use ruststream::codec::Codec;
32use ruststream::{AckError, Headers, IncomingMessage, Partitioned, SubscriptionSource};
33
34use crate::envelope::{SharedEnvelope, frame, unframe};
35use crate::{RedisBroker, error::RedisError, message::PARTITION_KEY_HEADER};
36
37const DEFAULT_BLOCK: Duration = Duration::from_secs(5);
38/// Suffix appended to the list key to form the default per-consumer processing list (reliable mode).
39const PROCESSING_SUFFIX: &str = ".processing";
40
41fn block_secs(block: Duration) -> f64 {
42    block.as_secs_f64()
43}
44
45/// Describes one list subscription against [`crate::RedisBroker`].
46///
47/// # Examples
48///
49/// ```
50/// use std::time::Duration;
51/// use ruststream_fred::RedisList;
52///
53/// let simple = RedisList::new("jobs");
54/// let reliable = RedisList::new("jobs").reliable().block(Duration::from_secs(2));
55/// # let _ = (simple, reliable);
56/// ```
57#[derive(Clone)]
58#[must_use]
59pub struct RedisList {
60    key: String,
61    reliable: bool,
62    processing: Option<String>,
63    block: Option<Duration>,
64    codec: Option<SharedEnvelope>,
65}
66
67impl Debug for RedisList {
68    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
69        f.debug_struct("RedisList")
70            .field("key", &self.key)
71            .field("reliable", &self.reliable)
72            .field("processing", &self.processing)
73            .field("codec", &self.codec.is_some())
74            .finish_non_exhaustive()
75    }
76}
77
78impl RedisList {
79    /// A simple (at-most-once) `BRPOP` work-queue consumer on `key`.
80    pub fn new(key: impl Into<String>) -> Self {
81        Self {
82            key: key.into(),
83            reliable: false,
84            processing: None,
85            block: None,
86            codec: None,
87        }
88    }
89
90    /// Switches to reliable (at-least-once) mode: entries move to a processing list and are removed
91    /// on `ack`.
92    pub const fn reliable(mut self) -> Self {
93        self.reliable = true;
94        self
95    }
96
97    /// Sets the processing-list key used in reliable mode. Defaults to `<key>.processing`.
98    pub fn processing(mut self, key: impl Into<String>) -> Self {
99        self.processing = Some(key.into());
100        self
101    }
102
103    /// How long one blocking pop waits before looping. Defaults to 5 seconds.
104    pub const fn block(mut self, block: Duration) -> Self {
105        self.block = Some(block);
106        self
107    }
108
109    /// Decodes the header/payload envelope with `codec` (must match the publisher). Without it the
110    /// default lossless binary framing is used.
111    pub fn codec(mut self, codec: impl Codec + 'static) -> Self {
112        self.codec = Some(Arc::new(codec));
113        self
114    }
115
116    /// The list key this subscription consumes.
117    #[must_use]
118    pub fn key(&self) -> &str {
119        &self.key
120    }
121
122    pub(crate) const fn is_reliable(&self) -> bool {
123        self.reliable
124    }
125
126    pub(crate) fn processing_or_default(&self) -> String {
127        self.processing
128            .clone()
129            .unwrap_or_else(|| format!("{}{PROCESSING_SUFFIX}", self.key))
130    }
131
132    pub(crate) fn block_or_default(&self) -> Duration {
133        self.block.unwrap_or(DEFAULT_BLOCK)
134    }
135
136    pub(crate) fn codec_handle(&self) -> Option<SharedEnvelope> {
137        self.codec.clone()
138    }
139}
140
141impl SubscriptionSource<RedisBroker> for RedisList {
142    type Subscriber = RedisListSubscriber;
143
144    fn name(&self) -> &str {
145        self.key()
146    }
147
148    async fn subscribe(self, broker: &RedisBroker) -> Result<Self::Subscriber, RedisError> {
149        broker.subscribe_list(self).await
150    }
151}
152
153/// A list-backed work-queue subscription.
154pub struct RedisListSubscriber {
155    pool: Pool,
156    key: String,
157    reliable: bool,
158    processing: String,
159    block: Duration,
160    codec: Option<SharedEnvelope>,
161}
162
163impl Debug for RedisListSubscriber {
164    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
165        f.debug_struct("RedisListSubscriber")
166            .field("key", &self.key)
167            .field("reliable", &self.reliable)
168            .finish_non_exhaustive()
169    }
170}
171
172impl RedisListSubscriber {
173    pub(crate) fn new(
174        pool: Pool,
175        key: String,
176        reliable: bool,
177        processing: String,
178        block: Duration,
179        codec: Option<SharedEnvelope>,
180    ) -> Self {
181        Self {
182            pool,
183            key,
184            reliable,
185            processing,
186            block,
187            codec,
188        }
189    }
190
191    fn simple_message(&self, raw: &[u8]) -> RedisListMessage {
192        let (payload, headers) = unframe(self.codec.as_ref(), raw);
193        RedisListMessage {
194            payload,
195            headers,
196            ack: None,
197        }
198    }
199
200    fn reliable_message(&self, raw: Vec<u8>) -> RedisListMessage {
201        let (payload, headers) = unframe(self.codec.as_ref(), &raw);
202        RedisListMessage {
203            payload,
204            headers,
205            ack: Some(ListAck {
206                pool: self.pool.clone(),
207                main_key: self.key.clone(),
208                processing_key: self.processing.clone(),
209                value: raw,
210            }),
211        }
212    }
213
214    /// Blocks for the next entry, returning `None` when the pop times out (the caller loops).
215    async fn next_entry(&self) -> Result<Option<RedisListMessage>, RedisError> {
216        let secs = block_secs(self.block);
217        if self.reliable {
218            let value: Option<Vec<u8>> = self
219                .pool
220                .blmove(
221                    self.key.as_str(),
222                    self.processing.as_str(),
223                    LMoveDirection::Right,
224                    LMoveDirection::Left,
225                    secs,
226                )
227                .await
228                .map_err(RedisError::stream)?;
229            Ok(value.map(|v| self.reliable_message(v)))
230        } else {
231            let popped: Option<(String, Vec<u8>)> = self
232                .pool
233                .brpop(self.key.as_str(), secs)
234                .await
235                .map_err(RedisError::stream)?;
236            Ok(popped.map(|(_, v)| self.simple_message(&v)))
237        }
238    }
239}
240
241impl ruststream::Subscriber for RedisListSubscriber {
242    type Message = RedisListMessage;
243    type Error = RedisError;
244
245    /// Yields one message per popped entry.
246    ///
247    /// # Cancel safety
248    ///
249    /// Dropping the returned stream between items is safe. In reliable mode an entry already moved
250    /// to the processing list but not yet settled stays there until acked or recovered manually.
251    fn stream(&mut self) -> impl Stream<Item = Result<Self::Message, Self::Error>> + Send + '_ {
252        unfold(&*self, |s| async move {
253            loop {
254                match s.next_entry().await {
255                    Ok(Some(msg)) => return Some((Ok(msg), s)),
256                    Ok(None) => {}
257                    Err(err) => return Some((Err(err), s)),
258                }
259            }
260        })
261    }
262}
263
264/// Settlement handle for a reliable-mode list delivery.
265struct ListAck {
266    pool: Pool,
267    main_key: String,
268    processing_key: String,
269    /// The raw wire value (framed), needed verbatim to `LREM` it from the processing list.
270    value: Vec<u8>,
271}
272
273/// A list-queue delivery. In simple mode `ack` / `nack` are unsupported; in reliable mode `ack`
274/// removes the entry from the processing list and `nack` either returns it or drops it.
275pub struct RedisListMessage {
276    payload: Bytes,
277    headers: Headers,
278    ack: Option<ListAck>,
279}
280
281impl Debug for RedisListMessage {
282    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
283        f.debug_struct("RedisListMessage")
284            .field("payload_len", &self.payload.len())
285            .field("reliable", &self.ack.is_some())
286            .finish_non_exhaustive()
287    }
288}
289
290impl IncomingMessage for RedisListMessage {
291    fn payload(&self) -> &[u8] {
292        &self.payload
293    }
294
295    fn headers(&self) -> &Headers {
296        &self.headers
297    }
298
299    async fn ack(self) -> Result<(), AckError> {
300        let Some(handle) = self.ack else {
301            return Err(AckError::Unsupported);
302        };
303        lrem(&handle).await
304    }
305
306    async fn nack(self, requeue: bool) -> Result<(), AckError> {
307        let Some(handle) = self.ack else {
308            return Err(AckError::Unsupported);
309        };
310        if requeue {
311            // Return the entry to the head of the main list before removing it from processing, so a
312            // crash in between leaves a duplicate rather than losing it.
313            let _: i64 = handle
314                .pool
315                .lpush(handle.main_key.as_str(), handle.value.clone())
316                .await
317                .map_err(|err| AckError::Broker(Box::new(err)))?;
318        }
319        lrem(&handle).await
320    }
321}
322
323async fn lrem(handle: &ListAck) -> Result<(), AckError> {
324    let _: i64 = handle
325        .pool
326        .lrem(handle.processing_key.as_str(), 1, handle.value.clone())
327        .await
328        .map_err(|err| AckError::Broker(Box::new(err)))?;
329    Ok(())
330}
331
332impl Partitioned for RedisListMessage {
333    fn partition_key(&self) -> Option<&[u8]> {
334        self.headers().get(PARTITION_KEY_HEADER)
335    }
336}
337
338/// Publishes onto a list with `LPUSH`, so right-popping consumers see FIFO order.
339///
340/// Obtain it from [`RedisBroker::list_publisher`](crate::RedisBroker::list_publisher). Headers are
341/// framed around the payload; set a [`codec`](Self::codec) for a readable wire format (it must match
342/// the subscriber's).
343#[derive(Clone)]
344pub struct RedisListPublisher {
345    pool: Arc<tokio::sync::OnceCell<Pool>>,
346    codec: Option<SharedEnvelope>,
347}
348
349impl Debug for RedisListPublisher {
350    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
351        f.debug_struct("RedisListPublisher")
352            .field("codec", &self.codec.is_some())
353            .finish_non_exhaustive()
354    }
355}
356
357impl RedisListPublisher {
358    pub(crate) fn new(pool: Arc<tokio::sync::OnceCell<Pool>>) -> Self {
359        Self { pool, codec: None }
360    }
361
362    /// Serializes the header/payload envelope with `codec` (must match the subscriber). Without it
363    /// the default lossless binary framing is used.
364    #[must_use]
365    pub fn codec(mut self, codec: impl Codec + 'static) -> Self {
366        self.codec = Some(Arc::new(codec));
367        self
368    }
369}
370
371impl ruststream::Publisher for RedisListPublisher {
372    type Error = RedisError;
373
374    async fn publish(&self, msg: ruststream::OutgoingMessage<'_>) -> Result<(), Self::Error> {
375        let pool = self.pool.get().cloned().ok_or(RedisError::NotConnected)?;
376        let body = frame(self.codec.as_ref(), msg.payload(), msg.headers());
377        let _: i64 = pool
378            .lpush(msg.name(), body)
379            .await
380            .map_err(RedisError::publish)?;
381        Ok(())
382    }
383}