Skip to main content

ruststream_fred/
message.rs

//! Delivered-message wrapper that implements [`IncomingMessage`].
2
3use std::fmt::{Debug, Formatter};
4
5use bytes::Bytes;
6use fred::clients::Pool;
7use fred::interfaces::StreamsInterface;
8use ruststream::{AckError, Headers, IncomingMessage, Partitioned};
9
10use crate::convert::fields_for_publish;
11
/// The well-known header key for per-message routing / partitioning.
///
/// Set this header on outgoing messages to control key-based fan-out when the runtime is
/// configured with `workers(N, by_key)`. The value is opaque bytes; the runtime hashes it to
/// assign a dispatch lane. Redis has no native partition concept on a single stream, so the key
/// travels as this header value and the sender is responsible for setting it.
///
/// On the receiving side, [`RedisMessage`]'s [`Partitioned`] implementation looks the key up
/// under this header name.
pub const PARTITION_KEY_HEADER: &str = "redis-partition-key";
19
20/// Everything a [`RedisMessage`] needs to settle itself against the stream it came from.
21struct AckHandle {
22    pool: Pool,
23    key: String,
24    group: String,
25    id: String,
26}
27
28/// A Redis Streams delivery, read from a consumer group via `XREADGROUP` or `XAUTOCLAIM`.
29///
30/// Settlement follows the republish-retry model: `ack` is `XACK`; `nack(requeue = true)`
31/// re-appends a copy of the entry to the same stream and then acks the original (at-least-once,
32/// so a duplicate is possible if the process crashes between the two); `nack(requeue = false)`
33/// acks the original to drop it.
34pub struct RedisMessage {
35    payload: Bytes,
36    headers: Headers,
37    ack: Option<AckHandle>,
38}
39
40impl Debug for RedisMessage {
41    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
42        let mut s = f.debug_struct("RedisMessage");
43        s.field("payload_len", &self.payload.len());
44        if let Some(ack) = &self.ack {
45            s.field("key", &ack.key).field("id", &ack.id);
46        }
47        s.finish_non_exhaustive()
48    }
49}
50
51impl RedisMessage {
52    pub(crate) fn new(
53        pool: Pool,
54        key: String,
55        group: String,
56        id: String,
57        payload: Bytes,
58        headers: Headers,
59    ) -> Self {
60        Self {
61            payload,
62            headers,
63            ack: Some(AckHandle {
64                pool,
65                key,
66                group,
67                id,
68            }),
69        }
70    }
71
72    /// The stream entry ID (for example `1700000000000-0`) this message was read at.
73    #[must_use]
74    pub fn id(&self) -> Option<&str> {
75        self.ack.as_ref().map(|a| a.id.as_str())
76    }
77}
78
79impl Partitioned for RedisMessage {
80    fn partition_key(&self) -> Option<&[u8]> {
81        self.headers().get(PARTITION_KEY_HEADER)
82    }
83}
84
85impl IncomingMessage for RedisMessage {
86    fn payload(&self) -> &[u8] {
87        &self.payload
88    }
89
90    fn headers(&self) -> &Headers {
91        &self.headers
92    }
93
94    async fn ack(mut self) -> Result<(), AckError> {
95        let handle = self.ack.take().expect("RedisMessage settled twice");
96        xack(&handle).await
97    }
98
99    async fn nack(mut self, requeue: bool) -> Result<(), AckError> {
100        let handle = self.ack.take().expect("RedisMessage settled twice");
101        if requeue {
102            // Republish a copy to the tail before acking the original, so a crash in between
103            // leaves a duplicate rather than losing the message (at-least-once).
104            let fields = fields_for_publish(&self.payload, &self.headers);
105            let _: String = handle
106                .pool
107                .xadd(handle.key.as_str(), false, None::<()>, "*", fields)
108                .await
109                .map_err(|err| AckError::Broker(Box::new(err)))?;
110        }
111        xack(&handle).await
112    }
113}
114
115async fn xack(handle: &AckHandle) -> Result<(), AckError> {
116    let _: i64 = handle
117        .pool
118        .xack(
119            handle.key.as_str(),
120            handle.group.as_str(),
121            handle.id.as_str(),
122        )
123        .await
124        .map_err(|err| AckError::Broker(Box::new(err)))?;
125    Ok(())
126}