Skip to main content

ruststream_fred/
deadletter.rs

1//! Dead-letter routing and a delivery-count poison cap for the transports that can acknowledge
2//! (Redis Streams and the reliable List).
3//!
4//! A message can fail to process indefinitely: a handler keeps `nack`-ing it (the framework
5//! retry-count loop) or it keeps being fetched but never acked across crashes (the native stream
6//! delivery-count loop). Without a cap it is redelivered forever, and a plain
7//! `nack(requeue = false)` silently discards it.
8//!
9//! Two opt-in, off-by-default settings address this on a [`RedisStream`](crate::RedisStream) or a
10//! reliable [`RedisList`](crate::RedisList):
11//!
12//! * `dead_letter(key)` - on drop / poison, copy the message to the named key (same transport
13//!   family: stream to stream, list to list) instead of discarding it, tagged with a
14//!   [`DEAD_LETTER_REASON_HEADER`].
15//! * `max_deliveries(n)` - cap the delivery count; exceeding it dead-letters (or discards) the
16//!   message instead of redelivering.
17//!
18//! The copy is written before the original is acked (`XADD`/`LPUSH` before `XACK`/`LREM`), so a
19//! crash in between leaves a duplicate in the dead-letter store rather than losing the message.
20//! Simple List and Pub/Sub cannot ack, so they have no dead-letter path.
21
22use fred::clients::Pool;
23use fred::interfaces::StreamsInterface;
24use ruststream::Headers;
25
26use crate::convert::fields_for_publish;
27
28/// Header naming why a message was dead-lettered: [`REASON_DROPPED`] or [`REASON_MAX_DELIVERIES`].
29pub const DEAD_LETTER_REASON_HEADER: &str = "x-dead-letter-reason";
30/// Header exposing the native Redis Streams delivery count on a reclaimed delivery, so a handler can
31/// branch or dead-letter manually.
32pub const DELIVERY_COUNT_HEADER: &str = "redis-delivery-count";
33/// Header exposing how long (milliseconds) a reclaimed delivery had been pending.
34pub const IDLE_MS_HEADER: &str = "redis-idle-ms";
35
36/// [`DEAD_LETTER_REASON_HEADER`] value for a `nack(requeue = false)` / drop.
37pub(crate) const REASON_DROPPED: &str = "dropped";
38/// [`DEAD_LETTER_REASON_HEADER`] value for exceeding `max_deliveries`.
39pub(crate) const REASON_MAX_DELIVERIES: &str = "max-deliveries";
40
41/// Resolved dead-letter / poison-cap settings a subscriber and its deliveries carry. Cheap to clone.
42#[derive(Debug, Clone, Default)]
43pub(crate) struct PoisonPolicy {
44    pub(crate) dead_letter: Option<String>,
45    pub(crate) max_deliveries: Option<u64>,
46}
47
48impl PoisonPolicy {
49    /// Whether either setting is configured (otherwise settlement keeps its plain behaviour).
50    pub(crate) const fn is_active(&self) -> bool {
51        self.dead_letter.is_some() || self.max_deliveries.is_some()
52    }
53
54    /// Whether a delivery count has reached the cap (so the message is poison).
55    pub(crate) fn is_poison(&self, count: u64) -> bool {
56        self.max_deliveries.is_some_and(|max| count >= max)
57    }
58
59    pub(crate) fn dead_letter_key(&self) -> Option<&str> {
60        self.dead_letter.as_deref()
61    }
62}
63
64/// Returns the headers for a dead-lettered copy: the originals plus the reason tag.
65pub(crate) fn with_reason(headers: &Headers, reason: &'static str) -> Headers {
66    let mut tagged = headers.clone();
67    tagged.insert(DEAD_LETTER_REASON_HEADER, reason);
68    tagged
69}
70
71/// Routes a message to its dead-letter stream when one is configured, else does nothing (the caller
72/// then `XACK`s, discarding it). `XADD` runs before the caller's `XACK`, so a crash leaves a
73/// duplicate rather than a loss.
74///
75/// # Errors
76///
77/// Returns the underlying `fred` error when the `XADD` fails.
78pub(crate) async fn settle_poison_stream(
79    pool: &Pool,
80    policy: &PoisonPolicy,
81    payload: &[u8],
82    headers: &Headers,
83    reason: &'static str,
84) -> Result<(), fred::error::Error> {
85    if let Some(dlq) = policy.dead_letter_key() {
86        let fields = fields_for_publish(payload, &with_reason(headers, reason));
87        let _: String = pool.xadd(dlq, false, None::<()>, "*", fields).await?;
88    }
89    Ok(())
90}
91
92#[cfg(test)]
93mod tests {
94    use super::*;
95
96    #[test]
97    fn is_poison_only_when_count_reaches_the_cap() {
98        let policy = PoisonPolicy {
99            dead_letter: None,
100            max_deliveries: Some(3),
101        };
102        assert!(!policy.is_poison(2));
103        assert!(policy.is_poison(3));
104        assert!(policy.is_poison(4));
105        assert!(policy.is_active());
106    }
107
108    #[test]
109    fn no_cap_is_never_poison() {
110        let policy = PoisonPolicy::default();
111        assert!(!policy.is_poison(u64::MAX));
112        assert!(!policy.is_active());
113    }
114
115    #[test]
116    fn with_reason_tags_without_dropping_originals() {
117        let mut headers = Headers::new();
118        headers.insert("content-type", "application/json");
119        let tagged = with_reason(&headers, REASON_DROPPED);
120        assert_eq!(tagged.get_str(DEAD_LETTER_REASON_HEADER), Some("dropped"));
121        assert_eq!(tagged.content_type(), Some("application/json"));
122    }
123}