Skip to main content

ruststream_fred/
list.rs

1//! Redis list transport: a competing-consumers work queue.
2//!
3//! A producer `LPUSH`es onto the list; consumers pop from the right (`BRPOP`), so delivery is FIFO
4//! and each entry goes to exactly one consumer (no fan-out, no replay, no groups). Two modes:
5//!
6//! * Simple (default) - `BRPOP`, at-most-once. `ack` / `nack` report [`AckError::Unsupported`]: once
7//!   popped, the entry is gone, so a crash mid-handler loses it.
8//! * Reliable ([`RedisList::reliable`]) - `LMOVE` the entry to a per-consumer processing list, then
9//!   `LREM` it on `ack` (at-least-once). `nack(requeue = true)` returns it to the main list;
10//!   `nack(requeue = false)` removes it.
11//!
12//! Reliable mode has no native idle/pending tracking, so a consumer that dies after `LMOVE` but
13//! before settling leaves its entry stranded on the processing list. Opting into a recovery ZSET
14//! with [`RedisList::recovery_zset`] (and [`RedisList::min_idle`]) starts a watchdog that returns
15//! such orphans to the main list; without it (the default) reliable lists have no orphan recovery,
16//! and Redis Streams ([`crate::RedisStream`]) remain the recommended durable path. See
17//! [`crate::recovery`].
18//!
19//! Headers travel in a frame around the payload (see [`crate::envelope`]): a lossless binary frame
20//! by default, or a readable codec-serialized envelope when a codec is set with
21//! [`RedisList::codec`] / [`RedisListPublisher::codec`].
22
23use std::fmt::{Debug, Formatter};
24use std::sync::Arc;
25use std::time::Duration;
26
27use bytes::Bytes;
28use fred::clients::Pool;
29use fred::error::ErrorKind;
30use fred::interfaces::{KeysInterface, ListInterface};
31use fred::types::lists::LMoveDirection;
32use futures::Stream;
33use futures::stream::unfold;
34use ruststream::codec::Codec;
35use ruststream::runtime::RETRY_COUNT_HEADER;
36use ruststream::{AckError, Headers, IncomingMessage, Partitioned, SubscriptionSource};
37
38use crate::deadletter::{self, PoisonPolicy, REASON_DROPPED, REASON_MAX_DELIVERIES};
39use crate::envelope::{SharedEnvelope, frame, unframe};
40use crate::recovery::{self, RecoveryConfig};
41use crate::{RedisBroker, error::RedisError, message::PARTITION_KEY_HEADER};
42
/// Fallback wait for one blocking pop, used when a subscription sets no explicit block duration.
const DEFAULT_BLOCK: Duration = Duration::from_secs(5);
/// Suffix appended to the list key to form the default processing list (reliable mode) when no
/// explicit processing key is configured. The default is derived from the list key alone.
const PROCESSING_SUFFIX: &str = ".processing";
46
/// Expresses the blocking-pop wait as fractional seconds, the unit handed to the pop commands.
fn block_secs(block: Duration) -> f64 {
    Duration::as_secs_f64(&block)
}
50
51/// Normalizes a blocking pop (`BRPOP` / `BLMOVE`) result: fred reports a timed-out pop with nothing
52/// available as a timeout error rather than an empty reply, so treat that as "no entry this round"
53/// and let the read loop retry. Any other error propagates.
54fn empty_on_timeout<T>(
55    result: Result<Option<T>, fred::error::Error>,
56) -> Result<Option<T>, RedisError> {
57    match result {
58        Ok(value) => Ok(value),
59        Err(err) if matches!(err.kind(), ErrorKind::Timeout) => Ok(None),
60        Err(err) => Err(RedisError::stream(err)),
61    }
62}
63
/// Describes one list subscription against [`crate::RedisBroker`].
///
/// A descriptor is built with [`RedisList::new`] and refined through the chained builder methods;
/// every optional setting starts unset and is resolved to its default when the subscription is
/// created.
///
/// # Examples
///
/// ```
/// use std::time::Duration;
/// use ruststream_fred::RedisList;
///
/// let simple = RedisList::new("jobs");
/// let reliable = RedisList::new("jobs").reliable().block(Duration::from_secs(2));
/// # let _ = (simple, reliable);
/// ```
#[derive(Clone)]
#[must_use]
pub struct RedisList {
    /// The list key this subscription consumes.
    key: String,
    /// `true` for reliable (at-least-once) mode, `false` for simple `BRPOP` mode.
    reliable: bool,
    /// Explicit processing-list key; `None` falls back to `<key>.processing`.
    processing: Option<String>,
    /// How long one blocking pop waits; `None` falls back to the 5-second default.
    block: Option<Duration>,
    /// Envelope codec; `None` selects the default binary framing.
    codec: Option<SharedEnvelope>,
    /// Dead-letter list key for dropped and poison entries, when configured.
    dead_letter: Option<String>,
    /// Cap on requeues before an entry is treated as poison, when configured.
    max_deliveries: Option<u64>,
    /// Idle threshold for orphan recovery; required once `recovery_zset` is set.
    min_idle: Option<Duration>,
    /// ZSET key tracking in-flight claims; `Some` means orphan recovery was opted into.
    recovery_zset: Option<String>,
    /// Optional auto-cleanup TTL for the recovery ZSET key.
    recovery_ttl: Option<Duration>,
}
90
91impl Debug for RedisList {
92    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
93        f.debug_struct("RedisList")
94            .field("key", &self.key)
95            .field("reliable", &self.reliable)
96            .field("processing", &self.processing)
97            .field("codec", &self.codec.is_some())
98            .field("dead_letter", &self.dead_letter)
99            .field("max_deliveries", &self.max_deliveries)
100            .field("recovery_zset", &self.recovery_zset)
101            .field("recovery_ttl", &self.recovery_ttl)
102            .finish_non_exhaustive()
103    }
104}
105
106impl RedisList {
107    /// A simple (at-most-once) `BRPOP` work-queue consumer on `key`.
108    pub fn new(key: impl Into<String>) -> Self {
109        Self {
110            key: key.into(),
111            reliable: false,
112            processing: None,
113            block: None,
114            codec: None,
115            dead_letter: None,
116            max_deliveries: None,
117            min_idle: None,
118            recovery_zset: None,
119            recovery_ttl: None,
120        }
121    }
122
123    /// Switches to reliable (at-least-once) mode: entries move to a processing list and are removed
124    /// on `ack`.
125    pub const fn reliable(mut self) -> Self {
126        self.reliable = true;
127        self
128    }
129
130    /// Sets the processing-list key used in reliable mode. Defaults to `<key>.processing`.
131    pub fn processing(mut self, key: impl Into<String>) -> Self {
132        self.processing = Some(key.into());
133        self
134    }
135
136    /// How long one blocking pop waits before looping. Defaults to 5 seconds.
137    pub const fn block(mut self, block: Duration) -> Self {
138        self.block = Some(block);
139        self
140    }
141
142    /// Decodes the header/payload envelope with `codec` (must match the publisher). Without it the
143    /// default lossless binary framing is used.
144    pub fn codec(mut self, codec: impl Codec + 'static) -> Self {
145        self.codec = Some(Arc::new(codec));
146        self
147    }
148
149    /// In reliable mode, routes dropped and poison entries to the named dead-letter list (`LPUSH`)
150    /// instead of discarding them, tagged with
151    /// [`DEAD_LETTER_REASON_HEADER`](crate::DEAD_LETTER_REASON_HEADER). Off by default. Has no effect
152    /// on a simple list, which cannot ack. See [`crate::deadletter`].
153    pub fn dead_letter(mut self, key: impl Into<String>) -> Self {
154        self.dead_letter = Some(key.into());
155        self
156    }
157
158    /// In reliable mode, caps how many times an entry may be `nack(requeue = true)`-ed before it is
159    /// treated as poison (dead-lettered or, with no dead-letter list, discarded). Off by default.
160    ///
161    /// Lists have no native delivery counter, so this tracks the framework retry-count header carried
162    /// in the entry's envelope.
163    pub const fn max_deliveries(mut self, max: u64) -> Self {
164        self.max_deliveries = Some(max);
165        self
166    }
167
168    /// How long a claimed reliable-mode entry may sit idle on the processing list before the
169    /// recovery watchdog returns it to the main list. Required for (and only meaningful with)
170    /// [`recovery_zset`](Self::recovery_zset).
171    ///
172    /// It has no default and must exceed the longest legitimate handler runtime: set it too low and
173    /// a healthy consumer's in-flight entry gets recovered and processed twice.
174    pub const fn min_idle(mut self, min_idle: Duration) -> Self {
175        self.min_idle = Some(min_idle);
176        self
177    }
178
179    /// Opts reliable mode into orphan recovery, naming the ZSET key that tracks in-flight claims.
180    ///
181    /// Off by default (a dead consumer's entry stays stranded on the processing list). The key has
182    /// no sane default, so it is named explicitly here; pair it with [`min_idle`](Self::min_idle),
183    /// which is required when recovery is on. Reliable mode is implied. See [`crate::recovery`].
184    pub fn recovery_zset(mut self, key: impl Into<String>) -> Self {
185        self.recovery_zset = Some(key.into());
186        self.reliable = true;
187        self
188    }
189
190    /// An optional auto-cleanup TTL on the recovery ZSET key (refreshed on every claim).
191    ///
192    /// When set it must exceed [`min_idle`](Self::min_idle) (and the longest legitimate handler
193    /// runtime), or in-flight tracking is dropped before the watchdog can act.
194    pub const fn recovery_ttl(mut self, ttl: Duration) -> Self {
195        self.recovery_ttl = Some(ttl);
196        self
197    }
198
199    /// The list key this subscription consumes.
200    #[must_use]
201    pub fn key(&self) -> &str {
202        &self.key
203    }
204
205    pub(crate) const fn is_reliable(&self) -> bool {
206        self.reliable
207    }
208
209    pub(crate) fn processing_or_default(&self) -> String {
210        self.processing
211            .clone()
212            .unwrap_or_else(|| format!("{}{PROCESSING_SUFFIX}", self.key))
213    }
214
215    pub(crate) fn block_or_default(&self) -> Duration {
216        self.block.unwrap_or(DEFAULT_BLOCK)
217    }
218
219    pub(crate) fn codec_handle(&self) -> Option<SharedEnvelope> {
220        self.codec.clone()
221    }
222
223    pub(crate) fn poison_policy(&self) -> PoisonPolicy {
224        PoisonPolicy {
225            dead_letter: self.dead_letter.clone(),
226            max_deliveries: self.max_deliveries,
227        }
228    }
229
230    /// Resolves the recovery settings, or `None` when recovery was not opted into.
231    ///
232    /// # Errors
233    ///
234    /// Returns [`RedisError::InvalidOptions`] when a recovery ZSET is named without a
235    /// [`min_idle`](Self::min_idle), which has no sane default.
236    pub(crate) fn recovery_config(&self) -> Result<Option<RecoveryConfig>, RedisError> {
237        let Some(zset_key) = self.recovery_zset.clone() else {
238            return Ok(None);
239        };
240        let min_idle = self.min_idle.ok_or_else(|| {
241            RedisError::InvalidOptions(format!(
242                "reliable list recovery on `{}` needs a min_idle: call .min_idle(duration) \
243                 alongside .recovery_zset(key)",
244                self.key
245            ))
246        })?;
247        Ok(Some(RecoveryConfig {
248            zset_key,
249            min_idle,
250            ttl: self.recovery_ttl,
251        }))
252    }
253}
254
255impl SubscriptionSource<RedisBroker> for RedisList {
256    type Subscriber = RedisListSubscriber;
257
258    fn name(&self) -> &str {
259        self.key()
260    }
261
262    async fn subscribe(self, broker: &RedisBroker) -> Result<Self::Subscriber, RedisError> {
263        broker.subscribe_list(self).await
264    }
265}
266
/// A list-backed work-queue subscription.
///
/// Holds the resolved settings of a [`RedisList`] descriptor together with the connection pool
/// used to pop entries.
pub struct RedisListSubscriber {
    /// Connection pool used for the blocking pops and handed to reliable-mode ack handles.
    pool: Pool,
    /// The main list key entries are popped from.
    key: String,
    /// `true` selects the `BLMOVE` (reliable) path, `false` the `BRPOP` (simple) path.
    reliable: bool,
    /// Processing-list key that reliable-mode entries are moved onto.
    processing: String,
    /// How long one blocking pop waits before the read loop retries.
    block: Duration,
    /// Envelope codec used to unframe entries; `None` means the default framing.
    codec: Option<SharedEnvelope>,
    /// Dead-letter / max-deliveries policy copied into every reliable-mode ack handle.
    policy: PoisonPolicy,
    /// Orphan-recovery settings; `None` when recovery is disabled.
    recovery: Option<RecoveryConfig>,
}
278
279impl Debug for RedisListSubscriber {
280    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
281        f.debug_struct("RedisListSubscriber")
282            .field("key", &self.key)
283            .field("reliable", &self.reliable)
284            .field("poison", &self.policy.is_active())
285            .field("recovery", &self.recovery.is_some())
286            .finish_non_exhaustive()
287    }
288}
289
290impl RedisListSubscriber {
291    #[allow(
292        clippy::too_many_arguments,
293        reason = "internal constructor mirroring the descriptor"
294    )]
295    pub(crate) fn new(
296        pool: Pool,
297        key: String,
298        reliable: bool,
299        processing: String,
300        block: Duration,
301        codec: Option<SharedEnvelope>,
302        policy: PoisonPolicy,
303        recovery: Option<RecoveryConfig>,
304    ) -> Self {
305        Self {
306            pool,
307            key,
308            reliable,
309            processing,
310            block,
311            codec,
312            policy,
313            recovery,
314        }
315    }
316
317    fn simple_message(&self, raw: &[u8]) -> RedisListMessage {
318        let (payload, headers) = unframe(self.codec.as_ref(), raw);
319        RedisListMessage {
320            payload,
321            headers,
322            ack: None,
323        }
324    }
325
326    fn reliable_message(&self, raw: Vec<u8>, recovery: Option<RecoveryHandle>) -> RedisListMessage {
327        let (payload, headers) = unframe(self.codec.as_ref(), &raw);
328        RedisListMessage {
329            payload,
330            headers,
331            ack: Some(ListAck {
332                pool: self.pool.clone(),
333                main_key: self.key.clone(),
334                processing_key: self.processing.clone(),
335                value: raw,
336                codec: self.codec.clone(),
337                policy: self.policy.clone(),
338                recovery,
339            }),
340        }
341    }
342
343    /// Blocks for the next entry, returning `None` when the pop times out (the caller loops). When
344    /// recovery is enabled, first returns any orphaned entries to the main list so this same pop can
345    /// pick them up.
346    async fn next_entry(&self) -> Result<Option<RedisListMessage>, RedisError> {
347        let secs = block_secs(self.block);
348        if self.reliable {
349            if let Some(cfg) = &self.recovery {
350                recovery::sweep_orphans(&self.pool, cfg, &self.key, &self.processing).await?;
351            }
352            let value: Option<Vec<u8>> = empty_on_timeout(
353                self.pool
354                    .blmove(
355                        self.key.as_str(),
356                        self.processing.as_str(),
357                        LMoveDirection::Right,
358                        LMoveDirection::Left,
359                        secs,
360                    )
361                    .await,
362            )?;
363            let Some(value) = value else {
364                return Ok(None);
365            };
366            let handle = match &self.recovery {
367                Some(cfg) => {
368                    let member = recovery::record_claim(&self.pool, cfg, &value).await?;
369                    Some(RecoveryHandle {
370                        zset_key: cfg.zset_key.clone(),
371                        member,
372                    })
373                }
374                None => None,
375            };
376            Ok(Some(self.reliable_message(value, handle)))
377        } else {
378            let popped: Option<(String, Vec<u8>)> =
379                empty_on_timeout(self.pool.brpop(self.key.as_str(), secs).await)?;
380            Ok(popped.map(|(_, v)| self.simple_message(&v)))
381        }
382    }
383}
384
385impl ruststream::Subscriber for RedisListSubscriber {
386    type Message = RedisListMessage;
387    type Error = RedisError;
388
389    /// Yields one message per popped entry.
390    ///
391    /// # Cancel safety
392    ///
393    /// Dropping the returned stream between items is safe. In reliable mode an entry already moved
394    /// to the processing list but not yet settled stays there until acked or recovered manually.
395    fn stream(&mut self) -> impl Stream<Item = Result<Self::Message, Self::Error>> + Send + '_ {
396        unfold(&*self, |s| async move {
397            loop {
398                match s.next_entry().await {
399                    Ok(Some(msg)) => return Some((Ok(msg), s)),
400                    Ok(None) => {}
401                    Err(err) => return Some((Err(err), s)),
402                }
403            }
404        })
405    }
406}
407
/// Settlement handle for a reliable-mode list delivery.
///
/// Carries everything `ack` / `nack` need to act on the entry after the subscriber has handed the
/// message out.
struct ListAck {
    /// Connection pool used for the settlement commands.
    pool: Pool,
    /// The main list key; a requeued entry is pushed back here.
    main_key: String,
    /// The processing list the entry is removed from when settled.
    processing_key: String,
    /// The raw wire value (framed), needed verbatim to `LREM` it from the processing list.
    value: Vec<u8>,
    /// The framing codec, so a poison-policy requeue can re-frame with an updated retry count.
    codec: Option<SharedEnvelope>,
    /// Dead-letter / max-deliveries policy consulted on `nack`.
    policy: PoisonPolicy,
    /// Set when orphan recovery is enabled: the ZSET key and the member tracking this claim, so
    /// settling removes its recovery tracking.
    recovery: Option<RecoveryHandle>,
}
422
/// The recovery-ZSET coordinates for one in-flight reliable-list claim.
struct RecoveryHandle {
    /// Key of the recovery ZSET the claim was recorded in.
    zset_key: String,
    /// The member produced when the claim was recorded; passed back to drop the tracking.
    member: Vec<u8>,
}
428
/// A list-queue delivery. In simple mode `ack` / `nack` are unsupported; in reliable mode `ack`
/// removes the entry from the processing list and `nack` either returns it or drops it.
pub struct RedisListMessage {
    /// The unframed message body.
    payload: Bytes,
    /// The headers carried in the entry's envelope.
    headers: Headers,
    /// Settlement handle; `None` for simple-mode deliveries, which cannot be acked.
    ack: Option<ListAck>,
}
436
437impl Debug for RedisListMessage {
438    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
439        f.debug_struct("RedisListMessage")
440            .field("payload_len", &self.payload.len())
441            .field("reliable", &self.ack.is_some())
442            .finish_non_exhaustive()
443    }
444}
445
446impl IncomingMessage for RedisListMessage {
447    fn payload(&self) -> &[u8] {
448        &self.payload
449    }
450
451    fn headers(&self) -> &Headers {
452        &self.headers
453    }
454
455    async fn ack(self) -> Result<(), AckError> {
456        let Some(handle) = self.ack else {
457            return Err(AckError::Unsupported);
458        };
459        settle(&handle).await
460    }
461
462    async fn nack(self, requeue: bool) -> Result<(), AckError> {
463        let Some(handle) = self.ack else {
464            return Err(AckError::Unsupported);
465        };
466        if requeue {
467            if handle.policy.is_active() {
468                let next = next_retry_count(&self.headers);
469                if handle.policy.is_poison(next) {
470                    list_dead_letter(&handle, &self.payload, &self.headers, REASON_MAX_DELIVERIES)
471                        .await?;
472                } else {
473                    // Re-frame with the incremented retry count and return it to the main list,
474                    // before removing the original from processing (a crash leaves a duplicate).
475                    let mut headers = self.headers.clone();
476                    headers.insert(RETRY_COUNT_HEADER, next.to_string());
477                    let body = frame(handle.codec.as_ref(), &self.payload, &headers);
478                    lpush(&handle.pool, handle.main_key.as_str(), body).await?;
479                }
480            } else {
481                // No poison policy: return the original entry verbatim to the main list.
482                lpush(&handle.pool, handle.main_key.as_str(), handle.value.clone()).await?;
483            }
484        } else if handle.policy.is_active() {
485            list_dead_letter(&handle, &self.payload, &self.headers, REASON_DROPPED).await?;
486        }
487        settle(&handle).await
488    }
489}
490
491fn ack_broker(err: fred::error::Error) -> AckError {
492    AckError::Broker(Box::new(err))
493}
494
495/// The next framework retry-count value (the current envelope header plus one, or one when absent).
496fn next_retry_count(headers: &Headers) -> u64 {
497    headers
498        .get_str(RETRY_COUNT_HEADER)
499        .and_then(|v| v.parse::<u64>().ok())
500        .unwrap_or(0)
501        + 1
502}
503
504async fn lpush(pool: &Pool, key: &str, body: Vec<u8>) -> Result<(), AckError> {
505    let _: i64 = pool.lpush(key, body).await.map_err(ack_broker)?;
506    Ok(())
507}
508
509/// `LPUSH`es a tagged copy onto the configured dead-letter list, or does nothing when none is set
510/// (the caller's `LREM` then discards the entry). Runs before the `LREM`, so a crash leaves a
511/// duplicate rather than a loss.
512async fn list_dead_letter(
513    handle: &ListAck,
514    payload: &[u8],
515    headers: &Headers,
516    reason: &'static str,
517) -> Result<(), AckError> {
518    if let Some(dlq) = handle.policy.dead_letter_key() {
519        let body = frame(
520            handle.codec.as_ref(),
521            payload,
522            &deadletter::with_reason(headers, reason),
523        );
524        lpush(&handle.pool, dlq, body).await?;
525    }
526    Ok(())
527}
528
529/// Removes the entry from the processing list and, when recovery is enabled, drops its tracking from
530/// the recovery ZSET.
531async fn settle(handle: &ListAck) -> Result<(), AckError> {
532    let _: i64 = handle
533        .pool
534        .lrem(handle.processing_key.as_str(), 1, handle.value.clone())
535        .await
536        .map_err(ack_broker)?;
537    if let Some(rec) = &handle.recovery {
538        recovery::forget(&handle.pool, &rec.zset_key, &rec.member).await?;
539    }
540    Ok(())
541}
542
543impl Partitioned for RedisListMessage {
544    fn partition_key(&self) -> Option<&[u8]> {
545        self.headers().get(PARTITION_KEY_HEADER)
546    }
547}
548
/// Publishes onto a list with `LPUSH`, so right-popping consumers see FIFO order.
///
/// Obtain it from [`RedisBroker::list_publisher`](crate::RedisBroker::list_publisher). Headers are
/// framed around the payload; set a [`codec`](Self::codec) for a readable wire format (it must match
/// the subscriber's).
#[derive(Clone)]
pub struct RedisListPublisher {
    /// Shared, lazily-set connection pool; publishing fails with `NotConnected` while it is empty.
    pool: Arc<tokio::sync::OnceCell<Pool>>,
    /// Envelope codec used to frame entries; `None` means the default binary framing.
    codec: Option<SharedEnvelope>,
    /// Optional TTL re-armed on the list key with every publish.
    ttl: Option<Duration>,
}
560
561impl Debug for RedisListPublisher {
562    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
563        f.debug_struct("RedisListPublisher")
564            .field("codec", &self.codec.is_some())
565            .field("ttl", &self.ttl)
566            .finish_non_exhaustive()
567    }
568}
569
570impl RedisListPublisher {
571    pub(crate) fn new(pool: Arc<tokio::sync::OnceCell<Pool>>) -> Self {
572        Self {
573            pool,
574            codec: None,
575            ttl: None,
576        }
577    }
578
579    /// Serializes the header/payload envelope with `codec` (must match the subscriber). Without it
580    /// the default lossless binary framing is used.
581    #[must_use]
582    pub fn codec(mut self, codec: impl Codec + 'static) -> Self {
583        self.codec = Some(Arc::new(codec));
584        self
585    }
586
587    /// Sets a time-to-live on the list key, refreshed (`PEXPIRE`) on every publish, so an idle
588    /// queue auto-expires. Off by default: without it the list lives until drained or deleted.
589    ///
590    /// This is a per-key TTL on the whole list, not per-entry: Redis lists have no per-element
591    /// expiry, only the key can expire. Each publish pushes the entry and re-arms the key's TTL in
592    /// one pipeline, so an actively used queue never expires and only an idle one does. A sub-
593    /// millisecond `ttl` is clamped up to 1ms, since `PEXPIRE 0` would delete the key outright.
594    ///
595    /// # Examples
596    ///
597    /// ```no_run
598    /// use std::time::Duration;
599    /// use ruststream_fred::RedisBroker;
600    ///
601    /// # async fn run() -> Result<(), Box<dyn std::error::Error>> {
602    /// let broker = RedisBroker::connect("redis://localhost:6379").await?;
603    /// let publisher = broker.list_publisher().ttl(Duration::from_secs(300));
604    /// # let _ = publisher;
605    /// # Ok(())
606    /// # }
607    /// ```
608    #[must_use]
609    pub const fn ttl(mut self, ttl: Duration) -> Self {
610        self.ttl = Some(ttl);
611        self
612    }
613}
614
/// Turns a TTL into the positive millisecond count handed to `PEXPIRE`.
///
/// Anything shorter than one millisecond is raised to 1, because `PEXPIRE 0` deletes the key
/// rather than expiring it later; a value too large for `i64` is capped at `i64::MAX`.
fn ttl_millis(ttl: Duration) -> i64 {
    match i64::try_from(ttl.as_millis()) {
        Ok(millis) if millis >= 1 => millis,
        Ok(_) => 1,
        Err(_) => i64::MAX,
    }
}
620
621impl ruststream::Publisher for RedisListPublisher {
622    type Error = RedisError;
623
624    async fn publish(&self, msg: ruststream::OutgoingMessage<'_>) -> Result<(), Self::Error> {
625        let pool = self.pool.get().cloned().ok_or(RedisError::NotConnected)?;
626        let body = frame(self.codec.as_ref(), msg.payload(), msg.headers());
627        let Some(ttl) = self.ttl else {
628            let _: i64 = pool
629                .lpush(msg.name(), body)
630                .await
631                .map_err(RedisError::publish)?;
632            return Ok(());
633        };
634        // Push the entry and re-arm the key TTL in one pipeline, so an actively used queue keeps
635        // resetting its expiry and only an idle one is allowed to lapse.
636        let pipeline = pool.next().pipeline();
637        let _: () = pipeline
638            .lpush(msg.name(), body)
639            .await
640            .map_err(RedisError::publish)?;
641        let _: () = pipeline
642            .pexpire(msg.name(), ttl_millis(ttl), None)
643            .await
644            .map_err(RedisError::publish)?;
645        let _: Vec<fred::types::Value> = pipeline.all().await.map_err(RedisError::publish)?;
646        Ok(())
647    }
648}
649
#[cfg(test)]
mod tests {
    use super::*;

    /// Whole-second and millisecond TTLs convert exactly; anything that would round down to
    /// `PEXPIRE 0` (which deletes the key) is raised to 1.
    #[test]
    fn ttl_millis_converts_and_clamps() {
        let cases = [
            (Duration::from_secs(60), 60_000),
            (Duration::from_millis(1), 1),
            (Duration::from_nanos(1), 1),
            (Duration::ZERO, 1),
        ];
        for (ttl, expected) in cases {
            assert_eq!(ttl_millis(ttl), expected);
        }
    }
}