ruststream_fred/deadletter.rs
1//! Dead-letter routing and a delivery-count poison cap for the transports that can acknowledge
2//! (Redis Streams and the reliable List).
3//!
4//! A message can fail to process indefinitely: a handler keeps `nack`-ing it (the framework
5//! retry-count loop) or it keeps being fetched but never acked across crashes (the native stream
6//! delivery-count loop). Without a cap it is redelivered forever, and a plain
7//! `nack(requeue = false)` silently discards it.
8//!
9//! Two opt-in, off-by-default settings address this on a [`RedisStream`](crate::RedisStream) or a
10//! reliable [`RedisList`](crate::RedisList):
11//!
12//! * `dead_letter(key)` - on drop / poison, copy the message to the named key (same transport
13//! family: stream to stream, list to list) instead of discarding it, tagged with a
14//! [`DEAD_LETTER_REASON_HEADER`].
15//! * `max_deliveries(n)` - cap the delivery count; exceeding it dead-letters (or discards) the
16//! message instead of redelivering.
17//!
18//! The copy is written before the original is acked (`XADD`/`LPUSH` before `XACK`/`LREM`), so a
19//! crash in between leaves a duplicate in the dead-letter store rather than losing the message.
20//! Simple List and Pub/Sub cannot ack, so they have no dead-letter path.
21
22use fred::clients::Pool;
23use fred::interfaces::StreamsInterface;
24use ruststream::Headers;
25
26use crate::convert::fields_for_publish;
27
28/// Header naming why a message was dead-lettered: [`REASON_DROPPED`] or [`REASON_MAX_DELIVERIES`].
29pub const DEAD_LETTER_REASON_HEADER: &str = "x-dead-letter-reason";
30/// Header exposing the native Redis Streams delivery count on a reclaimed delivery, so a handler can
31/// branch or dead-letter manually.
32pub const DELIVERY_COUNT_HEADER: &str = "redis-delivery-count";
33/// Header exposing how long (milliseconds) a reclaimed delivery had been pending.
34pub const IDLE_MS_HEADER: &str = "redis-idle-ms";
35
36/// [`DEAD_LETTER_REASON_HEADER`] value for a `nack(requeue = false)` / drop.
37pub(crate) const REASON_DROPPED: &str = "dropped";
38/// [`DEAD_LETTER_REASON_HEADER`] value for exceeding `max_deliveries`.
39pub(crate) const REASON_MAX_DELIVERIES: &str = "max-deliveries";
40
41/// Resolved dead-letter / poison-cap settings a subscriber and its deliveries carry. Cheap to clone.
42#[derive(Debug, Clone, Default)]
43pub(crate) struct PoisonPolicy {
44 pub(crate) dead_letter: Option<String>,
45 pub(crate) max_deliveries: Option<u64>,
46}
47
48impl PoisonPolicy {
49 /// Whether either setting is configured (otherwise settlement keeps its plain behaviour).
50 pub(crate) const fn is_active(&self) -> bool {
51 self.dead_letter.is_some() || self.max_deliveries.is_some()
52 }
53
54 /// Whether a delivery count has reached the cap (so the message is poison).
55 pub(crate) fn is_poison(&self, count: u64) -> bool {
56 self.max_deliveries.is_some_and(|max| count >= max)
57 }
58
59 pub(crate) fn dead_letter_key(&self) -> Option<&str> {
60 self.dead_letter.as_deref()
61 }
62}
63
64/// Returns the headers for a dead-lettered copy: the originals plus the reason tag.
65pub(crate) fn with_reason(headers: &Headers, reason: &'static str) -> Headers {
66 let mut tagged = headers.clone();
67 tagged.insert(DEAD_LETTER_REASON_HEADER, reason);
68 tagged
69}
70
71/// Routes a message to its dead-letter stream when one is configured, else does nothing (the caller
72/// then `XACK`s, discarding it). `XADD` runs before the caller's `XACK`, so a crash leaves a
73/// duplicate rather than a loss.
74///
75/// # Errors
76///
77/// Returns the underlying `fred` error when the `XADD` fails.
78pub(crate) async fn settle_poison_stream(
79 pool: &Pool,
80 policy: &PoisonPolicy,
81 payload: &[u8],
82 headers: &Headers,
83 reason: &'static str,
84) -> Result<(), fred::error::Error> {
85 if let Some(dlq) = policy.dead_letter_key() {
86 let fields = fields_for_publish(payload, &with_reason(headers, reason));
87 let _: String = pool.xadd(dlq, false, None::<()>, "*", fields).await?;
88 }
89 Ok(())
90}
91
92#[cfg(test)]
93mod tests {
94 use super::*;
95
96 #[test]
97 fn is_poison_only_when_count_reaches_the_cap() {
98 let policy = PoisonPolicy {
99 dead_letter: None,
100 max_deliveries: Some(3),
101 };
102 assert!(!policy.is_poison(2));
103 assert!(policy.is_poison(3));
104 assert!(policy.is_poison(4));
105 assert!(policy.is_active());
106 }
107
108 #[test]
109 fn no_cap_is_never_poison() {
110 let policy = PoisonPolicy::default();
111 assert!(!policy.is_poison(u64::MAX));
112 assert!(!policy.is_active());
113 }
114
115 #[test]
116 fn with_reason_tags_without_dropping_originals() {
117 let mut headers = Headers::new();
118 headers.insert("content-type", "application/json");
119 let tagged = with_reason(&headers, REASON_DROPPED);
120 assert_eq!(tagged.get_str(DEAD_LETTER_REASON_HEADER), Some("dropped"));
121 assert_eq!(tagged.content_type(), Some("application/json"));
122 }
123}