Skip to main content

ruststream_fred/
delay.rs

1//! Durable delayed retry backed by a Redis sorted-set (ZSET) delay queue.
2//!
3//! `retry_after(delay)` (a handler returning [`HandlerResult::retry_after`], or a delivery
4//! `nack_after`-ed) asks the broker to redeliver a message no sooner than `delay` from now. Redis
5//! Streams have no native per-message delay, so without this the runtime falls back to its
6//! broker-agnostic deferred re-publish, which is at-most-once over the delay window (a process
7//! crash before the timer fires loses the deferred copy).
8//!
9//! This module adds a crash-safe alternative, **opt-in and OFF by default**: when a subscription
10//! names a ZSET key with [`RedisStream::delayed_retry`](crate::RedisStream::delayed_retry),
11//! `nack_after` becomes native (`supports_nack_after` reports `true`). A delayed message is `ZADD`ed
12//! to the named ZSET with score `fire_at = now + delay`, then the original is `XACK`ed; a sweeper
13//! folded into the subscriber's read loop moves due entries (`score <= now`) back onto the source
14//! stream with `XADD`. The entry lives in Redis across a crash, so redelivery survives a restart.
15//!
16//! Scores are wall-clock epoch milliseconds taken on the publishing process, so deployments should
17//! keep clocks reasonably synced (NTP) - the same assumption any wall-clock delay queue makes.
18//!
19//! [`HandlerResult::retry_after`]: ruststream::runtime::HandlerResult::retry_after
20
21use std::time::{Duration, SystemTime, UNIX_EPOCH};
22
23use bytes::Bytes;
24use fred::clients::Pool;
25use fred::interfaces::{KeysInterface, SortedSetsInterface, StreamsInterface};
26use ruststream::runtime::RETRY_COUNT_HEADER;
27use ruststream::{AckError, Headers};
28
29use crate::convert::fields_for_publish;
30use crate::envelope::{frame, unframe};
31use crate::error::RedisError;
32
/// Upper bound on the number of due entries a single sweep pass fetches, claims, and
/// re-publishes. Capping the batch keeps one pass short, so a large delayed backlog cannot
/// starve the read loop of fresh deliveries.
const SWEEP_BATCH: i64 = 128;
36
/// Selects how a subscription services `retry_after` / `nack_after` delays.
///
/// Hand one of these to [`RedisStream::delayed_retry`](crate::RedisStream::delayed_retry). The
/// feature is strictly opt-in: a delay queue costs extra Redis keys, memory, and a polling
/// sweeper, and the ZSET key has no sensible default, so the caller must name it explicitly.
///
/// # Examples
///
/// ```
/// use std::time::Duration;
/// use ruststream_fred::{DelayedRetry, RedisStream};
///
/// let sub = RedisStream::new("orders").group("workers").delayed_retry(
///     DelayedRetry::DurableZset { key: "orders.delayed".to_owned(), ttl: None },
/// );
/// # let _ = sub;
/// ```
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum DelayedRetry {
    /// Park delayed redeliveries in a named Redis sorted set.
    ///
    /// `key` names the delay-queue ZSET and is mandatory. When `ttl` is set, the key is
    /// `PEXPIRE`d on every write so that an abandoned queue eventually removes itself. The TTL
    /// **must be longer than the longest `retry_after` delay ever scheduled**; otherwise the key
    /// can expire and take pending entries with it before they fire.
    DurableZset {
        /// Name of the ZSET used as the delay queue.
        key: String,
        /// Optional self-cleanup TTL for the ZSET key; must exceed the longest scheduled delay.
        ttl: Option<Duration>,
    },
}
69
/// Resolved delay-queue settings, derived from a [`DelayedRetry`] and carried by a subscriber
/// and the messages it hands out. Holds only a key name and an optional TTL, so cloning is cheap.
#[derive(Clone, Debug)]
pub(crate) struct DelayConfig {
    /// Name of the ZSET that stores delayed entries.
    zset_key: String,
    /// When set, the TTL re-applied to `zset_key` after each scheduling write.
    ttl: Option<Duration>,
}
76
77impl DelayConfig {
78    pub(crate) fn from_retry(retry: &DelayedRetry) -> Self {
79        match retry {
80            DelayedRetry::DurableZset { key, ttl } => Self {
81                zset_key: key.clone(),
82                ttl: *ttl,
83            },
84        }
85    }
86}
87
/// Wall-clock "now" in milliseconds since the Unix epoch, the unit used for ZSET scores.
///
/// A clock set before the epoch yields `0`; a millisecond count too large for `u64` saturates to
/// `u64::MAX`.
fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX),
        Err(_) => 0,
    }
}
94
/// Whole milliseconds in `delay` (sub-millisecond remainder truncated), saturating at `u64::MAX`
/// when the duration is too long to fit.
fn delay_millis(delay: Duration) -> u64 {
    let millis: u128 = delay.as_millis();
    u64::try_from(millis).map_or(u64::MAX, |fits| fits)
}
98
/// Converts an epoch-millisecond timestamp into the `f64` score a ZSET stores.
///
/// Real epoch-millisecond values sit far below 2^53, the largest range in which `f64` represents
/// every integer exactly, so the conversion loses nothing in practice.
#[allow(
    clippy::cast_precision_loss,
    reason = "epoch-ms < 2^53 is exact in f64"
)]
fn as_score(ms: u64) -> f64 {
    let score = ms as f64;
    score
}
108
/// Millisecond argument for `PEXPIRE`, always at least 1 because `PEXPIRE 0` would delete the
/// key outright. Durations too long for `i64` saturate to `i64::MAX`.
fn ttl_millis(ttl: Duration) -> i64 {
    i64::try_from(ttl.as_millis()).map_or(i64::MAX, |millis| millis.max(1))
}
113
114/// Packs an entry for a ZSET member: a length-prefixed delivery id (a uniqueness salt, so two
115/// byte-identical payloads do not collide into one member) followed by the lossless header/payload
116/// frame. The id is not reused on redelivery; the sweep re-`XADD`s under a fresh id.
117fn encode_member(id: &str, payload: &[u8], headers: &Headers) -> Vec<u8> {
118    let body = frame(None, payload, headers);
119    let id = id.as_bytes();
120    let id_len = u32::try_from(id.len()).unwrap_or(u32::MAX);
121    let mut buf = Vec::with_capacity(4 + id.len() + body.len());
122    buf.extend_from_slice(&id_len.to_be_bytes());
123    buf.extend_from_slice(id);
124    buf.extend_from_slice(&body);
125    buf
126}
127
128/// Reverses [`encode_member`], dropping the id salt and returning the payload and headers.
129fn decode_member(member: &[u8]) -> Option<(Bytes, Headers)> {
130    let id_len = u32::from_be_bytes(member.get(0..4)?.try_into().ok()?) as usize;
131    let body = member.get(4usize.checked_add(id_len)?..)?;
132    Some(unframe(None, body))
133}
134
135fn next_retry_count(headers: &Headers) -> u64 {
136    headers
137        .get_str(RETRY_COUNT_HEADER)
138        .and_then(|v| v.parse::<u64>().ok())
139        .unwrap_or(0)
140        + 1
141}
142
143fn broker_err(err: fred::error::Error) -> AckError {
144    AckError::Broker(Box::new(err))
145}
146
147/// `ZADD`s a delayed copy of the message (retry count incremented) at `now + delay`, refreshing the
148/// optional key TTL. The caller `XACK`s the original afterwards, so a crash in between leaves the
149/// scheduled copy (a duplicate) rather than losing the message.
150pub(crate) async fn schedule(
151    pool: &Pool,
152    cfg: &DelayConfig,
153    id: &str,
154    payload: &[u8],
155    headers: &Headers,
156    delay: Duration,
157) -> Result<(), AckError> {
158    let fire_at = now_ms().saturating_add(delay_millis(delay));
159
160    let mut next = headers.clone();
161    next.insert(RETRY_COUNT_HEADER, next_retry_count(headers).to_string());
162    let member = encode_member(id, payload, &next);
163
164    let _: i64 = pool
165        .zadd(
166            cfg.zset_key.as_str(),
167            None,
168            None,
169            false,
170            false,
171            (as_score(fire_at), member),
172        )
173        .await
174        .map_err(broker_err)?;
175    if let Some(ttl) = cfg.ttl {
176        let _: i64 = pool
177            .pexpire(cfg.zset_key.as_str(), ttl_millis(ttl), None)
178            .await
179            .map_err(broker_err)?;
180    }
181    Ok(())
182}
183
184/// Moves entries whose `fire_at` has passed from the delay ZSET back onto `stream_key`.
185///
186/// Each due member is claimed with `ZREM`: only the consumer whose `ZREM` removes it (returns 1)
187/// re-`XADD`s it, so concurrent sweepers never double-publish. Bounded to [`SWEEP_BATCH`] entries
188/// per pass.
189pub(crate) async fn sweep_due(
190    pool: &Pool,
191    cfg: &DelayConfig,
192    stream_key: &str,
193) -> Result<(), RedisError> {
194    let now = as_score(now_ms());
195    let due: Vec<Bytes> = pool
196        .zrangebyscore(
197            cfg.zset_key.as_str(),
198            0.0,
199            now,
200            false,
201            Some((0, SWEEP_BATCH)),
202        )
203        .await
204        .map_err(RedisError::stream)?;
205
206    for member in due {
207        let removed: i64 = pool
208            .zrem(cfg.zset_key.as_str(), member.clone())
209            .await
210            .map_err(RedisError::stream)?;
211        // Another sweeper already claimed and re-published this entry.
212        if removed != 1 {
213            continue;
214        }
215        let Some((payload, headers)) = decode_member(&member) else {
216            continue;
217        };
218        let fields = fields_for_publish(&payload, &headers);
219        let _: String = pool
220            .xadd(stream_key, false, None::<()>, "*", fields)
221            .await
222            .map_err(RedisError::stream)?;
223    }
224    Ok(())
225}
226
#[cfg(test)]
mod tests {
    use super::*;

