Skip to main content

ruststream_fred/
message.rs

1//! Delivered-message wrapper that implements [`IncomingMessage`].
2
3use std::fmt::{Debug, Formatter};
4use std::time::Duration;
5
6use bytes::Bytes;
7use fred::clients::Pool;
8use fred::interfaces::StreamsInterface;
9use ruststream::runtime::RETRY_COUNT_HEADER;
10use ruststream::{AckError, Headers, IncomingMessage, Partitioned};
11
12use crate::convert::fields_for_publish;
13use crate::deadletter::{self, PoisonPolicy, REASON_DROPPED, REASON_MAX_DELIVERIES};
14use crate::delay::{self, DelayConfig};
15
16/// The well-known header key for per-message routing / partitioning.
17///
18/// Set this header on outgoing messages to control key-based fan-out when the runtime is
19/// configured with `workers(N, by_key)`. The value is opaque bytes; the runtime hashes it to
20/// assign a dispatch lane. Redis has no native partition concept on a single stream, so the key
21/// travels as this header value and the sender is responsible for setting it.
22pub const PARTITION_KEY_HEADER: &str = "redis-partition-key";
23
24/// Everything a [`RedisMessage`] needs to settle itself against the stream it came from.
25struct AckHandle {
26    pool: Pool,
27    key: String,
28    group: String,
29    id: String,
30}
31
32/// A Redis Streams delivery, read from a consumer group via `XREADGROUP` or `XAUTOCLAIM`.
33///
34/// Settlement follows the republish-retry model: `ack` is `XACK`; `nack(requeue = true)`
35/// re-appends a copy of the entry to the same stream and then acks the original (at-least-once,
36/// so a duplicate is possible if the process crashes between the two); `nack(requeue = false)`
37/// acks the original to drop it.
38pub struct RedisMessage {
39    payload: Bytes,
40    headers: Headers,
41    ack: Option<AckHandle>,
42    policy: PoisonPolicy,
43    /// Set when the subscription opted into a durable ZSET delay queue; makes `nack_after` native.
44    delay: Option<DelayConfig>,
45}
46
47impl Debug for RedisMessage {
48    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
49        let mut s = f.debug_struct("RedisMessage");
50        s.field("payload_len", &self.payload.len());
51        if let Some(ack) = &self.ack {
52            s.field("key", &ack.key).field("id", &ack.id);
53        }
54        s.finish_non_exhaustive()
55    }
56}
57
58impl RedisMessage {
59    #[allow(
60        clippy::too_many_arguments,
61        reason = "internal constructor mirroring the descriptor"
62    )]
63    pub(crate) fn new(
64        pool: Pool,
65        key: String,
66        group: String,
67        id: String,
68        payload: Bytes,
69        headers: Headers,
70        policy: PoisonPolicy,
71        delay: Option<DelayConfig>,
72    ) -> Self {
73        Self {
74            payload,
75            headers,
76            ack: Some(AckHandle {
77                pool,
78                key,
79                group,
80                id,
81            }),
82            policy,
83            delay,
84        }
85    }
86
87    /// The stream entry ID (for example `1700000000000-0`) this message was read at.
88    #[must_use]
89    pub fn id(&self) -> Option<&str> {
90        self.ack.as_ref().map(|a| a.id.as_str())
91    }
92
93    /// The consumer group this delivery was read through, or `None` once the message has settled.
94    #[must_use]
95    pub fn group(&self) -> Option<&str> {
96        self.ack.as_ref().map(|a| a.group.as_str())
97    }
98}
99
100impl Partitioned for RedisMessage {
101    fn partition_key(&self) -> Option<&[u8]> {
102        self.headers().get(PARTITION_KEY_HEADER)
103    }
104}
105
106impl IncomingMessage for RedisMessage {
107    fn payload(&self) -> &[u8] {
108        &self.payload
109    }
110
111    fn headers(&self) -> &Headers {
112        &self.headers
113    }
114
115    async fn ack(mut self) -> Result<(), AckError> {
116        let handle = self.ack.take().expect("RedisMessage settled twice");
117        xack(&handle).await
118    }
119
120    async fn nack(mut self, requeue: bool) -> Result<(), AckError> {
121        let handle = self.ack.take().expect("RedisMessage settled twice");
122        if requeue {
123            if self.policy.is_active() {
124                let next = next_retry_count(&self.headers);
125                if self.policy.is_poison(next) {
126                    // The framework retry-count reached the cap: dead-letter (or discard) instead
127                    // of redelivering, then ack the original.
128                    deadletter::settle_poison_stream(
129                        &handle.pool,
130                        &self.policy,
131                        &self.payload,
132                        &self.headers,
133                        REASON_MAX_DELIVERIES,
134                    )
135                    .await
136                    .map_err(broker_err)?;
137                } else {
138                    let mut headers = self.headers.clone();
139                    headers.insert(RETRY_COUNT_HEADER, next.to_string());
140                    republish(&handle, &self.payload, &headers).await?;
141                }
142            } else {
143                // No poison policy: republish verbatim, the plain at-least-once retry.
144                republish(&handle, &self.payload, &self.headers).await?;
145            }
146        } else if self.policy.is_active() {
147            // Drop: dead-letter it (or discard when no dead-letter stream is set) before acking.
148            deadletter::settle_poison_stream(
149                &handle.pool,
150                &self.policy,
151                &self.payload,
152                &self.headers,
153                REASON_DROPPED,
154            )
155            .await
156            .map_err(broker_err)?;
157        }
158        xack(&handle).await
159    }
160
161    /// Native delayed redelivery is available only when the subscription opted into a durable ZSET
162    /// delay queue with [`RedisStream::delayed_retry`](crate::RedisStream::delayed_retry); otherwise
163    /// the runtime applies its broker-agnostic deferred-republish fallback.
164    fn supports_nack_after(&self) -> bool {
165        self.delay.is_some()
166    }
167
168    /// Schedules the message for redelivery no sooner than `delay` from now via the configured ZSET
169    /// delay queue (`ZADD` the delayed copy, then `XACK` the original), with the retry-count header
170    /// incremented. The subscriber's sweeper re-`XADD`s it to the source stream once due.
171    ///
172    /// # Errors
173    ///
174    /// Returns [`AckError::Unsupported`] when the subscription did not opt into a delay queue, or
175    /// [`AckError::Broker`] when the `ZADD` or `XACK` fails.
176    async fn nack_after(mut self, delay: Duration) -> Result<(), AckError> {
177        let handle = self.ack.take().expect("RedisMessage settled twice");
178        let Some(cfg) = self.delay.as_ref() else {
179            return Err(AckError::Unsupported);
180        };
181        // ZADD the delayed copy before XACK-ing the original, so a crash in between leaves a
182        // duplicate (the scheduled copy plus the still-pending original) rather than a loss.
183        delay::schedule(
184            &handle.pool,
185            cfg,
186            &handle.id,
187            &self.payload,
188            &self.headers,
189            delay,
190        )
191        .await?;
192        xack(&handle).await
193    }
194}
195
196/// The next framework retry-count value (the current header plus one, or one when absent).
197fn next_retry_count(headers: &Headers) -> u64 {
198    headers
199        .get_str(RETRY_COUNT_HEADER)
200        .and_then(|v| v.parse::<u64>().ok())
201        .unwrap_or(0)
202        + 1
203}
204
205fn broker_err(err: fred::error::Error) -> AckError {
206    AckError::Broker(Box::new(err))
207}
208
209/// Re-appends a copy of the message to the tail of its stream (the at-least-once retry). Runs before
210/// the caller's `XACK` so a crash leaves a duplicate rather than a loss.
211async fn republish(handle: &AckHandle, payload: &[u8], headers: &Headers) -> Result<(), AckError> {
212    let fields = fields_for_publish(payload, headers);
213    let _: String = handle
214        .pool
215        .xadd(handle.key.as_str(), false, None::<()>, "*", fields)
216        .await
217        .map_err(broker_err)?;
218    Ok(())
219}
220
221async fn xack(handle: &AckHandle) -> Result<(), AckError> {
222    let _: i64 = handle
223        .pool
224        .xack(
225            handle.key.as_str(),
226            handle.group.as_str(),
227            handle.id.as_str(),
228        )
229        .await
230        .map_err(broker_err)?;
231    Ok(())
232}
233
234#[cfg(test)]
235mod tests {
236    use super::*;
237    use crate::context::StreamContext;
238    use fred::clients::Pool;
239    use fred::types::config::Config;
240    use ruststream::BuildContext;
241
242    /// An unconnected pool (just client structs); `Pool::new` opens no sockets.
243    fn offline_pool() -> Pool {
244        Pool::new(Config::default(), None, None, None, 1).expect("offline pool")
245    }
246
247    #[test]
248    fn build_context_reads_entry_id_and_group() {
249        let msg = RedisMessage::new(
250            offline_pool(),
251            "orders".to_owned(),
252            "workers".to_owned(),
253            "1700000000000-0".to_owned(),
254            Bytes::from_static(b"{}"),
255            Headers::new(),
256            PoisonPolicy::default(),
257            None,
258        );
259        let cx = StreamContext::build(&msg);
260        assert_eq!(cx.entry_id(), Some("1700000000000-0"));
261        assert_eq!(cx.consumer_group(), Some("workers"));
262    }
263}