1use std::fmt::{Debug, Formatter};
24use std::sync::Arc;
25use std::time::Duration;
26
27use bytes::Bytes;
28use fred::clients::Pool;
29use fred::error::ErrorKind;
30use fred::interfaces::{KeysInterface, ListInterface};
31use fred::types::lists::LMoveDirection;
32use futures::Stream;
33use futures::stream::unfold;
34use ruststream::codec::Codec;
35use ruststream::runtime::RETRY_COUNT_HEADER;
36use ruststream::{AckError, Headers, IncomingMessage, Partitioned, SubscriptionSource};
37
38use crate::deadletter::{self, PoisonPolicy, REASON_DROPPED, REASON_MAX_DELIVERIES};
39use crate::envelope::{SharedEnvelope, frame, unframe};
40use crate::recovery::{self, RecoveryConfig};
41use crate::{RedisBroker, error::RedisError, message::PARTITION_KEY_HEADER};
42
43const DEFAULT_BLOCK: Duration = Duration::from_secs(5);
44const PROCESSING_SUFFIX: &str = ".processing";
46
47fn block_secs(block: Duration) -> f64 {
48 block.as_secs_f64()
49}
50
51fn empty_on_timeout<T>(
55 result: Result<Option<T>, fred::error::Error>,
56) -> Result<Option<T>, RedisError> {
57 match result {
58 Ok(value) => Ok(value),
59 Err(err) if matches!(err.kind(), ErrorKind::Timeout) => Ok(None),
60 Err(err) => Err(RedisError::stream(err)),
61 }
62}
63
64#[derive(Clone)]
77#[must_use]
78pub struct RedisList {
79 key: String,
80 reliable: bool,
81 processing: Option<String>,
82 block: Option<Duration>,
83 codec: Option<SharedEnvelope>,
84 dead_letter: Option<String>,
85 max_deliveries: Option<u64>,
86 min_idle: Option<Duration>,
87 recovery_zset: Option<String>,
88 recovery_ttl: Option<Duration>,
89}
90
91impl Debug for RedisList {
92 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
93 f.debug_struct("RedisList")
94 .field("key", &self.key)
95 .field("reliable", &self.reliable)
96 .field("processing", &self.processing)
97 .field("codec", &self.codec.is_some())
98 .field("dead_letter", &self.dead_letter)
99 .field("max_deliveries", &self.max_deliveries)
100 .field("recovery_zset", &self.recovery_zset)
101 .field("recovery_ttl", &self.recovery_ttl)
102 .finish_non_exhaustive()
103 }
104}
105
106impl RedisList {
107 pub fn new(key: impl Into<String>) -> Self {
109 Self {
110 key: key.into(),
111 reliable: false,
112 processing: None,
113 block: None,
114 codec: None,
115 dead_letter: None,
116 max_deliveries: None,
117 min_idle: None,
118 recovery_zset: None,
119 recovery_ttl: None,
120 }
121 }
122
123 pub const fn reliable(mut self) -> Self {
126 self.reliable = true;
127 self
128 }
129
130 pub fn processing(mut self, key: impl Into<String>) -> Self {
132 self.processing = Some(key.into());
133 self
134 }
135
136 pub const fn block(mut self, block: Duration) -> Self {
138 self.block = Some(block);
139 self
140 }
141
142 pub fn codec(mut self, codec: impl Codec + 'static) -> Self {
145 self.codec = Some(Arc::new(codec));
146 self
147 }
148
149 pub fn dead_letter(mut self, key: impl Into<String>) -> Self {
154 self.dead_letter = Some(key.into());
155 self
156 }
157
158 pub const fn max_deliveries(mut self, max: u64) -> Self {
164 self.max_deliveries = Some(max);
165 self
166 }
167
168 pub const fn min_idle(mut self, min_idle: Duration) -> Self {
175 self.min_idle = Some(min_idle);
176 self
177 }
178
179 pub fn recovery_zset(mut self, key: impl Into<String>) -> Self {
185 self.recovery_zset = Some(key.into());
186 self.reliable = true;
187 self
188 }
189
190 pub const fn recovery_ttl(mut self, ttl: Duration) -> Self {
195 self.recovery_ttl = Some(ttl);
196 self
197 }
198
199 #[must_use]
201 pub fn key(&self) -> &str {
202 &self.key
203 }
204
205 pub(crate) const fn is_reliable(&self) -> bool {
206 self.reliable
207 }
208
209 pub(crate) fn processing_or_default(&self) -> String {
210 self.processing
211 .clone()
212 .unwrap_or_else(|| format!("{}{PROCESSING_SUFFIX}", self.key))
213 }
214
215 pub(crate) fn block_or_default(&self) -> Duration {
216 self.block.unwrap_or(DEFAULT_BLOCK)
217 }
218
219 pub(crate) fn codec_handle(&self) -> Option<SharedEnvelope> {
220 self.codec.clone()
221 }
222
223 pub(crate) fn poison_policy(&self) -> PoisonPolicy {
224 PoisonPolicy {
225 dead_letter: self.dead_letter.clone(),
226 max_deliveries: self.max_deliveries,
227 }
228 }
229
230 pub(crate) fn recovery_config(&self) -> Result<Option<RecoveryConfig>, RedisError> {
237 let Some(zset_key) = self.recovery_zset.clone() else {
238 return Ok(None);
239 };
240 let min_idle = self.min_idle.ok_or_else(|| {
241 RedisError::InvalidOptions(format!(
242 "reliable list recovery on `{}` needs a min_idle: call .min_idle(duration) \
243 alongside .recovery_zset(key)",
244 self.key
245 ))
246 })?;
247 Ok(Some(RecoveryConfig {
248 zset_key,
249 min_idle,
250 ttl: self.recovery_ttl,
251 }))
252 }
253}
254
255impl SubscriptionSource<RedisBroker> for RedisList {
256 type Subscriber = RedisListSubscriber;
257
258 fn name(&self) -> &str {
259 self.key()
260 }
261
262 async fn subscribe(self, broker: &RedisBroker) -> Result<Self::Subscriber, RedisError> {
263 broker.subscribe_list(self).await
264 }
265}
266
267#[cfg(feature = "testing")]
268impl SubscriptionSource<crate::testing::RedisTestBroker> for RedisList {
269 type Subscriber = crate::testing::RedisTestSubscriber;
270
271 fn name(&self) -> &str {
272 self.key()
273 }
274
275 async fn subscribe(
276 self,
277 broker: &crate::testing::RedisTestBroker,
278 ) -> Result<Self::Subscriber, RedisError> {
279 broker.subscribe(self.key()).await
280 }
281}
282
283pub struct RedisListSubscriber {
285 pool: Pool,
286 key: String,
287 reliable: bool,
288 processing: String,
289 block: Duration,
290 codec: Option<SharedEnvelope>,
291 policy: PoisonPolicy,
292 recovery: Option<RecoveryConfig>,
293}
294
295impl Debug for RedisListSubscriber {
296 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
297 f.debug_struct("RedisListSubscriber")
298 .field("key", &self.key)
299 .field("reliable", &self.reliable)
300 .field("poison", &self.policy.is_active())
301 .field("recovery", &self.recovery.is_some())
302 .finish_non_exhaustive()
303 }
304}
305
306impl RedisListSubscriber {
307 #[allow(
308 clippy::too_many_arguments,
309 reason = "internal constructor mirroring the descriptor"
310 )]
311 pub(crate) fn new(
312 pool: Pool,
313 key: String,
314 reliable: bool,
315 processing: String,
316 block: Duration,
317 codec: Option<SharedEnvelope>,
318 policy: PoisonPolicy,
319 recovery: Option<RecoveryConfig>,
320 ) -> Self {
321 Self {
322 pool,
323 key,
324 reliable,
325 processing,
326 block,
327 codec,
328 policy,
329 recovery,
330 }
331 }
332
333 fn simple_message(&self, raw: &[u8]) -> RedisListMessage {
334 let (payload, headers) = unframe(self.codec.as_ref(), raw);
335 RedisListMessage {
336 payload,
337 headers,
338 ack: None,
339 }
340 }
341
342 fn reliable_message(&self, raw: Vec<u8>, recovery: Option<RecoveryHandle>) -> RedisListMessage {
343 let (payload, headers) = unframe(self.codec.as_ref(), &raw);
344 RedisListMessage {
345 payload,
346 headers,
347 ack: Some(ListAck {
348 pool: self.pool.clone(),
349 main_key: self.key.clone(),
350 processing_key: self.processing.clone(),
351 value: raw,
352 codec: self.codec.clone(),
353 policy: self.policy.clone(),
354 recovery,
355 }),
356 }
357 }
358
359 async fn next_entry(&self) -> Result<Option<RedisListMessage>, RedisError> {
363 let secs = block_secs(self.block);
364 if self.reliable {
365 if let Some(cfg) = &self.recovery {
366 recovery::sweep_orphans(&self.pool, cfg, &self.key, &self.processing).await?;
367 }
368 let value: Option<Vec<u8>> = empty_on_timeout(
369 self.pool
370 .blmove(
371 self.key.as_str(),
372 self.processing.as_str(),
373 LMoveDirection::Right,
374 LMoveDirection::Left,
375 secs,
376 )
377 .await,
378 )?;
379 let Some(value) = value else {
380 return Ok(None);
381 };
382 let handle = match &self.recovery {
383 Some(cfg) => {
384 let member = recovery::record_claim(&self.pool, cfg, &value).await?;
385 Some(RecoveryHandle {
386 zset_key: cfg.zset_key.clone(),
387 member,
388 })
389 }
390 None => None,
391 };
392 Ok(Some(self.reliable_message(value, handle)))
393 } else {
394 let popped: Option<(String, Vec<u8>)> =
395 empty_on_timeout(self.pool.brpop(self.key.as_str(), secs).await)?;
396 Ok(popped.map(|(_, v)| self.simple_message(&v)))
397 }
398 }
399}
400
401impl ruststream::Subscriber for RedisListSubscriber {
402 type Message = RedisListMessage;
403 type Error = RedisError;
404
405 fn stream(&mut self) -> impl Stream<Item = Result<Self::Message, Self::Error>> + Send + '_ {
412 unfold(&*self, |s| async move {
413 loop {
414 match s.next_entry().await {
415 Ok(Some(msg)) => return Some((Ok(msg), s)),
416 Ok(None) => {}
417 Err(err) => return Some((Err(err), s)),
418 }
419 }
420 })
421 }
422}
423
424struct ListAck {
426 pool: Pool,
427 main_key: String,
428 processing_key: String,
429 value: Vec<u8>,
431 codec: Option<SharedEnvelope>,
433 policy: PoisonPolicy,
434 recovery: Option<RecoveryHandle>,
437}
438
439struct RecoveryHandle {
441 zset_key: String,
442 member: Vec<u8>,
443}
444
445pub struct RedisListMessage {
448 payload: Bytes,
449 headers: Headers,
450 ack: Option<ListAck>,
451}
452
453impl Debug for RedisListMessage {
454 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
455 f.debug_struct("RedisListMessage")
456 .field("payload_len", &self.payload.len())
457 .field("reliable", &self.ack.is_some())
458 .finish_non_exhaustive()
459 }
460}
461
462impl IncomingMessage for RedisListMessage {
463 fn payload(&self) -> &[u8] {
464 &self.payload
465 }
466
467 fn headers(&self) -> &Headers {
468 &self.headers
469 }
470
471 async fn ack(self) -> Result<(), AckError> {
472 let Some(handle) = self.ack else {
473 return Err(AckError::Unsupported);
474 };
475 settle(&handle).await
476 }
477
478 async fn nack(self, requeue: bool) -> Result<(), AckError> {
479 let Some(handle) = self.ack else {
480 return Err(AckError::Unsupported);
481 };
482 if requeue {
483 if handle.policy.is_active() {
484 let next = next_retry_count(&self.headers);
485 if handle.policy.is_poison(next) {
486 list_dead_letter(&handle, &self.payload, &self.headers, REASON_MAX_DELIVERIES)
487 .await?;
488 } else {
489 let mut headers = self.headers.clone();
492 headers.insert(RETRY_COUNT_HEADER, next.to_string());
493 let body = frame(handle.codec.as_ref(), &self.payload, &headers);
494 lpush(&handle.pool, handle.main_key.as_str(), body).await?;
495 }
496 } else {
497 lpush(&handle.pool, handle.main_key.as_str(), handle.value.clone()).await?;
499 }
500 } else if handle.policy.is_active() {
501 list_dead_letter(&handle, &self.payload, &self.headers, REASON_DROPPED).await?;
502 }
503 settle(&handle).await
504 }
505}
506
507fn ack_broker(err: fred::error::Error) -> AckError {
508 AckError::Broker(Box::new(err))
509}
510
511fn next_retry_count(headers: &Headers) -> u64 {
513 headers
514 .get_str(RETRY_COUNT_HEADER)
515 .and_then(|v| v.parse::<u64>().ok())
516 .unwrap_or(0)
517 + 1
518}
519
520async fn lpush(pool: &Pool, key: &str, body: Vec<u8>) -> Result<(), AckError> {
521 let _: i64 = pool.lpush(key, body).await.map_err(ack_broker)?;
522 Ok(())
523}
524
525async fn list_dead_letter(
529 handle: &ListAck,
530 payload: &[u8],
531 headers: &Headers,
532 reason: &'static str,
533) -> Result<(), AckError> {
534 if let Some(dlq) = handle.policy.dead_letter_key() {
535 let body = frame(
536 handle.codec.as_ref(),
537 payload,
538 &deadletter::with_reason(headers, reason),
539 );
540 lpush(&handle.pool, dlq, body).await?;
541 }
542 Ok(())
543}
544
545async fn settle(handle: &ListAck) -> Result<(), AckError> {
548 let _: i64 = handle
549 .pool
550 .lrem(handle.processing_key.as_str(), 1, handle.value.clone())
551 .await
552 .map_err(ack_broker)?;
553 if let Some(rec) = &handle.recovery {
554 recovery::forget(&handle.pool, &rec.zset_key, &rec.member).await?;
555 }
556 Ok(())
557}
558
559impl Partitioned for RedisListMessage {
560 fn partition_key(&self) -> Option<&[u8]> {
561 self.headers().get(PARTITION_KEY_HEADER)
562 }
563}
564
565#[derive(Clone)]
571pub struct RedisListPublisher {
572 pool: Arc<tokio::sync::OnceCell<Pool>>,
573 codec: Option<SharedEnvelope>,
574 ttl: Option<Duration>,
575}
576
577impl Debug for RedisListPublisher {
578 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
579 f.debug_struct("RedisListPublisher")
580 .field("codec", &self.codec.is_some())
581 .field("ttl", &self.ttl)
582 .finish_non_exhaustive()
583 }
584}
585
586impl RedisListPublisher {
587 pub(crate) fn new(pool: Arc<tokio::sync::OnceCell<Pool>>) -> Self {
588 Self {
589 pool,
590 codec: None,
591 ttl: None,
592 }
593 }
594
595 #[must_use]
598 pub fn codec(mut self, codec: impl Codec + 'static) -> Self {
599 self.codec = Some(Arc::new(codec));
600 self
601 }
602
603 #[must_use]
625 pub const fn ttl(mut self, ttl: Duration) -> Self {
626 self.ttl = Some(ttl);
627 self
628 }
629}
630
631fn ttl_millis(ttl: Duration) -> i64 {
634 i64::try_from(ttl.as_millis()).unwrap_or(i64::MAX).max(1)
635}
636
637impl ruststream::Publisher for RedisListPublisher {
638 type Error = RedisError;
639
640 async fn publish(&self, msg: ruststream::OutgoingMessage<'_>) -> Result<(), Self::Error> {
641 let pool = self.pool.get().cloned().ok_or(RedisError::NotConnected)?;
642 let body = frame(self.codec.as_ref(), msg.payload(), msg.headers());
643 let Some(ttl) = self.ttl else {
644 let _: i64 = pool
645 .lpush(msg.name(), body)
646 .await
647 .map_err(RedisError::publish)?;
648 return Ok(());
649 };
650 let pipeline = pool.next().pipeline();
653 let _: () = pipeline
654 .lpush(msg.name(), body)
655 .await
656 .map_err(RedisError::publish)?;
657 let _: () = pipeline
658 .pexpire(msg.name(), ttl_millis(ttl), None)
659 .await
660 .map_err(RedisError::publish)?;
661 let _: Vec<fred::types::Value> = pipeline.all().await.map_err(RedisError::publish)?;
662 Ok(())
663 }
664}
665
666#[cfg(test)]
667mod tests {
668 use super::*;
669
670 #[test]
671 fn ttl_millis_converts_and_clamps() {
672 assert_eq!(ttl_millis(Duration::from_secs(60)), 60_000);
673 assert_eq!(ttl_millis(Duration::from_millis(1)), 1);
674 assert_eq!(ttl_millis(Duration::from_nanos(1)), 1);
676 assert_eq!(ttl_millis(Duration::ZERO), 1);
677 }
678}