1use std::fmt::{Debug, Formatter};
22use std::sync::Arc;
23use std::time::Duration;
24
25use bytes::Bytes;
26use fred::clients::Pool;
27use fred::interfaces::ListInterface;
28use fred::types::lists::LMoveDirection;
29use futures::Stream;
30use futures::stream::unfold;
31use ruststream::codec::Codec;
32use ruststream::{AckError, Headers, IncomingMessage, Partitioned, SubscriptionSource};
33
34use crate::envelope::{SharedEnvelope, frame, unframe};
35use crate::{RedisBroker, error::RedisError, message::PARTITION_KEY_HEADER};
36
/// Suffix appended to a list key to derive its processing-list key when no
/// explicit processing key has been configured.
const PROCESSING_SUFFIX: &str = ".processing";
/// Blocking timeout used when a list descriptor does not set one explicitly.
const DEFAULT_BLOCK: Duration = Duration::from_secs(5);
40
/// Converts a blocking timeout into fractional seconds, the form in which it
/// is handed to the blocking list commands issued by the subscriber.
fn block_secs(block: Duration) -> f64 {
    Duration::as_secs_f64(&block)
}
44
45#[derive(Clone)]
58#[must_use]
59pub struct RedisList {
60 key: String,
61 reliable: bool,
62 processing: Option<String>,
63 block: Option<Duration>,
64 codec: Option<SharedEnvelope>,
65}
66
67impl Debug for RedisList {
68 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
69 f.debug_struct("RedisList")
70 .field("key", &self.key)
71 .field("reliable", &self.reliable)
72 .field("processing", &self.processing)
73 .field("codec", &self.codec.is_some())
74 .finish_non_exhaustive()
75 }
76}
77
78impl RedisList {
79 pub fn new(key: impl Into<String>) -> Self {
81 Self {
82 key: key.into(),
83 reliable: false,
84 processing: None,
85 block: None,
86 codec: None,
87 }
88 }
89
90 pub const fn reliable(mut self) -> Self {
93 self.reliable = true;
94 self
95 }
96
97 pub fn processing(mut self, key: impl Into<String>) -> Self {
99 self.processing = Some(key.into());
100 self
101 }
102
103 pub const fn block(mut self, block: Duration) -> Self {
105 self.block = Some(block);
106 self
107 }
108
109 pub fn codec(mut self, codec: impl Codec + 'static) -> Self {
112 self.codec = Some(Arc::new(codec));
113 self
114 }
115
116 #[must_use]
118 pub fn key(&self) -> &str {
119 &self.key
120 }
121
122 pub(crate) const fn is_reliable(&self) -> bool {
123 self.reliable
124 }
125
126 pub(crate) fn processing_or_default(&self) -> String {
127 self.processing
128 .clone()
129 .unwrap_or_else(|| format!("{}{PROCESSING_SUFFIX}", self.key))
130 }
131
132 pub(crate) fn block_or_default(&self) -> Duration {
133 self.block.unwrap_or(DEFAULT_BLOCK)
134 }
135
136 pub(crate) fn codec_handle(&self) -> Option<SharedEnvelope> {
137 self.codec.clone()
138 }
139}
140
141impl SubscriptionSource<RedisBroker> for RedisList {
142 type Subscriber = RedisListSubscriber;
143
144 fn name(&self) -> &str {
145 self.key()
146 }
147
148 async fn subscribe(self, broker: &RedisBroker) -> Result<Self::Subscriber, RedisError> {
149 broker.subscribe_list(self).await
150 }
151}
152
153pub struct RedisListSubscriber {
155 pool: Pool,
156 key: String,
157 reliable: bool,
158 processing: String,
159 block: Duration,
160 codec: Option<SharedEnvelope>,
161}
162
163impl Debug for RedisListSubscriber {
164 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
165 f.debug_struct("RedisListSubscriber")
166 .field("key", &self.key)
167 .field("reliable", &self.reliable)
168 .finish_non_exhaustive()
169 }
170}
171
172impl RedisListSubscriber {
173 pub(crate) fn new(
174 pool: Pool,
175 key: String,
176 reliable: bool,
177 processing: String,
178 block: Duration,
179 codec: Option<SharedEnvelope>,
180 ) -> Self {
181 Self {
182 pool,
183 key,
184 reliable,
185 processing,
186 block,
187 codec,
188 }
189 }
190
191 fn simple_message(&self, raw: &[u8]) -> RedisListMessage {
192 let (payload, headers) = unframe(self.codec.as_ref(), raw);
193 RedisListMessage {
194 payload,
195 headers,
196 ack: None,
197 }
198 }
199
200 fn reliable_message(&self, raw: Vec<u8>) -> RedisListMessage {
201 let (payload, headers) = unframe(self.codec.as_ref(), &raw);
202 RedisListMessage {
203 payload,
204 headers,
205 ack: Some(ListAck {
206 pool: self.pool.clone(),
207 main_key: self.key.clone(),
208 processing_key: self.processing.clone(),
209 value: raw,
210 }),
211 }
212 }
213
214 async fn next_entry(&self) -> Result<Option<RedisListMessage>, RedisError> {
216 let secs = block_secs(self.block);
217 if self.reliable {
218 let value: Option<Vec<u8>> = self
219 .pool
220 .blmove(
221 self.key.as_str(),
222 self.processing.as_str(),
223 LMoveDirection::Right,
224 LMoveDirection::Left,
225 secs,
226 )
227 .await
228 .map_err(RedisError::stream)?;
229 Ok(value.map(|v| self.reliable_message(v)))
230 } else {
231 let popped: Option<(String, Vec<u8>)> = self
232 .pool
233 .brpop(self.key.as_str(), secs)
234 .await
235 .map_err(RedisError::stream)?;
236 Ok(popped.map(|(_, v)| self.simple_message(&v)))
237 }
238 }
239}
240
241impl ruststream::Subscriber for RedisListSubscriber {
242 type Message = RedisListMessage;
243 type Error = RedisError;
244
245 fn stream(&mut self) -> impl Stream<Item = Result<Self::Message, Self::Error>> + Send + '_ {
252 unfold(&*self, |s| async move {
253 loop {
254 match s.next_entry().await {
255 Ok(Some(msg)) => return Some((Ok(msg), s)),
256 Ok(None) => {}
257 Err(err) => return Some((Err(err), s)),
258 }
259 }
260 })
261 }
262}
263
264struct ListAck {
266 pool: Pool,
267 main_key: String,
268 processing_key: String,
269 value: Vec<u8>,
271}
272
273pub struct RedisListMessage {
276 payload: Bytes,
277 headers: Headers,
278 ack: Option<ListAck>,
279}
280
281impl Debug for RedisListMessage {
282 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
283 f.debug_struct("RedisListMessage")
284 .field("payload_len", &self.payload.len())
285 .field("reliable", &self.ack.is_some())
286 .finish_non_exhaustive()
287 }
288}
289
290impl IncomingMessage for RedisListMessage {
291 fn payload(&self) -> &[u8] {
292 &self.payload
293 }
294
295 fn headers(&self) -> &Headers {
296 &self.headers
297 }
298
299 async fn ack(self) -> Result<(), AckError> {
300 let Some(handle) = self.ack else {
301 return Err(AckError::Unsupported);
302 };
303 lrem(&handle).await
304 }
305
306 async fn nack(self, requeue: bool) -> Result<(), AckError> {
307 let Some(handle) = self.ack else {
308 return Err(AckError::Unsupported);
309 };
310 if requeue {
311 let _: i64 = handle
314 .pool
315 .lpush(handle.main_key.as_str(), handle.value.clone())
316 .await
317 .map_err(|err| AckError::Broker(Box::new(err)))?;
318 }
319 lrem(&handle).await
320 }
321}
322
323async fn lrem(handle: &ListAck) -> Result<(), AckError> {
324 let _: i64 = handle
325 .pool
326 .lrem(handle.processing_key.as_str(), 1, handle.value.clone())
327 .await
328 .map_err(|err| AckError::Broker(Box::new(err)))?;
329 Ok(())
330}
331
332impl Partitioned for RedisListMessage {
333 fn partition_key(&self) -> Option<&[u8]> {
334 self.headers().get(PARTITION_KEY_HEADER)
335 }
336}
337
338#[derive(Clone)]
344pub struct RedisListPublisher {
345 pool: Arc<tokio::sync::OnceCell<Pool>>,
346 codec: Option<SharedEnvelope>,
347}
348
349impl Debug for RedisListPublisher {
350 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
351 f.debug_struct("RedisListPublisher")
352 .field("codec", &self.codec.is_some())
353 .finish_non_exhaustive()
354 }
355}
356
357impl RedisListPublisher {
358 pub(crate) fn new(pool: Arc<tokio::sync::OnceCell<Pool>>) -> Self {
359 Self { pool, codec: None }
360 }
361
362 #[must_use]
365 pub fn codec(mut self, codec: impl Codec + 'static) -> Self {
366 self.codec = Some(Arc::new(codec));
367 self
368 }
369}
370
371impl ruststream::Publisher for RedisListPublisher {
372 type Error = RedisError;
373
374 async fn publish(&self, msg: ruststream::OutgoingMessage<'_>) -> Result<(), Self::Error> {
375 let pool = self.pool.get().cloned().ok_or(RedisError::NotConnected)?;
376 let body = frame(self.codec.as_ref(), msg.payload(), msg.headers());
377 let _: i64 = pool
378 .lpush(msg.name(), body)
379 .await
380 .map_err(RedisError::publish)?;
381 Ok(())
382 }
383}