Skip to main content

ruststream_fred/
message.rs

1//! Delivered-message wrapper that implements [`IncomingMessage`].
2
3use std::fmt::{Debug, Formatter};
4use std::time::Duration;
5
6use bytes::Bytes;
7use fred::clients::Pool;
8use fred::interfaces::StreamsInterface;
9use ruststream::runtime::RETRY_COUNT_HEADER;
10use ruststream::{AckError, Headers, IncomingMessage, Partitioned};
11
12use crate::convert::fields_for_publish;
13use crate::deadletter::{self, PoisonPolicy, REASON_DROPPED, REASON_MAX_DELIVERIES};
14use crate::delay::{self, DelayConfig};
15
/// Header name under which a message carries its routing / partitioning key.
///
/// A single Redis stream has no native notion of partitions, so the key rides along as an
/// ordinary header and it is up to the sender to set it. When the runtime is configured with
/// `workers(N, by_key)` it hashes the (opaque) header bytes to choose a dispatch lane, giving
/// key-based fan-out on the consuming side.
pub const PARTITION_KEY_HEADER: &str = "redis-partition-key";
23
24/// Everything a [`RedisMessage`] needs to settle itself against the stream it came from.
25struct AckHandle {
26    pool: Pool,
27    key: String,
28    group: String,
29    id: String,
30}
31
32/// A Redis Streams delivery, read from a consumer group via `XREADGROUP` or `XAUTOCLAIM`.
33///
34/// Settlement follows the republish-retry model: `ack` is `XACK`; `nack(requeue = true)`
35/// re-appends a copy of the entry to the same stream and then acks the original (at-least-once,
36/// so a duplicate is possible if the process crashes between the two); `nack(requeue = false)`
37/// acks the original to drop it.
38pub struct RedisMessage {
39    payload: Bytes,
40    headers: Headers,
41    ack: Option<AckHandle>,
42    policy: PoisonPolicy,
43    /// Set when the subscription opted into a durable ZSET delay queue; makes `nack_after` native.
44    delay: Option<DelayConfig>,
45}
46
47impl Debug for RedisMessage {
48    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
49        let mut s = f.debug_struct("RedisMessage");
50        s.field("payload_len", &self.payload.len());
51        if let Some(ack) = &self.ack {
52            s.field("key", &ack.key).field("id", &ack.id);
53        }
54        s.finish_non_exhaustive()
55    }
56}
57
58impl RedisMessage {
59    #[allow(
60        clippy::too_many_arguments,
61        reason = "internal constructor mirroring the descriptor"
62    )]
63    pub(crate) fn new(
64        pool: Pool,
65        key: String,
66        group: String,
67        id: String,
68        payload: Bytes,
69        headers: Headers,
70        policy: PoisonPolicy,
71        delay: Option<DelayConfig>,
72    ) -> Self {
73        Self {
74            payload,
75            headers,
76            ack: Some(AckHandle {
77                pool,
78                key,
79                group,
80                id,
81            }),
82            policy,
83            delay,
84        }
85    }
86
87    /// The stream entry ID (for example `1700000000000-0`) this message was read at.
88    #[must_use]
89    pub fn id(&self) -> Option<&str> {
90        self.ack.as_ref().map(|a| a.id.as_str())
91    }
92}
93
94impl Partitioned for RedisMessage {
95    fn partition_key(&self) -> Option<&[u8]> {
96        self.headers().get(PARTITION_KEY_HEADER)
97    }
98}
99
100impl IncomingMessage for RedisMessage {
101    fn payload(&self) -> &[u8] {
102        &self.payload
103    }
104
105    fn headers(&self) -> &Headers {
106        &self.headers
107    }
108
109    async fn ack(mut self) -> Result<(), AckError> {
110        let handle = self.ack.take().expect("RedisMessage settled twice");
111        xack(&handle).await
112    }
113
114    async fn nack(mut self, requeue: bool) -> Result<(), AckError> {
115        let handle = self.ack.take().expect("RedisMessage settled twice");
116        if requeue {
117            if self.policy.is_active() {
118                let next = next_retry_count(&self.headers);
119                if self.policy.is_poison(next) {
120                    // The framework retry-count reached the cap: dead-letter (or discard) instead
121                    // of redelivering, then ack the original.
122                    deadletter::settle_poison_stream(
123                        &handle.pool,
124                        &self.policy,
125                        &self.payload,
126                        &self.headers,
127                        REASON_MAX_DELIVERIES,
128                    )
129                    .await
130                    .map_err(broker_err)?;
131                } else {
132                    let mut headers = self.headers.clone();
133                    headers.insert(RETRY_COUNT_HEADER, next.to_string());
134                    republish(&handle, &self.payload, &headers).await?;
135                }
136            } else {
137                // No poison policy: republish verbatim, the plain at-least-once retry.
138                republish(&handle, &self.payload, &self.headers).await?;
139            }
140        } else if self.policy.is_active() {
141            // Drop: dead-letter it (or discard when no dead-letter stream is set) before acking.
142            deadletter::settle_poison_stream(
143                &handle.pool,
144                &self.policy,
145                &self.payload,
146                &self.headers,
147                REASON_DROPPED,
148            )
149            .await
150            .map_err(broker_err)?;
151        }
152        xack(&handle).await
153    }
154
155    /// Native delayed redelivery is available only when the subscription opted into a durable ZSET
156    /// delay queue with [`RedisStream::delayed_retry`](crate::RedisStream::delayed_retry); otherwise
157    /// the runtime applies its broker-agnostic deferred-republish fallback.
158    fn supports_nack_after(&self) -> bool {
159        self.delay.is_some()
160    }
161
162    /// Schedules the message for redelivery no sooner than `delay` from now via the configured ZSET
163    /// delay queue (`ZADD` the delayed copy, then `XACK` the original), with the retry-count header
164    /// incremented. The subscriber's sweeper re-`XADD`s it to the source stream once due.
165    ///
166    /// # Errors
167    ///
168    /// Returns [`AckError::Unsupported`] when the subscription did not opt into a delay queue, or
169    /// [`AckError::Broker`] when the `ZADD` or `XACK` fails.
170    async fn nack_after(mut self, delay: Duration) -> Result<(), AckError> {
171        let handle = self.ack.take().expect("RedisMessage settled twice");
172        let Some(cfg) = self.delay.as_ref() else {
173            return Err(AckError::Unsupported);
174        };
175        // ZADD the delayed copy before XACK-ing the original, so a crash in between leaves a
176        // duplicate (the scheduled copy plus the still-pending original) rather than a loss.
177        delay::schedule(
178            &handle.pool,
179            cfg,
180            &handle.id,
181            &self.payload,
182            &self.headers,
183            delay,
184        )
185        .await?;
186        xack(&handle).await
187    }
188}
189
190/// The next framework retry-count value (the current header plus one, or one when absent).
191fn next_retry_count(headers: &Headers) -> u64 {
192    headers
193        .get_str(RETRY_COUNT_HEADER)
194        .and_then(|v| v.parse::<u64>().ok())
195        .unwrap_or(0)
196        + 1
197}
198
199fn broker_err(err: fred::error::Error) -> AckError {
200    AckError::Broker(Box::new(err))
201}
202
203/// Re-appends a copy of the message to the tail of its stream (the at-least-once retry). Runs before
204/// the caller's `XACK` so a crash leaves a duplicate rather than a loss.
205async fn republish(handle: &AckHandle, payload: &[u8], headers: &Headers) -> Result<(), AckError> {
206    let fields = fields_for_publish(payload, headers);
207    let _: String = handle
208        .pool
209        .xadd(handle.key.as_str(), false, None::<()>, "*", fields)
210        .await
211        .map_err(broker_err)?;
212    Ok(())
213}
214
215async fn xack(handle: &AckHandle) -> Result<(), AckError> {
216    let _: i64 = handle
217        .pool
218        .xack(
219            handle.key.as_str(),
220            handle.group.as_str(),
221            handle.id.as_str(),
222        )
223        .await
224        .map_err(broker_err)?;
225    Ok(())
226}