ruststream_fred/
message.rs1use std::fmt::{Debug, Formatter};
4use std::time::Duration;
5
6use bytes::Bytes;
7use fred::clients::Pool;
8use fred::interfaces::StreamsInterface;
9use ruststream::runtime::RETRY_COUNT_HEADER;
10use ruststream::{AckError, Headers, IncomingMessage, Partitioned};
11
12use crate::convert::fields_for_publish;
13use crate::deadletter::{self, PoisonPolicy, REASON_DROPPED, REASON_MAX_DELIVERIES};
14use crate::delay::{self, DelayConfig};
15
16pub const PARTITION_KEY_HEADER: &str = "redis-partition-key";
23
24struct AckHandle {
26 pool: Pool,
27 key: String,
28 group: String,
29 id: String,
30}
31
32pub struct RedisMessage {
39 payload: Bytes,
40 headers: Headers,
41 ack: Option<AckHandle>,
42 policy: PoisonPolicy,
43 delay: Option<DelayConfig>,
45}
46
47impl Debug for RedisMessage {
48 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
49 let mut s = f.debug_struct("RedisMessage");
50 s.field("payload_len", &self.payload.len());
51 if let Some(ack) = &self.ack {
52 s.field("key", &ack.key).field("id", &ack.id);
53 }
54 s.finish_non_exhaustive()
55 }
56}
57
58impl RedisMessage {
59 #[allow(
60 clippy::too_many_arguments,
61 reason = "internal constructor mirroring the descriptor"
62 )]
63 pub(crate) fn new(
64 pool: Pool,
65 key: String,
66 group: String,
67 id: String,
68 payload: Bytes,
69 headers: Headers,
70 policy: PoisonPolicy,
71 delay: Option<DelayConfig>,
72 ) -> Self {
73 Self {
74 payload,
75 headers,
76 ack: Some(AckHandle {
77 pool,
78 key,
79 group,
80 id,
81 }),
82 policy,
83 delay,
84 }
85 }
86
87 #[must_use]
89 pub fn id(&self) -> Option<&str> {
90 self.ack.as_ref().map(|a| a.id.as_str())
91 }
92}
93
94impl Partitioned for RedisMessage {
95 fn partition_key(&self) -> Option<&[u8]> {
96 self.headers().get(PARTITION_KEY_HEADER)
97 }
98}
99
100impl IncomingMessage for RedisMessage {
101 fn payload(&self) -> &[u8] {
102 &self.payload
103 }
104
105 fn headers(&self) -> &Headers {
106 &self.headers
107 }
108
109 async fn ack(mut self) -> Result<(), AckError> {
110 let handle = self.ack.take().expect("RedisMessage settled twice");
111 xack(&handle).await
112 }
113
114 async fn nack(mut self, requeue: bool) -> Result<(), AckError> {
115 let handle = self.ack.take().expect("RedisMessage settled twice");
116 if requeue {
117 if self.policy.is_active() {
118 let next = next_retry_count(&self.headers);
119 if self.policy.is_poison(next) {
120 deadletter::settle_poison_stream(
123 &handle.pool,
124 &self.policy,
125 &self.payload,
126 &self.headers,
127 REASON_MAX_DELIVERIES,
128 )
129 .await
130 .map_err(broker_err)?;
131 } else {
132 let mut headers = self.headers.clone();
133 headers.insert(RETRY_COUNT_HEADER, next.to_string());
134 republish(&handle, &self.payload, &headers).await?;
135 }
136 } else {
137 republish(&handle, &self.payload, &self.headers).await?;
139 }
140 } else if self.policy.is_active() {
141 deadletter::settle_poison_stream(
143 &handle.pool,
144 &self.policy,
145 &self.payload,
146 &self.headers,
147 REASON_DROPPED,
148 )
149 .await
150 .map_err(broker_err)?;
151 }
152 xack(&handle).await
153 }
154
155 fn supports_nack_after(&self) -> bool {
159 self.delay.is_some()
160 }
161
162 async fn nack_after(mut self, delay: Duration) -> Result<(), AckError> {
171 let handle = self.ack.take().expect("RedisMessage settled twice");
172 let Some(cfg) = self.delay.as_ref() else {
173 return Err(AckError::Unsupported);
174 };
175 delay::schedule(
178 &handle.pool,
179 cfg,
180 &handle.id,
181 &self.payload,
182 &self.headers,
183 delay,
184 )
185 .await?;
186 xack(&handle).await
187 }
188}
189
190fn next_retry_count(headers: &Headers) -> u64 {
192 headers
193 .get_str(RETRY_COUNT_HEADER)
194 .and_then(|v| v.parse::<u64>().ok())
195 .unwrap_or(0)
196 + 1
197}
198
199fn broker_err(err: fred::error::Error) -> AckError {
200 AckError::Broker(Box::new(err))
201}
202
203async fn republish(handle: &AckHandle, payload: &[u8], headers: &Headers) -> Result<(), AckError> {
206 let fields = fields_for_publish(payload, headers);
207 let _: String = handle
208 .pool
209 .xadd(handle.key.as_str(), false, None::<()>, "*", fields)
210 .await
211 .map_err(broker_err)?;
212 Ok(())
213}
214
215async fn xack(handle: &AckHandle) -> Result<(), AckError> {
216 let _: i64 = handle
217 .pool
218 .xack(
219 handle.key.as_str(),
220 handle.group.as_str(),
221 handle.id.as_str(),
222 )
223 .await
224 .map_err(broker_err)?;
225 Ok(())
226}