1use std::fmt::{Debug, Formatter};
4use std::time::Duration;
5
6use bytes::Bytes;
7use fred::clients::Pool;
8use fred::interfaces::StreamsInterface;
9use ruststream::runtime::RETRY_COUNT_HEADER;
10use ruststream::{AckError, Headers, IncomingMessage, Partitioned};
11
12use crate::convert::fields_for_publish;
13use crate::deadletter::{self, PoisonPolicy, REASON_DROPPED, REASON_MAX_DELIVERIES};
14use crate::delay::{self, DelayConfig};
15
16pub const PARTITION_KEY_HEADER: &str = "redis-partition-key";
23
24struct AckHandle {
26 pool: Pool,
27 key: String,
28 group: String,
29 id: String,
30}
31
32pub struct RedisMessage {
39 payload: Bytes,
40 headers: Headers,
41 ack: Option<AckHandle>,
42 policy: PoisonPolicy,
43 delay: Option<DelayConfig>,
45}
46
47impl Debug for RedisMessage {
48 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
49 let mut s = f.debug_struct("RedisMessage");
50 s.field("payload_len", &self.payload.len());
51 if let Some(ack) = &self.ack {
52 s.field("key", &ack.key).field("id", &ack.id);
53 }
54 s.finish_non_exhaustive()
55 }
56}
57
58impl RedisMessage {
59 #[allow(
60 clippy::too_many_arguments,
61 reason = "internal constructor mirroring the descriptor"
62 )]
63 pub(crate) fn new(
64 pool: Pool,
65 key: String,
66 group: String,
67 id: String,
68 payload: Bytes,
69 headers: Headers,
70 policy: PoisonPolicy,
71 delay: Option<DelayConfig>,
72 ) -> Self {
73 Self {
74 payload,
75 headers,
76 ack: Some(AckHandle {
77 pool,
78 key,
79 group,
80 id,
81 }),
82 policy,
83 delay,
84 }
85 }
86
87 #[must_use]
89 pub fn id(&self) -> Option<&str> {
90 self.ack.as_ref().map(|a| a.id.as_str())
91 }
92
93 #[must_use]
95 pub fn group(&self) -> Option<&str> {
96 self.ack.as_ref().map(|a| a.group.as_str())
97 }
98}
99
100impl Partitioned for RedisMessage {
101 fn partition_key(&self) -> Option<&[u8]> {
102 self.headers().get(PARTITION_KEY_HEADER)
103 }
104}
105
106impl IncomingMessage for RedisMessage {
107 fn payload(&self) -> &[u8] {
108 &self.payload
109 }
110
111 fn headers(&self) -> &Headers {
112 &self.headers
113 }
114
115 async fn ack(mut self) -> Result<(), AckError> {
116 let handle = self.ack.take().expect("RedisMessage settled twice");
117 xack(&handle).await
118 }
119
120 async fn nack(mut self, requeue: bool) -> Result<(), AckError> {
121 let handle = self.ack.take().expect("RedisMessage settled twice");
122 if requeue {
123 if self.policy.is_active() {
124 let next = next_retry_count(&self.headers);
125 if self.policy.is_poison(next) {
126 deadletter::settle_poison_stream(
129 &handle.pool,
130 &self.policy,
131 &self.payload,
132 &self.headers,
133 REASON_MAX_DELIVERIES,
134 )
135 .await
136 .map_err(broker_err)?;
137 } else {
138 let mut headers = self.headers.clone();
139 headers.insert(RETRY_COUNT_HEADER, next.to_string());
140 republish(&handle, &self.payload, &headers).await?;
141 }
142 } else {
143 republish(&handle, &self.payload, &self.headers).await?;
145 }
146 } else if self.policy.is_active() {
147 deadletter::settle_poison_stream(
149 &handle.pool,
150 &self.policy,
151 &self.payload,
152 &self.headers,
153 REASON_DROPPED,
154 )
155 .await
156 .map_err(broker_err)?;
157 }
158 xack(&handle).await
159 }
160
161 fn supports_nack_after(&self) -> bool {
165 self.delay.is_some()
166 }
167
168 async fn nack_after(mut self, delay: Duration) -> Result<(), AckError> {
177 let handle = self.ack.take().expect("RedisMessage settled twice");
178 let Some(cfg) = self.delay.as_ref() else {
179 return Err(AckError::Unsupported);
180 };
181 delay::schedule(
184 &handle.pool,
185 cfg,
186 &handle.id,
187 &self.payload,
188 &self.headers,
189 delay,
190 )
191 .await?;
192 xack(&handle).await
193 }
194}
195
196fn next_retry_count(headers: &Headers) -> u64 {
198 headers
199 .get_str(RETRY_COUNT_HEADER)
200 .and_then(|v| v.parse::<u64>().ok())
201 .unwrap_or(0)
202 + 1
203}
204
205fn broker_err(err: fred::error::Error) -> AckError {
206 AckError::Broker(Box::new(err))
207}
208
209async fn republish(handle: &AckHandle, payload: &[u8], headers: &Headers) -> Result<(), AckError> {
212 let fields = fields_for_publish(payload, headers);
213 let _: String = handle
214 .pool
215 .xadd(handle.key.as_str(), false, None::<()>, "*", fields)
216 .await
217 .map_err(broker_err)?;
218 Ok(())
219}
220
221async fn xack(handle: &AckHandle) -> Result<(), AckError> {
222 let _: i64 = handle
223 .pool
224 .xack(
225 handle.key.as_str(),
226 handle.group.as_str(),
227 handle.id.as_str(),
228 )
229 .await
230 .map_err(broker_err)?;
231 Ok(())
232}
233
234#[cfg(test)]
235mod tests {
236 use super::*;
237 use crate::context::StreamContext;
238 use fred::clients::Pool;
239 use fred::types::config::Config;
240 use ruststream::BuildContext;
241
242 fn offline_pool() -> Pool {
244 Pool::new(Config::default(), None, None, None, 1).expect("offline pool")
245 }
246
247 #[test]
248 fn build_context_reads_entry_id_and_group() {
249 let msg = RedisMessage::new(
250 offline_pool(),
251 "orders".to_owned(),
252 "workers".to_owned(),
253 "1700000000000-0".to_owned(),
254 Bytes::from_static(b"{}"),
255 Headers::new(),
256 PoisonPolicy::default(),
257 None,
258 );
259 let cx = StreamContext::build(&msg);
260 assert_eq!(cx.entry_id(), Some("1700000000000-0"));
261 assert_eq!(cx.consumer_group(), Some("workers"));
262 }
263}