    /// Encoding then decoding a member must give back the payload and every header unchanged.
    #[test]
    fn member_round_trips_payload_and_headers() {
        let mut original = Headers::new();
        original.insert("content-type", "application/json");
        original.insert(RETRY_COUNT_HEADER, "2");

        let packed = encode_member("1700000000000-0", b"{}", &original);
        let (payload, decoded) = decode_member(&packed).expect("decodes");

        assert_eq!(payload.as_ref(), b"{}");
        assert_eq!(decoded.content_type(), Some("application/json"));
        assert_eq!(decoded.get_str(RETRY_COUNT_HEADER), Some("2"));
    }

    /// Two deliveries with identical bytes but different ids must map to different members.
    #[test]
    fn distinct_ids_yield_distinct_members_for_equal_payloads() {
        let empty = Headers::new();
        let first = encode_member("1-0", b"dup", &empty);
        let second = encode_member("2-0", b"dup", &empty);
        assert_ne!(
            first, second,
            "the id salt must keep equal payloads from colliding in the ZSET"
        );
    }

    /// The counter is 1 when absent, increments when present, and restarts when malformed.
    #[test]
    fn next_retry_count_starts_at_one_and_increments() {
        let mut headers = Headers::new();
        assert_eq!(next_retry_count(&headers), 1);

        headers.insert(RETRY_COUNT_HEADER, "4");
        assert_eq!(next_retry_count(&headers), 5);

        // A malformed counter restarts from zero rather than panicking.
        headers.insert(RETRY_COUNT_HEADER, "not-a-number");
        assert_eq!(next_retry_count(&headers), 1);
    }

    /// `PEXPIRE 0` would delete the key, so a zero TTL must be clamped up to one millisecond.
    #[test]
    fn ttl_millis_clamps_sub_millisecond_to_one() {
        let thirty_seconds = Duration::from_secs(30);
        assert_eq!(ttl_millis(thirty_seconds), 30_000);
        assert_eq!(ttl_millis(Duration::ZERO), 1);
    }
}
271}