1use std::fmt::{Debug, Formatter};
24use std::sync::Arc;
25use std::time::Duration;
26
27use bytes::Bytes;
28use fred::clients::Pool;
29use fred::error::ErrorKind;
30use fred::interfaces::{KeysInterface, ListInterface};
31use fred::types::lists::LMoveDirection;
32use futures::Stream;
33use futures::stream::unfold;
34use ruststream::codec::Codec;
35use ruststream::runtime::RETRY_COUNT_HEADER;
36use ruststream::{AckError, Headers, IncomingMessage, Partitioned, SubscriptionSource};
37
38use crate::deadletter::{self, PoisonPolicy, REASON_DROPPED, REASON_MAX_DELIVERIES};
39use crate::envelope::{SharedEnvelope, frame, unframe};
40use crate::recovery::{self, RecoveryConfig};
41use crate::{RedisBroker, error::RedisError, message::PARTITION_KEY_HEADER};
42
43const DEFAULT_BLOCK: Duration = Duration::from_secs(5);
44const PROCESSING_SUFFIX: &str = ".processing";
46
47fn block_secs(block: Duration) -> f64 {
48 block.as_secs_f64()
49}
50
51fn empty_on_timeout<T>(
55 result: Result<Option<T>, fred::error::Error>,
56) -> Result<Option<T>, RedisError> {
57 match result {
58 Ok(value) => Ok(value),
59 Err(err) if matches!(err.kind(), ErrorKind::Timeout) => Ok(None),
60 Err(err) => Err(RedisError::stream(err)),
61 }
62}
63
64#[derive(Clone)]
77#[must_use]
78pub struct RedisList {
79 key: String,
80 reliable: bool,
81 processing: Option<String>,
82 block: Option<Duration>,
83 codec: Option<SharedEnvelope>,
84 dead_letter: Option<String>,
85 max_deliveries: Option<u64>,
86 min_idle: Option<Duration>,
87 recovery_zset: Option<String>,
88 recovery_ttl: Option<Duration>,
89}
90
91impl Debug for RedisList {
92 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
93 f.debug_struct("RedisList")
94 .field("key", &self.key)
95 .field("reliable", &self.reliable)
96 .field("processing", &self.processing)
97 .field("codec", &self.codec.is_some())
98 .field("dead_letter", &self.dead_letter)
99 .field("max_deliveries", &self.max_deliveries)
100 .field("recovery_zset", &self.recovery_zset)
101 .field("recovery_ttl", &self.recovery_ttl)
102 .finish_non_exhaustive()
103 }
104}
105
106impl RedisList {
107 pub fn new(key: impl Into<String>) -> Self {
109 Self {
110 key: key.into(),
111 reliable: false,
112 processing: None,
113 block: None,
114 codec: None,
115 dead_letter: None,
116 max_deliveries: None,
117 min_idle: None,
118 recovery_zset: None,
119 recovery_ttl: None,
120 }
121 }
122
123 pub const fn reliable(mut self) -> Self {
126 self.reliable = true;
127 self
128 }
129
130 pub fn processing(mut self, key: impl Into<String>) -> Self {
132 self.processing = Some(key.into());
133 self
134 }
135
136 pub const fn block(mut self, block: Duration) -> Self {
138 self.block = Some(block);
139 self
140 }
141
142 pub fn codec(mut self, codec: impl Codec + 'static) -> Self {
145 self.codec = Some(Arc::new(codec));
146 self
147 }
148
149 pub fn dead_letter(mut self, key: impl Into<String>) -> Self {
154 self.dead_letter = Some(key.into());
155 self
156 }
157
158 pub const fn max_deliveries(mut self, max: u64) -> Self {
164 self.max_deliveries = Some(max);
165 self
166 }
167
168 pub const fn min_idle(mut self, min_idle: Duration) -> Self {
175 self.min_idle = Some(min_idle);
176 self
177 }
178
179 pub fn recovery_zset(mut self, key: impl Into<String>) -> Self {
185 self.recovery_zset = Some(key.into());
186 self.reliable = true;
187 self
188 }
189
190 pub const fn recovery_ttl(mut self, ttl: Duration) -> Self {
195 self.recovery_ttl = Some(ttl);
196 self
197 }
198
199 #[must_use]
201 pub fn key(&self) -> &str {
202 &self.key
203 }
204
205 pub(crate) const fn is_reliable(&self) -> bool {
206 self.reliable
207 }
208
209 pub(crate) fn processing_or_default(&self) -> String {
210 self.processing
211 .clone()
212 .unwrap_or_else(|| format!("{}{PROCESSING_SUFFIX}", self.key))
213 }
214
215 pub(crate) fn block_or_default(&self) -> Duration {
216 self.block.unwrap_or(DEFAULT_BLOCK)
217 }
218
219 pub(crate) fn codec_handle(&self) -> Option<SharedEnvelope> {
220 self.codec.clone()
221 }
222
223 pub(crate) fn poison_policy(&self) -> PoisonPolicy {
224 PoisonPolicy {
225 dead_letter: self.dead_letter.clone(),
226 max_deliveries: self.max_deliveries,
227 }
228 }
229
230 pub(crate) fn recovery_config(&self) -> Result<Option<RecoveryConfig>, RedisError> {
237 let Some(zset_key) = self.recovery_zset.clone() else {
238 return Ok(None);
239 };
240 let min_idle = self.min_idle.ok_or_else(|| {
241 RedisError::InvalidOptions(format!(
242 "reliable list recovery on `{}` needs a min_idle: call .min_idle(duration) \
243 alongside .recovery_zset(key)",
244 self.key
245 ))
246 })?;
247 Ok(Some(RecoveryConfig {
248 zset_key,
249 min_idle,
250 ttl: self.recovery_ttl,
251 }))
252 }
253}
254
255impl SubscriptionSource<RedisBroker> for RedisList {
256 type Subscriber = RedisListSubscriber;
257
258 fn name(&self) -> &str {
259 self.key()
260 }
261
262 async fn subscribe(self, broker: &RedisBroker) -> Result<Self::Subscriber, RedisError> {
263 broker.subscribe_list(self).await
264 }
265}
266
267pub struct RedisListSubscriber {
269 pool: Pool,
270 key: String,
271 reliable: bool,
272 processing: String,
273 block: Duration,
274 codec: Option<SharedEnvelope>,
275 policy: PoisonPolicy,
276 recovery: Option<RecoveryConfig>,
277}
278
279impl Debug for RedisListSubscriber {
280 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
281 f.debug_struct("RedisListSubscriber")
282 .field("key", &self.key)
283 .field("reliable", &self.reliable)
284 .field("poison", &self.policy.is_active())
285 .field("recovery", &self.recovery.is_some())
286 .finish_non_exhaustive()
287 }
288}
289
290impl RedisListSubscriber {
291 #[allow(
292 clippy::too_many_arguments,
293 reason = "internal constructor mirroring the descriptor"
294 )]
295 pub(crate) fn new(
296 pool: Pool,
297 key: String,
298 reliable: bool,
299 processing: String,
300 block: Duration,
301 codec: Option<SharedEnvelope>,
302 policy: PoisonPolicy,
303 recovery: Option<RecoveryConfig>,
304 ) -> Self {
305 Self {
306 pool,
307 key,
308 reliable,
309 processing,
310 block,
311 codec,
312 policy,
313 recovery,
314 }
315 }
316
317 fn simple_message(&self, raw: &[u8]) -> RedisListMessage {
318 let (payload, headers) = unframe(self.codec.as_ref(), raw);
319 RedisListMessage {
320 payload,
321 headers,
322 ack: None,
323 }
324 }
325
326 fn reliable_message(&self, raw: Vec<u8>, recovery: Option<RecoveryHandle>) -> RedisListMessage {
327 let (payload, headers) = unframe(self.codec.as_ref(), &raw);
328 RedisListMessage {
329 payload,
330 headers,
331 ack: Some(ListAck {
332 pool: self.pool.clone(),
333 main_key: self.key.clone(),
334 processing_key: self.processing.clone(),
335 value: raw,
336 codec: self.codec.clone(),
337 policy: self.policy.clone(),
338 recovery,
339 }),
340 }
341 }
342
343 async fn next_entry(&self) -> Result<Option<RedisListMessage>, RedisError> {
347 let secs = block_secs(self.block);
348 if self.reliable {
349 if let Some(cfg) = &self.recovery {
350 recovery::sweep_orphans(&self.pool, cfg, &self.key, &self.processing).await?;
351 }
352 let value: Option<Vec<u8>> = empty_on_timeout(
353 self.pool
354 .blmove(
355 self.key.as_str(),
356 self.processing.as_str(),
357 LMoveDirection::Right,
358 LMoveDirection::Left,
359 secs,
360 )
361 .await,
362 )?;
363 let Some(value) = value else {
364 return Ok(None);
365 };
366 let handle = match &self.recovery {
367 Some(cfg) => {
368 let member = recovery::record_claim(&self.pool, cfg, &value).await?;
369 Some(RecoveryHandle {
370 zset_key: cfg.zset_key.clone(),
371 member,
372 })
373 }
374 None => None,
375 };
376 Ok(Some(self.reliable_message(value, handle)))
377 } else {
378 let popped: Option<(String, Vec<u8>)> =
379 empty_on_timeout(self.pool.brpop(self.key.as_str(), secs).await)?;
380 Ok(popped.map(|(_, v)| self.simple_message(&v)))
381 }
382 }
383}
384
385impl ruststream::Subscriber for RedisListSubscriber {
386 type Message = RedisListMessage;
387 type Error = RedisError;
388
389 fn stream(&mut self) -> impl Stream<Item = Result<Self::Message, Self::Error>> + Send + '_ {
396 unfold(&*self, |s| async move {
397 loop {
398 match s.next_entry().await {
399 Ok(Some(msg)) => return Some((Ok(msg), s)),
400 Ok(None) => {}
401 Err(err) => return Some((Err(err), s)),
402 }
403 }
404 })
405 }
406}
407
408struct ListAck {
410 pool: Pool,
411 main_key: String,
412 processing_key: String,
413 value: Vec<u8>,
415 codec: Option<SharedEnvelope>,
417 policy: PoisonPolicy,
418 recovery: Option<RecoveryHandle>,
421}
422
423struct RecoveryHandle {
425 zset_key: String,
426 member: Vec<u8>,
427}
428
429pub struct RedisListMessage {
432 payload: Bytes,
433 headers: Headers,
434 ack: Option<ListAck>,
435}
436
437impl Debug for RedisListMessage {
438 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
439 f.debug_struct("RedisListMessage")
440 .field("payload_len", &self.payload.len())
441 .field("reliable", &self.ack.is_some())
442 .finish_non_exhaustive()
443 }
444}
445
446impl IncomingMessage for RedisListMessage {
447 fn payload(&self) -> &[u8] {
448 &self.payload
449 }
450
451 fn headers(&self) -> &Headers {
452 &self.headers
453 }
454
455 async fn ack(self) -> Result<(), AckError> {
456 let Some(handle) = self.ack else {
457 return Err(AckError::Unsupported);
458 };
459 settle(&handle).await
460 }
461
462 async fn nack(self, requeue: bool) -> Result<(), AckError> {
463 let Some(handle) = self.ack else {
464 return Err(AckError::Unsupported);
465 };
466 if requeue {
467 if handle.policy.is_active() {
468 let next = next_retry_count(&self.headers);
469 if handle.policy.is_poison(next) {
470 list_dead_letter(&handle, &self.payload, &self.headers, REASON_MAX_DELIVERIES)
471 .await?;
472 } else {
473 let mut headers = self.headers.clone();
476 headers.insert(RETRY_COUNT_HEADER, next.to_string());
477 let body = frame(handle.codec.as_ref(), &self.payload, &headers);
478 lpush(&handle.pool, handle.main_key.as_str(), body).await?;
479 }
480 } else {
481 lpush(&handle.pool, handle.main_key.as_str(), handle.value.clone()).await?;
483 }
484 } else if handle.policy.is_active() {
485 list_dead_letter(&handle, &self.payload, &self.headers, REASON_DROPPED).await?;
486 }
487 settle(&handle).await
488 }
489}
490
491fn ack_broker(err: fred::error::Error) -> AckError {
492 AckError::Broker(Box::new(err))
493}
494
495fn next_retry_count(headers: &Headers) -> u64 {
497 headers
498 .get_str(RETRY_COUNT_HEADER)
499 .and_then(|v| v.parse::<u64>().ok())
500 .unwrap_or(0)
501 + 1
502}
503
504async fn lpush(pool: &Pool, key: &str, body: Vec<u8>) -> Result<(), AckError> {
505 let _: i64 = pool.lpush(key, body).await.map_err(ack_broker)?;
506 Ok(())
507}
508
509async fn list_dead_letter(
513 handle: &ListAck,
514 payload: &[u8],
515 headers: &Headers,
516 reason: &'static str,
517) -> Result<(), AckError> {
518 if let Some(dlq) = handle.policy.dead_letter_key() {
519 let body = frame(
520 handle.codec.as_ref(),
521 payload,
522 &deadletter::with_reason(headers, reason),
523 );
524 lpush(&handle.pool, dlq, body).await?;
525 }
526 Ok(())
527}
528
529async fn settle(handle: &ListAck) -> Result<(), AckError> {
532 let _: i64 = handle
533 .pool
534 .lrem(handle.processing_key.as_str(), 1, handle.value.clone())
535 .await
536 .map_err(ack_broker)?;
537 if let Some(rec) = &handle.recovery {
538 recovery::forget(&handle.pool, &rec.zset_key, &rec.member).await?;
539 }
540 Ok(())
541}
542
543impl Partitioned for RedisListMessage {
544 fn partition_key(&self) -> Option<&[u8]> {
545 self.headers().get(PARTITION_KEY_HEADER)
546 }
547}
548
549#[derive(Clone)]
555pub struct RedisListPublisher {
556 pool: Arc<tokio::sync::OnceCell<Pool>>,
557 codec: Option<SharedEnvelope>,
558 ttl: Option<Duration>,
559}
560
561impl Debug for RedisListPublisher {
562 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
563 f.debug_struct("RedisListPublisher")
564 .field("codec", &self.codec.is_some())
565 .field("ttl", &self.ttl)
566 .finish_non_exhaustive()
567 }
568}
569
570impl RedisListPublisher {
571 pub(crate) fn new(pool: Arc<tokio::sync::OnceCell<Pool>>) -> Self {
572 Self {
573 pool,
574 codec: None,
575 ttl: None,
576 }
577 }
578
579 #[must_use]
582 pub fn codec(mut self, codec: impl Codec + 'static) -> Self {
583 self.codec = Some(Arc::new(codec));
584 self
585 }
586
587 #[must_use]
609 pub const fn ttl(mut self, ttl: Duration) -> Self {
610 self.ttl = Some(ttl);
611 self
612 }
613}
614
615fn ttl_millis(ttl: Duration) -> i64 {
618 i64::try_from(ttl.as_millis()).unwrap_or(i64::MAX).max(1)
619}
620
621impl ruststream::Publisher for RedisListPublisher {
622 type Error = RedisError;
623
624 async fn publish(&self, msg: ruststream::OutgoingMessage<'_>) -> Result<(), Self::Error> {
625 let pool = self.pool.get().cloned().ok_or(RedisError::NotConnected)?;
626 let body = frame(self.codec.as_ref(), msg.payload(), msg.headers());
627 let Some(ttl) = self.ttl else {
628 let _: i64 = pool
629 .lpush(msg.name(), body)
630 .await
631 .map_err(RedisError::publish)?;
632 return Ok(());
633 };
634 let pipeline = pool.next().pipeline();
637 let _: () = pipeline
638 .lpush(msg.name(), body)
639 .await
640 .map_err(RedisError::publish)?;
641 let _: () = pipeline
642 .pexpire(msg.name(), ttl_millis(ttl), None)
643 .await
644 .map_err(RedisError::publish)?;
645 let _: Vec<fred::types::Value> = pipeline.all().await.map_err(RedisError::publish)?;
646 Ok(())
647 }
648}
649
650#[cfg(test)]
651mod tests {
652 use super::*;
653
654 #[test]
655 fn ttl_millis_converts_and_clamps() {
656 assert_eq!(ttl_millis(Duration::from_secs(60)), 60_000);
657 assert_eq!(ttl_millis(Duration::from_millis(1)), 1);
658 assert_eq!(ttl_millis(Duration::from_nanos(1)), 1);
660 assert_eq!(ttl_millis(Duration::ZERO), 1);
661 }
662}