Skip to main content

ruststream_fred/
list.rs

1//! Redis list transport: a competing-consumers work queue.
2//!
3//! A producer `LPUSH`es onto the list; consumers pop from the right (`BRPOP`), so delivery is FIFO
4//! and each entry goes to exactly one consumer (no fan-out, no replay, no groups). Two modes:
5//!
6//! * Simple (default) - `BRPOP`, at-most-once. `ack` / `nack` report [`AckError::Unsupported`]: once
7//!   popped, the entry is gone, so a crash mid-handler loses it.
8//! * Reliable ([`RedisList::reliable`]) - `LMOVE` the entry to a per-consumer processing list, then
9//!   `LREM` it on `ack` (at-least-once). `nack(requeue = true)` returns it to the main list;
10//!   `nack(requeue = false)` removes it.
11//!
12//! Reliable mode has no native idle/pending tracking, so a consumer that dies after `LMOVE` but
13//! before settling leaves its entry stranded on the processing list. Opting into a recovery ZSET
14//! with [`RedisList::recovery_zset`] (and [`RedisList::min_idle`]) starts a watchdog that returns
15//! such orphans to the main list; without it (the default) reliable lists have no orphan recovery,
16//! and Redis Streams ([`crate::RedisStream`]) remain the recommended durable path. See
17//! [`crate::recovery`].
18//!
19//! Headers travel in a frame around the payload (see [`crate::envelope`]): a lossless binary frame
20//! by default, or a readable codec-serialized envelope when a codec is set with
21//! [`RedisList::codec`] / [`RedisListPublisher::codec`].
22
23use std::fmt::{Debug, Formatter};
24use std::sync::Arc;
25use std::time::Duration;
26
27use bytes::Bytes;
28use fred::clients::Pool;
29use fred::error::ErrorKind;
30use fred::interfaces::{KeysInterface, ListInterface};
31use fred::types::lists::LMoveDirection;
32use futures::Stream;
33use futures::stream::unfold;
34use ruststream::codec::Codec;
35use ruststream::runtime::RETRY_COUNT_HEADER;
36use ruststream::{AckError, Headers, IncomingMessage, Partitioned, SubscriptionSource};
37
38use crate::deadletter::{self, PoisonPolicy, REASON_DROPPED, REASON_MAX_DELIVERIES};
39use crate::envelope::{SharedEnvelope, frame, unframe};
40use crate::recovery::{self, RecoveryConfig};
41use crate::{RedisBroker, error::RedisError, message::PARTITION_KEY_HEADER};
42
43const DEFAULT_BLOCK: Duration = Duration::from_secs(5);
44/// Suffix appended to the list key to form the default per-consumer processing list (reliable mode).
45const PROCESSING_SUFFIX: &str = ".processing";
46
47fn block_secs(block: Duration) -> f64 {
48    block.as_secs_f64()
49}
50
51/// Normalizes a blocking pop (`BRPOP` / `BLMOVE`) result: fred reports a timed-out pop with nothing
52/// available as a timeout error rather than an empty reply, so treat that as "no entry this round"
53/// and let the read loop retry. Any other error propagates.
54fn empty_on_timeout<T>(
55    result: Result<Option<T>, fred::error::Error>,
56) -> Result<Option<T>, RedisError> {
57    match result {
58        Ok(value) => Ok(value),
59        Err(err) if matches!(err.kind(), ErrorKind::Timeout) => Ok(None),
60        Err(err) => Err(RedisError::stream(err)),
61    }
62}
63
64/// Describes one list subscription against [`crate::RedisBroker`].
65///
66/// # Examples
67///
68/// ```
69/// use std::time::Duration;
70/// use ruststream_fred::RedisList;
71///
72/// let simple = RedisList::new("jobs");
73/// let reliable = RedisList::new("jobs").reliable().block(Duration::from_secs(2));
74/// # let _ = (simple, reliable);
75/// ```
76#[derive(Clone)]
77#[must_use]
78pub struct RedisList {
79    key: String,
80    reliable: bool,
81    processing: Option<String>,
82    block: Option<Duration>,
83    codec: Option<SharedEnvelope>,
84    dead_letter: Option<String>,
85    max_deliveries: Option<u64>,
86    min_idle: Option<Duration>,
87    recovery_zset: Option<String>,
88    recovery_ttl: Option<Duration>,
89}
90
91impl Debug for RedisList {
92    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
93        f.debug_struct("RedisList")
94            .field("key", &self.key)
95            .field("reliable", &self.reliable)
96            .field("processing", &self.processing)
97            .field("codec", &self.codec.is_some())
98            .field("dead_letter", &self.dead_letter)
99            .field("max_deliveries", &self.max_deliveries)
100            .field("recovery_zset", &self.recovery_zset)
101            .field("recovery_ttl", &self.recovery_ttl)
102            .finish_non_exhaustive()
103    }
104}
105
106impl RedisList {
107    /// A simple (at-most-once) `BRPOP` work-queue consumer on `key`.
108    pub fn new(key: impl Into<String>) -> Self {
109        Self {
110            key: key.into(),
111            reliable: false,
112            processing: None,
113            block: None,
114            codec: None,
115            dead_letter: None,
116            max_deliveries: None,
117            min_idle: None,
118            recovery_zset: None,
119            recovery_ttl: None,
120        }
121    }
122
123    /// Switches to reliable (at-least-once) mode: entries move to a processing list and are removed
124    /// on `ack`.
125    pub const fn reliable(mut self) -> Self {
126        self.reliable = true;
127        self
128    }
129
130    /// Sets the processing-list key used in reliable mode. Defaults to `<key>.processing`.
131    pub fn processing(mut self, key: impl Into<String>) -> Self {
132        self.processing = Some(key.into());
133        self
134    }
135
136    /// How long one blocking pop waits before looping. Defaults to 5 seconds.
137    pub const fn block(mut self, block: Duration) -> Self {
138        self.block = Some(block);
139        self
140    }
141
142    /// Decodes the header/payload envelope with `codec` (must match the publisher). Without it the
143    /// default lossless binary framing is used.
144    pub fn codec(mut self, codec: impl Codec + 'static) -> Self {
145        self.codec = Some(Arc::new(codec));
146        self
147    }
148
149    /// In reliable mode, routes dropped and poison entries to the named dead-letter list (`LPUSH`)
150    /// instead of discarding them, tagged with
151    /// [`DEAD_LETTER_REASON_HEADER`](crate::DEAD_LETTER_REASON_HEADER). Off by default. Has no effect
152    /// on a simple list, which cannot ack. See [`crate::deadletter`].
153    pub fn dead_letter(mut self, key: impl Into<String>) -> Self {
154        self.dead_letter = Some(key.into());
155        self
156    }
157
158    /// In reliable mode, caps how many times an entry may be `nack(requeue = true)`-ed before it is
159    /// treated as poison (dead-lettered or, with no dead-letter list, discarded). Off by default.
160    ///
161    /// Lists have no native delivery counter, so this tracks the framework retry-count header carried
162    /// in the entry's envelope.
163    pub const fn max_deliveries(mut self, max: u64) -> Self {
164        self.max_deliveries = Some(max);
165        self
166    }
167
168    /// How long a claimed reliable-mode entry may sit idle on the processing list before the
169    /// recovery watchdog returns it to the main list. Required for (and only meaningful with)
170    /// [`recovery_zset`](Self::recovery_zset).
171    ///
172    /// It has no default and must exceed the longest legitimate handler runtime: set it too low and
173    /// a healthy consumer's in-flight entry gets recovered and processed twice.
174    pub const fn min_idle(mut self, min_idle: Duration) -> Self {
175        self.min_idle = Some(min_idle);
176        self
177    }
178
179    /// Opts reliable mode into orphan recovery, naming the ZSET key that tracks in-flight claims.
180    ///
181    /// Off by default (a dead consumer's entry stays stranded on the processing list). The key has
182    /// no sane default, so it is named explicitly here; pair it with [`min_idle`](Self::min_idle),
183    /// which is required when recovery is on. Reliable mode is implied. See [`crate::recovery`].
184    pub fn recovery_zset(mut self, key: impl Into<String>) -> Self {
185        self.recovery_zset = Some(key.into());
186        self.reliable = true;
187        self
188    }
189
190    /// An optional auto-cleanup TTL on the recovery ZSET key (refreshed on every claim).
191    ///
192    /// When set it must exceed [`min_idle`](Self::min_idle) (and the longest legitimate handler
193    /// runtime), or in-flight tracking is dropped before the watchdog can act.
194    pub const fn recovery_ttl(mut self, ttl: Duration) -> Self {
195        self.recovery_ttl = Some(ttl);
196        self
197    }
198
199    /// The list key this subscription consumes.
200    #[must_use]
201    pub fn key(&self) -> &str {
202        &self.key
203    }
204
205    pub(crate) const fn is_reliable(&self) -> bool {
206        self.reliable
207    }
208
209    pub(crate) fn processing_or_default(&self) -> String {
210        self.processing
211            .clone()
212            .unwrap_or_else(|| format!("{}{PROCESSING_SUFFIX}", self.key))
213    }
214
215    pub(crate) fn block_or_default(&self) -> Duration {
216        self.block.unwrap_or(DEFAULT_BLOCK)
217    }
218
219    pub(crate) fn codec_handle(&self) -> Option<SharedEnvelope> {
220        self.codec.clone()
221    }
222
223    pub(crate) fn poison_policy(&self) -> PoisonPolicy {
224        PoisonPolicy {
225            dead_letter: self.dead_letter.clone(),
226            max_deliveries: self.max_deliveries,
227        }
228    }
229
230    /// Resolves the recovery settings, or `None` when recovery was not opted into.
231    ///
232    /// # Errors
233    ///
234    /// Returns [`RedisError::InvalidOptions`] when a recovery ZSET is named without a
235    /// [`min_idle`](Self::min_idle), which has no sane default.
236    pub(crate) fn recovery_config(&self) -> Result<Option<RecoveryConfig>, RedisError> {
237        let Some(zset_key) = self.recovery_zset.clone() else {
238            return Ok(None);
239        };
240        let min_idle = self.min_idle.ok_or_else(|| {
241            RedisError::InvalidOptions(format!(
242                "reliable list recovery on `{}` needs a min_idle: call .min_idle(duration) \
243                 alongside .recovery_zset(key)",
244                self.key
245            ))
246        })?;
247        Ok(Some(RecoveryConfig {
248            zset_key,
249            min_idle,
250            ttl: self.recovery_ttl,
251        }))
252    }
253}
254
255impl SubscriptionSource<RedisBroker> for RedisList {
256    type Subscriber = RedisListSubscriber;
257
258    fn name(&self) -> &str {
259        self.key()
260    }
261
262    async fn subscribe(self, broker: &RedisBroker) -> Result<Self::Subscriber, RedisError> {
263        broker.subscribe_list(self).await
264    }
265}
266
267#[cfg(feature = "testing")]
268impl SubscriptionSource<crate::testing::RedisTestBroker> for RedisList {
269    type Subscriber = crate::testing::RedisTestSubscriber;
270
271    fn name(&self) -> &str {
272        self.key()
273    }
274
275    async fn subscribe(
276        self,
277        broker: &crate::testing::RedisTestBroker,
278    ) -> Result<Self::Subscriber, RedisError> {
279        broker.subscribe(self.key()).await
280    }
281}
282
283/// A list-backed work-queue subscription.
284pub struct RedisListSubscriber {
285    pool: Pool,
286    key: String,
287    reliable: bool,
288    processing: String,
289    block: Duration,
290    codec: Option<SharedEnvelope>,
291    policy: PoisonPolicy,
292    recovery: Option<RecoveryConfig>,
293}
294
295impl Debug for RedisListSubscriber {
296    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
297        f.debug_struct("RedisListSubscriber")
298            .field("key", &self.key)
299            .field("reliable", &self.reliable)
300            .field("poison", &self.policy.is_active())
301            .field("recovery", &self.recovery.is_some())
302            .finish_non_exhaustive()
303    }
304}
305
306impl RedisListSubscriber {
307    #[allow(
308        clippy::too_many_arguments,
309        reason = "internal constructor mirroring the descriptor"
310    )]
311    pub(crate) fn new(
312        pool: Pool,
313        key: String,
314        reliable: bool,
315        processing: String,
316        block: Duration,
317        codec: Option<SharedEnvelope>,
318        policy: PoisonPolicy,
319        recovery: Option<RecoveryConfig>,
320    ) -> Self {
321        Self {
322            pool,
323            key,
324            reliable,
325            processing,
326            block,
327            codec,
328            policy,
329            recovery,
330        }
331    }
332
333    fn simple_message(&self, raw: &[u8]) -> RedisListMessage {
334        let (payload, headers) = unframe(self.codec.as_ref(), raw);
335        RedisListMessage {
336            payload,
337            headers,
338            ack: None,
339        }
340    }
341
342    fn reliable_message(&self, raw: Vec<u8>, recovery: Option<RecoveryHandle>) -> RedisListMessage {
343        let (payload, headers) = unframe(self.codec.as_ref(), &raw);
344        RedisListMessage {
345            payload,
346            headers,
347            ack: Some(ListAck {
348                pool: self.pool.clone(),
349                main_key: self.key.clone(),
350                processing_key: self.processing.clone(),
351                value: raw,
352                codec: self.codec.clone(),
353                policy: self.policy.clone(),
354                recovery,
355            }),
356        }
357    }
358
359    /// Blocks for the next entry, returning `None` when the pop times out (the caller loops). When
360    /// recovery is enabled, first returns any orphaned entries to the main list so this same pop can
361    /// pick them up.
362    async fn next_entry(&self) -> Result<Option<RedisListMessage>, RedisError> {
363        let secs = block_secs(self.block);
364        if self.reliable {
365            if let Some(cfg) = &self.recovery {
366                recovery::sweep_orphans(&self.pool, cfg, &self.key, &self.processing).await?;
367            }
368            let value: Option<Vec<u8>> = empty_on_timeout(
369                self.pool
370                    .blmove(
371                        self.key.as_str(),
372                        self.processing.as_str(),
373                        LMoveDirection::Right,
374                        LMoveDirection::Left,
375                        secs,
376                    )
377                    .await,
378            )?;
379            let Some(value) = value else {
380                return Ok(None);
381            };
382            let handle = match &self.recovery {
383                Some(cfg) => {
384                    let member = recovery::record_claim(&self.pool, cfg, &value).await?;
385                    Some(RecoveryHandle {
386                        zset_key: cfg.zset_key.clone(),
387                        member,
388                    })
389                }
390                None => None,
391            };
392            Ok(Some(self.reliable_message(value, handle)))
393        } else {
394            let popped: Option<(String, Vec<u8>)> =
395                empty_on_timeout(self.pool.brpop(self.key.as_str(), secs).await)?;
396            Ok(popped.map(|(_, v)| self.simple_message(&v)))
397        }
398    }
399}
400
401impl ruststream::Subscriber for RedisListSubscriber {
402    type Message = RedisListMessage;
403    type Error = RedisError;
404
405    /// Yields one message per popped entry.
406    ///
407    /// # Cancel safety
408    ///
409    /// Dropping the returned stream between items is safe. In reliable mode an entry already moved
410    /// to the processing list but not yet settled stays there until acked or recovered manually.
411    fn stream(&mut self) -> impl Stream<Item = Result<Self::Message, Self::Error>> + Send + '_ {
412        unfold(&*self, |s| async move {
413            loop {
414                match s.next_entry().await {
415                    Ok(Some(msg)) => return Some((Ok(msg), s)),
416                    Ok(None) => {}
417                    Err(err) => return Some((Err(err), s)),
418                }
419            }
420        })
421    }
422}
423
424/// Settlement handle for a reliable-mode list delivery.
425struct ListAck {
426    pool: Pool,
427    main_key: String,
428    processing_key: String,
429    /// The raw wire value (framed), needed verbatim to `LREM` it from the processing list.
430    value: Vec<u8>,
431    /// The framing codec, so a poison-policy requeue can re-frame with an updated retry count.
432    codec: Option<SharedEnvelope>,
433    policy: PoisonPolicy,
434    /// Set when orphan recovery is enabled: the ZSET key and the member tracking this claim, so
435    /// settling removes its recovery tracking.
436    recovery: Option<RecoveryHandle>,
437}
438
439/// The recovery-ZSET coordinates for one in-flight reliable-list claim.
440struct RecoveryHandle {
441    zset_key: String,
442    member: Vec<u8>,
443}
444
445/// A list-queue delivery. In simple mode `ack` / `nack` are unsupported; in reliable mode `ack`
446/// removes the entry from the processing list and `nack` either returns it or drops it.
447pub struct RedisListMessage {
448    payload: Bytes,
449    headers: Headers,
450    ack: Option<ListAck>,
451}
452
453impl Debug for RedisListMessage {
454    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
455        f.debug_struct("RedisListMessage")
456            .field("payload_len", &self.payload.len())
457            .field("reliable", &self.ack.is_some())
458            .finish_non_exhaustive()
459    }
460}
461
462impl IncomingMessage for RedisListMessage {
463    fn payload(&self) -> &[u8] {
464        &self.payload
465    }
466
467    fn headers(&self) -> &Headers {
468        &self.headers
469    }
470
471    async fn ack(self) -> Result<(), AckError> {
472        let Some(handle) = self.ack else {
473            return Err(AckError::Unsupported);
474        };
475        settle(&handle).await
476    }
477
478    async fn nack(self, requeue: bool) -> Result<(), AckError> {
479        let Some(handle) = self.ack else {
480            return Err(AckError::Unsupported);
481        };
482        if requeue {
483            if handle.policy.is_active() {
484                let next = next_retry_count(&self.headers);
485                if handle.policy.is_poison(next) {
486                    list_dead_letter(&handle, &self.payload, &self.headers, REASON_MAX_DELIVERIES)
487                        .await?;
488                } else {
489                    // Re-frame with the incremented retry count and return it to the main list,
490                    // before removing the original from processing (a crash leaves a duplicate).
491                    let mut headers = self.headers.clone();
492                    headers.insert(RETRY_COUNT_HEADER, next.to_string());
493                    let body = frame(handle.codec.as_ref(), &self.payload, &headers);
494                    lpush(&handle.pool, handle.main_key.as_str(), body).await?;
495                }
496            } else {
497                // No poison policy: return the original entry verbatim to the main list.
498                lpush(&handle.pool, handle.main_key.as_str(), handle.value.clone()).await?;
499            }
500        } else if handle.policy.is_active() {
501            list_dead_letter(&handle, &self.payload, &self.headers, REASON_DROPPED).await?;
502        }
503        settle(&handle).await
504    }
505}
506
507fn ack_broker(err: fred::error::Error) -> AckError {
508    AckError::Broker(Box::new(err))
509}
510
511/// The next framework retry-count value (the current envelope header plus one, or one when absent).
512fn next_retry_count(headers: &Headers) -> u64 {
513    headers
514        .get_str(RETRY_COUNT_HEADER)
515        .and_then(|v| v.parse::<u64>().ok())
516        .unwrap_or(0)
517        + 1
518}
519
520async fn lpush(pool: &Pool, key: &str, body: Vec<u8>) -> Result<(), AckError> {
521    let _: i64 = pool.lpush(key, body).await.map_err(ack_broker)?;
522    Ok(())
523}
524
525/// `LPUSH`es a tagged copy onto the configured dead-letter list, or does nothing when none is set
526/// (the caller's `LREM` then discards the entry). Runs before the `LREM`, so a crash leaves a
527/// duplicate rather than a loss.
528async fn list_dead_letter(
529    handle: &ListAck,
530    payload: &[u8],
531    headers: &Headers,
532    reason: &'static str,
533) -> Result<(), AckError> {
534    if let Some(dlq) = handle.policy.dead_letter_key() {
535        let body = frame(
536            handle.codec.as_ref(),
537            payload,
538            &deadletter::with_reason(headers, reason),
539        );
540        lpush(&handle.pool, dlq, body).await?;
541    }
542    Ok(())
543}
544
545/// Removes the entry from the processing list and, when recovery is enabled, drops its tracking from
546/// the recovery ZSET.
547async fn settle(handle: &ListAck) -> Result<(), AckError> {
548    let _: i64 = handle
549        .pool
550        .lrem(handle.processing_key.as_str(), 1, handle.value.clone())
551        .await
552        .map_err(ack_broker)?;
553    if let Some(rec) = &handle.recovery {
554        recovery::forget(&handle.pool, &rec.zset_key, &rec.member).await?;
555    }
556    Ok(())
557}
558
559impl Partitioned for RedisListMessage {
560    fn partition_key(&self) -> Option<&[u8]> {
561        self.headers().get(PARTITION_KEY_HEADER)
562    }
563}
564
565/// Publishes onto a list with `LPUSH`, so right-popping consumers see FIFO order.
566///
567/// Obtain it from [`RedisBroker::list_publisher`](crate::RedisBroker::list_publisher). Headers are
568/// framed around the payload; set a [`codec`](Self::codec) for a readable wire format (it must match
569/// the subscriber's).
570#[derive(Clone)]
571pub struct RedisListPublisher {
572    pool: Arc<tokio::sync::OnceCell<Pool>>,
573    codec: Option<SharedEnvelope>,
574    ttl: Option<Duration>,
575}
576
577impl Debug for RedisListPublisher {
578    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
579        f.debug_struct("RedisListPublisher")
580            .field("codec", &self.codec.is_some())
581            .field("ttl", &self.ttl)
582            .finish_non_exhaustive()
583    }
584}
585
586impl RedisListPublisher {
587    pub(crate) fn new(pool: Arc<tokio::sync::OnceCell<Pool>>) -> Self {
588        Self {
589            pool,
590            codec: None,
591            ttl: None,
592        }
593    }
594
595    /// Serializes the header/payload envelope with `codec` (must match the subscriber). Without it
596    /// the default lossless binary framing is used.
597    #[must_use]
598    pub fn codec(mut self, codec: impl Codec + 'static) -> Self {
599        self.codec = Some(Arc::new(codec));
600        self
601    }
602
603    /// Sets a time-to-live on the list key, refreshed (`PEXPIRE`) on every publish, so an idle
604    /// queue auto-expires. Off by default: without it the list lives until drained or deleted.
605    ///
606    /// This is a per-key TTL on the whole list, not per-entry: Redis lists have no per-element
607    /// expiry, only the key can expire. Each publish pushes the entry and re-arms the key's TTL in
608    /// one pipeline, so an actively used queue never expires and only an idle one does. A sub-
609    /// millisecond `ttl` is clamped up to 1ms, since `PEXPIRE 0` would delete the key outright.
610    ///
611    /// # Examples
612    ///
613    /// ```no_run
614    /// use std::time::Duration;
615    /// use ruststream_fred::RedisBroker;
616    ///
617    /// # async fn run() -> Result<(), Box<dyn std::error::Error>> {
618    /// let broker = RedisBroker::connect("redis://localhost:6379").await?;
619    /// let publisher = broker.list_publisher().ttl(Duration::from_secs(300));
620    /// # let _ = publisher;
621    /// # Ok(())
622    /// # }
623    /// ```
624    #[must_use]
625    pub const fn ttl(mut self, ttl: Duration) -> Self {
626        self.ttl = Some(ttl);
627        self
628    }
629}
630
631/// Converts a TTL to the positive millisecond count `PEXPIRE` expects, clamping a sub-millisecond
632/// value up to 1 (a `PEXPIRE 0` deletes the key instead of expiring it).
633fn ttl_millis(ttl: Duration) -> i64 {
634    i64::try_from(ttl.as_millis()).unwrap_or(i64::MAX).max(1)
635}
636
637impl ruststream::Publisher for RedisListPublisher {
638    type Error = RedisError;
639
640    async fn publish(&self, msg: ruststream::OutgoingMessage<'_>) -> Result<(), Self::Error> {
641        let pool = self.pool.get().cloned().ok_or(RedisError::NotConnected)?;
642        let body = frame(self.codec.as_ref(), msg.payload(), msg.headers());
643        let Some(ttl) = self.ttl else {
644            let _: i64 = pool
645                .lpush(msg.name(), body)
646                .await
647                .map_err(RedisError::publish)?;
648            return Ok(());
649        };
650        // Push the entry and re-arm the key TTL in one pipeline, so an actively used queue keeps
651        // resetting its expiry and only an idle one is allowed to lapse.
652        let pipeline = pool.next().pipeline();
653        let _: () = pipeline
654            .lpush(msg.name(), body)
655            .await
656            .map_err(RedisError::publish)?;
657        let _: () = pipeline
658            .pexpire(msg.name(), ttl_millis(ttl), None)
659            .await
660            .map_err(RedisError::publish)?;
661        let _: Vec<fred::types::Value> = pipeline.all().await.map_err(RedisError::publish)?;
662        Ok(())
663    }
664}
665
666#[cfg(test)]
667mod tests {
668    use super::*;
669
670    #[test]
671    fn ttl_millis_converts_and_clamps() {
672        assert_eq!(ttl_millis(Duration::from_secs(60)), 60_000);
673        assert_eq!(ttl_millis(Duration::from_millis(1)), 1);
674        // A sub-millisecond TTL must not become PEXPIRE 0 (which deletes the key).
675        assert_eq!(ttl_millis(Duration::from_nanos(1)), 1);
676        assert_eq!(ttl_millis(Duration::ZERO), 1);
677    }
678}