Skip to main content

hitbox_redis/
backend.rs

1//! Redis backend implementation.
2
3use std::time::Duration;
4
5use async_trait::async_trait;
6use bytes::Bytes;
7use chrono::{DateTime, Utc};
8use hitbox::{BackendLabel, CacheKey, CacheValue, Raw};
9use hitbox_backend::{
10    Backend, BackendResult, CacheKeyFormat, Compressor, DeleteStatus, PassthroughCompressor,
11    format::{BincodeFormat, Format},
12};
13use redis::Client;
14use redis::aio::ConnectionManager;
15#[cfg(feature = "cluster")]
16use redis::cluster_async::ClusterConnection;
17use tokio::sync::OnceCell;
18
19use crate::error::Error;
20
/// Connection settings for one standalone Redis node.
///
/// # When You'll Encounter This
///
/// Building this by hand is rarely needed. It shows up when:
/// - [`ConnectionMode::single`] constructs one on your behalf
/// - you inspect the configuration for debugging or logging
///
/// # Examples
///
/// ```
/// use hitbox_redis::SingleConfig;
///
/// let config = SingleConfig::new("redis://localhost:6379/");
/// ```
#[derive(Debug, Clone)]
pub struct SingleConfig {
    /// Redis connection URL, shaped as `redis://[:<password>@]<host>[:<port>][/<database>]`.
    pub(crate) url: String,
    /// Base of the exponential backoff used between reconnection attempts. Default: `2.0`.
    pub(crate) exponent_base: f32,
}

impl SingleConfig {
    /// Backoff base assigned to every configuration created by [`SingleConfig::new`].
    const DEFAULT_EXPONENT_BASE: f32 = 2.0;

    /// Builds a single-node configuration for the given URL.
    ///
    /// # Arguments
    ///
    /// * `url` - Redis connection URL, shaped as
    ///   `redis://[:<password>@]<host>[:<port>][/<database>]`. The URL is stored
    ///   as given; it is not parsed or validated here.
    ///
    /// # Default
    ///
    /// * `exponent_base`: `2.0` (base of the exponential retry backoff)
    ///
    /// # Examples
    ///
    /// ```
    /// use hitbox_redis::SingleConfig;
    ///
    /// let config = SingleConfig::new("redis://localhost:6379/0");
    /// ```
    pub fn new(url: impl Into<String>) -> Self {
        let url = url.into();
        Self {
            url,
            exponent_base: Self::DEFAULT_EXPONENT_BASE,
        }
    }
}
69
/// Connection settings for a Redis Cluster deployment.
///
/// # When You'll Encounter This
///
/// Building this by hand is rarely needed. It shows up when:
/// - [`ConnectionMode::cluster`] constructs one on your behalf
/// - you inspect the configuration for debugging or logging
///
/// # Examples
///
/// ```
/// # #[cfg(feature = "cluster")]
/// # fn main() {
/// use hitbox_redis::ClusterConfig;
///
/// let config = ClusterConfig::new([
///     "redis://node1:6379",
///     "redis://node2:6379",
///     "redis://node3:6379",
/// ]);
/// # }
/// # #[cfg(not(feature = "cluster"))]
/// # fn main() {}
/// ```
#[cfg(feature = "cluster")]
#[cfg_attr(docsrs, doc(cfg(feature = "cluster")))]
#[derive(Debug, Clone)]
pub struct ClusterConfig {
    /// URLs of the initial (seed) cluster nodes handed to the cluster client.
    pub(crate) nodes: Vec<String>,
    /// Whether reads may be served by replica nodes. Default: `false`.
    pub(crate) read_from_replicas: bool,
}

#[cfg(feature = "cluster")]
impl ClusterConfig {
    /// Builds a cluster configuration from a list of seed node URLs.
    ///
    /// # Arguments
    ///
    /// * `nodes` - URLs of the initial cluster nodes. Each item is converted
    ///   into an owned `String`; the order of the input is preserved.
    ///
    /// # Default
    ///
    /// * `read_from_replicas`: `false`
    ///
    /// # Examples
    ///
    /// ```
    /// # #[cfg(feature = "cluster")]
    /// # fn main() {
    /// use hitbox_redis::ClusterConfig;
    ///
    /// let config = ClusterConfig::new([
    ///     "redis://node1:6379",
    ///     "redis://node2:6379",
    ///     "redis://node3:6379",
    /// ]);
    /// # }
    /// # #[cfg(not(feature = "cluster"))]
    /// # fn main() {}
    /// ```
    pub fn new<I, S>(nodes: I) -> Self
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        let nodes: Vec<String> = nodes.into_iter().map(|node| node.into()).collect();
        Self {
            nodes,
            read_from_replicas: false,
        }
    }
}
144
145/// Redis connection mode.
146///
147/// Determines whether to connect to a single Redis instance or a Redis Cluster.
148///
149/// # Examples
150///
151/// Single-node connection:
152/// ```
153/// use hitbox_redis::ConnectionMode;
154///
155/// let mode = ConnectionMode::single("redis://localhost:6379/");
156/// ```
157///
158/// Cluster connection (requires `cluster` feature):
159///
160/// ```
161/// # #[cfg(feature = "cluster")]
162/// # fn main() {
163/// use hitbox_redis::ConnectionMode;
164///
165/// let mode = ConnectionMode::cluster([
166///     "redis://node1:6379",
167///     "redis://node2:6379",
168///     "redis://node3:6379",
169/// ]);
170/// # }
171/// # #[cfg(not(feature = "cluster"))]
172/// # fn main() {}
173/// ```
174#[derive(Debug, Clone)]
175pub enum ConnectionMode {
176    /// Single Redis node connection.
177    Single(SingleConfig),
178
179    /// Redis Cluster connection.
180    #[cfg(feature = "cluster")]
181    #[cfg_attr(docsrs, doc(cfg(feature = "cluster")))]
182    Cluster(ClusterConfig),
183}
184
185impl ConnectionMode {
186    /// Create a single-node connection mode.
187    ///
188    /// # Arguments
189    ///
190    /// * `url` - Redis connection URL in format `redis://[:<password>@]<host>[:<port>][/<database>]`
191    pub fn single(url: impl Into<String>) -> Self {
192        Self::Single(SingleConfig::new(url))
193    }
194
195    /// Create a cluster connection mode.
196    ///
197    /// # Arguments
198    ///
199    /// * `nodes` - List of initial cluster node URLs. The client will discover other nodes automatically.
200    #[cfg(feature = "cluster")]
201    #[cfg_attr(docsrs, doc(cfg(feature = "cluster")))]
202    pub fn cluster<I, S>(nodes: I) -> Self
203    where
204        I: IntoIterator<Item = S>,
205        S: Into<String>,
206    {
207        Self::Cluster(ClusterConfig::new(nodes))
208    }
209
210    /// Sets the exponential backoff base for retries (single-node only).
211    ///
212    /// The delay between reconnection attempts is calculated as `base^attempt` milliseconds.
213    ///
214    /// # Default
215    ///
216    /// `2.0`
217    ///
218    /// # Caveats
219    ///
220    /// This option only applies to single-node connections and is silently ignored
221    /// for cluster mode.
222    ///
223    /// # Examples
224    ///
225    /// ```
226    /// use hitbox_redis::ConnectionMode;
227    ///
228    /// // Use a slower backoff (3^attempt ms)
229    /// let mode = ConnectionMode::single("redis://localhost:6379/")
230    ///     .exponent_base(3.0);
231    /// ```
232    #[allow(irrefutable_let_patterns)]
233    pub fn exponent_base(mut self, base: f32) -> Self {
234        if let Self::Single(ref mut config) = self {
235            config.exponent_base = base;
236        }
237        self
238    }
239
240    /// Enables reading from replica nodes (cluster only).
241    ///
242    /// When enabled, read operations may be served by replica nodes for better
243    /// read throughput and reduced load on primary nodes.
244    ///
245    /// # Default
246    ///
247    /// Disabled (reads only from primary nodes).
248    ///
249    /// # Caveats
250    ///
251    /// - Replicas may have slightly stale data due to replication lag
252    /// - This option only applies to cluster connections and is silently ignored
253    ///   for single-node mode
254    ///
255    /// # Examples
256    ///
257    /// ```
258    /// # #[cfg(feature = "cluster")]
259    /// # fn main() {
260    /// use hitbox_redis::ConnectionMode;
261    ///
262    /// let mode = ConnectionMode::cluster([
263    ///     "redis://node1:6379",
264    ///     "redis://node2:6379",
265    /// ])
266    /// .read_from_replicas();
267    /// # }
268    /// # #[cfg(not(feature = "cluster"))]
269    /// # fn main() {}
270    /// ```
271    #[cfg(feature = "cluster")]
272    #[cfg_attr(docsrs, doc(cfg(feature = "cluster")))]
273    pub fn read_from_replicas(mut self) -> Self {
274        if let Self::Cluster(ref mut config) = self {
275            config.read_from_replicas = true;
276        }
277        self
278    }
279}
280
281/// Internal wrapper for Redis connection types.
282#[derive(Clone)]
283enum RedisConnection {
284    Single(ConnectionManager),
285    #[cfg(feature = "cluster")]
286    Cluster(ClusterConnection),
287}
288
289impl RedisConnection {
290    /// Execute a pipeline and return the result.
291    async fn query_pipeline<T: redis::FromRedisValue>(
292        &mut self,
293        pipe: &redis::Pipeline,
294    ) -> Result<T, redis::RedisError> {
295        match self {
296            Self::Single(conn) => pipe.query_async(conn).await,
297            #[cfg(feature = "cluster")]
298            Self::Cluster(conn) => pipe.query_async(conn).await,
299        }
300    }
301
302    /// Execute a single command.
303    async fn query_cmd<T: redis::FromRedisValue>(
304        &mut self,
305        cmd: &mut redis::Cmd,
306    ) -> Result<T, redis::RedisError> {
307        match self {
308            Self::Single(conn) => cmd.query_async(conn).await,
309            #[cfg(feature = "cluster")]
310            Self::Cluster(conn) => cmd.query_async(conn).await,
311        }
312    }
313}
314
315/// Redis cache backend for single-node or cluster deployments.
316///
317/// `RedisBackend` provides a cache backend using Redis as the storage layer.
318/// It supports both single-node Redis instances and Redis Cluster
319/// (with the `cluster` feature enabled).
320///
321/// Use [`RedisBackendBuilder`] to construct this type.
322///
323/// # Type Parameters
324///
325/// * `S` - Serialization format for cache values. Implements [`Format`].
326///   Default: [`BincodeFormat`] (compact binary, recommended for production).
327/// * `C` - Compression strategy for cache values. Implements [`Compressor`].
328///   Default: [`PassthroughCompressor`] (no compression).
329///
330/// # Examples
331///
332/// Basic single-node connection:
333///
334/// ```
335/// use hitbox_redis::{RedisBackend, ConnectionMode};
336///
337/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
338/// let backend = RedisBackend::builder()
339///     .connection(ConnectionMode::single("redis://localhost:6379/"))
340///     .build()?;
341/// # Ok(())
342/// # }
343/// ```
344///
345/// With all configuration options:
346///
347/// ```
348/// use std::time::Duration;
349/// use hitbox_redis::{RedisBackend, ConnectionMode};
350/// use hitbox_backend::CacheKeyFormat;
351/// use hitbox_backend::format::JsonFormat;
352///
353/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
354/// let backend = RedisBackend::builder()
355///     .connection(ConnectionMode::single("redis://localhost:6379/"))
356///     .username("cache_user")        // Redis 6+ ACL
357///     .password("secret")
358///     .label("user-sessions")
359///     .key_format(CacheKeyFormat::UrlEncoded)
360///     .value_format(JsonFormat)
361///     .connection_timeout(Duration::from_secs(5))
362///     .response_timeout(Duration::from_secs(2))
363///     .retries(3)
364///     .build()?;
365/// # Ok(())
366/// # }
367/// ```
368///
369/// Cluster connection (requires `cluster` feature):
370///
371/// ```
372/// # #[cfg(feature = "cluster")]
373/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
374/// use hitbox_redis::{RedisBackend, ConnectionMode};
375///
376/// let backend = RedisBackend::builder()
377///     .connection(ConnectionMode::cluster([
378///         "redis://node1:6379",
379///         "redis://node2:6379",
380///         "redis://node3:6379",
381///     ]))
382///     .build()?;
383/// # Ok(())
384/// # }
385/// # #[cfg(not(feature = "cluster"))]
386/// # fn main() {}
387/// ```
388///
389/// # Performance
390///
391/// - **Read operations**: Single pipelined request (`HMGET` + `PTTL`)
392/// - **Write operations**: Single pipelined request (`HSET` + `EXPIRE`)
393/// - **Connection**: Established lazily on first use, multiplexed for concurrent access
394///
395/// # Caveats
396///
397/// - **Connection failure**: First cache operation will fail if Redis is unreachable
398/// - **Expire time approximation**: The `expire` timestamp returned on read is
399///   calculated as `now + PTTL`, which may drift by the network round-trip time
400///
401/// [`Format`]: hitbox_backend::format::Format
402/// [`BincodeFormat`]: hitbox_backend::format::BincodeFormat
403/// [`Compressor`]: hitbox_backend::Compressor
404/// [`PassthroughCompressor`]: hitbox_backend::PassthroughCompressor
405#[derive(Clone)]
406pub struct RedisBackend<S = BincodeFormat, C = PassthroughCompressor>
407where
408    S: Format,
409    C: Compressor,
410{
411    /// Connection mode (single node or cluster).
412    mode: ConnectionMode,
413    /// Timeout for establishing connections.
414    connection_timeout: Option<Duration>,
415    /// Timeout for waiting on Redis responses.
416    response_timeout: Option<Duration>,
417    /// Maximum number of retry attempts.
418    number_of_retries: Option<usize>,
419    /// Username for Redis authentication (Redis 6+ ACL).
420    username: Option<String>,
421    /// Password for Redis authentication.
422    password: Option<String>,
423
424    /// Lazy-initialized connection (established on first cache operation).
425    connection: OnceCell<RedisConnection>,
426
427    /// Format used to serialize cache values.
428    serializer: S,
429    /// Format used to serialize cache keys.
430    key_format: CacheKeyFormat,
431    /// Compressor used for cache values.
432    compressor: C,
433    /// Label identifying this backend in multi-tier compositions.
434    label: BackendLabel,
435}
436
437impl RedisBackend<BincodeFormat, PassthroughCompressor> {
438    /// Creates a new builder for `RedisBackend`.
439    ///
440    /// Use the builder to configure the connection mode, serialization format,
441    /// key format, compression, and label. See [`RedisBackend`] for examples.
442    #[must_use]
443    pub fn builder() -> RedisBackendBuilder<BincodeFormat, PassthroughCompressor> {
444        RedisBackendBuilder::default()
445    }
446}
447
448impl<S, C> RedisBackend<S, C>
449where
450    S: Format,
451    C: Compressor,
452{
453    /// Gets or initializes the Redis connection lazily.
454    ///
455    /// This method ensures the connection is established only once, even when
456    /// called concurrently from multiple tasks. Subsequent calls return the
457    /// cached connection.
458    async fn get_connection(&self) -> Result<&RedisConnection, Error> {
459        self.connection
460            .get_or_try_init(|| async {
461                match &self.mode {
462                    ConnectionMode::Single(config) => {
463                        // Parse URL and apply authentication if provided
464                        let mut conn_info: redis::ConnectionInfo = config.url.as_str().parse()?;
465                        let mut redis_info = conn_info.redis_settings().clone();
466                        if let Some(ref username) = self.username {
467                            redis_info = redis_info.set_username(username);
468                        }
469                        if let Some(ref password) = self.password {
470                            redis_info = redis_info.set_password(password);
471                        }
472                        conn_info = conn_info.set_redis_settings(redis_info);
473
474                        let client = Client::open(conn_info)?;
475
476                        // Build ConnectionManagerConfig with options
477                        let mut manager_config = redis::aio::ConnectionManagerConfig::new()
478                            .set_exponent_base(config.exponent_base);
479
480                        if let Some(timeout) = self.connection_timeout {
481                            manager_config = manager_config.set_connection_timeout(Some(timeout));
482                        }
483                        if let Some(timeout) = self.response_timeout {
484                            manager_config = manager_config.set_response_timeout(Some(timeout));
485                        }
486                        if let Some(retries) = self.number_of_retries {
487                            manager_config = manager_config.set_number_of_retries(retries);
488                        }
489
490                        let conn = client
491                            .get_connection_manager_with_config(manager_config)
492                            .await?;
493                        Ok(RedisConnection::Single(conn))
494                    }
495                    #[cfg(feature = "cluster")]
496                    ConnectionMode::Cluster(config) => {
497                        let mut builder = redis::cluster::ClusterClientBuilder::new(
498                            config.nodes.iter().map(|s| s.as_str()),
499                        );
500                        if config.read_from_replicas {
501                            builder = builder.read_from_replicas();
502                        }
503                        if let Some(ref username) = self.username {
504                            builder = builder.username(username.clone());
505                        }
506                        if let Some(ref password) = self.password {
507                            builder = builder.password(password.clone());
508                        }
509                        if let Some(timeout) = self.connection_timeout {
510                            builder = builder.connection_timeout(timeout);
511                        }
512                        if let Some(timeout) = self.response_timeout {
513                            builder = builder.response_timeout(timeout);
514                        }
515                        if let Some(retries) = self.number_of_retries {
516                            builder = builder.retries(retries as u32);
517                        }
518
519                        let client = builder.build()?;
520                        let conn = client.get_async_connection().await?;
521                        Ok(RedisConnection::Cluster(conn))
522                    }
523                }
524            })
525            .await
526    }
527}
528
529/// Builder for creating and configuring a [`RedisBackend`].
530///
531/// Use [`RedisBackend::builder`] to create a new builder instance.
532/// See [`RedisBackend`] for usage examples.
533pub struct RedisBackendBuilder<S = BincodeFormat, C = PassthroughCompressor>
534where
535    S: Format,
536    C: Compressor,
537{
538    mode: Option<ConnectionMode>,
539    serializer: S,
540    key_format: CacheKeyFormat,
541    compressor: C,
542    label: BackendLabel,
543    // Common connection options
544    connection_timeout: Option<Duration>,
545    response_timeout: Option<Duration>,
546    number_of_retries: Option<usize>,
547    // Authentication
548    username: Option<String>,
549    password: Option<String>,
550}
551
552impl Default for RedisBackendBuilder<BincodeFormat, PassthroughCompressor> {
553    fn default() -> Self {
554        Self {
555            mode: None,
556            serializer: BincodeFormat,
557            key_format: CacheKeyFormat::default(),
558            compressor: PassthroughCompressor,
559            label: BackendLabel::new_static("redis"),
560            connection_timeout: None,
561            response_timeout: None,
562            number_of_retries: None,
563            username: None,
564            password: None,
565        }
566    }
567}
568
569impl<S, C> RedisBackendBuilder<S, C>
570where
571    S: Format,
572    C: Compressor,
573{
574    /// Sets the Redis connection mode.
575    ///
576    /// This is required before calling [`build`].
577    ///
578    /// [`build`]: Self::build
579    pub fn connection(mut self, mode: ConnectionMode) -> Self {
580        self.mode = Some(mode);
581        self
582    }
583
584    /// Sets the connection timeout.
585    ///
586    /// This timeout applies when establishing a new connection to Redis.
587    /// If the connection cannot be established within this duration, the operation fails.
588    ///
589    /// # Default
590    ///
591    /// No timeout (waits indefinitely).
592    pub fn connection_timeout(mut self, timeout: Duration) -> Self {
593        self.connection_timeout = Some(timeout);
594        self
595    }
596
597    /// Sets the response timeout.
598    ///
599    /// This timeout applies when waiting for a response from Redis after sending a command.
600    /// If Redis doesn't respond within this duration, the operation fails.
601    ///
602    /// # Default
603    ///
604    /// No timeout (waits indefinitely).
605    ///
606    /// # Note
607    ///
608    /// If you use blocking commands (like `BLPOP`) or long-running commands,
609    /// ensure the timeout is long enough to accommodate them.
610    pub fn response_timeout(mut self, timeout: Duration) -> Self {
611        self.response_timeout = Some(timeout);
612        self
613    }
614
615    /// Sets the maximum number of connection retry attempts.
616    ///
617    /// When a connection fails, the client will retry up to this many times
618    /// with exponential backoff before giving up.
619    ///
620    /// # Default
621    ///
622    /// Uses the redis-rs default (typically 16 retries for single-node).
623    pub fn retries(mut self, count: usize) -> Self {
624        self.number_of_retries = Some(count);
625        self
626    }
627
628    /// Sets the username for Redis authentication.
629    ///
630    /// Used with Redis 6+ ACL system. For older Redis versions using only
631    /// password authentication, leave this unset.
632    ///
633    /// # Default
634    ///
635    /// None (no username).
636    pub fn username(mut self, username: impl Into<String>) -> Self {
637        self.username = Some(username.into());
638        self
639    }
640
641    /// Sets the password for Redis authentication.
642    ///
643    /// Works with both legacy Redis AUTH and Redis 6+ ACL authentication.
644    /// For ACL authentication, also set [`username`](Self::username).
645    ///
646    /// # Default
647    ///
648    /// None (no password).
649    pub fn password(mut self, password: impl Into<String>) -> Self {
650        self.password = Some(password.into());
651        self
652    }
653
654    /// Sets the cache value serialization format.
655    ///
656    /// The value format determines how cached data is serialized before storage.
657    ///
658    /// # Default
659    ///
660    /// [`BincodeFormat`] (compact binary, recommended for production)
661    ///
662    /// # Options
663    ///
664    /// | Format | Speed | Size | Human-readable |
665    /// |--------|-------|------|----------------|
666    /// | [`BincodeFormat`] | Fast | Compact | No |
667    /// | [`JsonFormat`](hitbox_backend::format::JsonFormat) | Slow | Large | Yes |
668    /// | [`RonFormat`](hitbox_backend::format::RonFormat) | Medium | Medium | Yes |
669    ///
670    /// [`BincodeFormat`]: hitbox_backend::format::BincodeFormat
671    pub fn value_format<NewS>(self, serializer: NewS) -> RedisBackendBuilder<NewS, C>
672    where
673        NewS: Format,
674    {
675        RedisBackendBuilder {
676            mode: self.mode,
677            serializer,
678            key_format: self.key_format,
679            compressor: self.compressor,
680            label: self.label,
681            connection_timeout: self.connection_timeout,
682            response_timeout: self.response_timeout,
683            number_of_retries: self.number_of_retries,
684            username: self.username,
685            password: self.password,
686        }
687    }
688
689    /// Sets the cache key serialization format.
690    ///
691    /// The key format determines how [`CacheKey`] values are serialized for
692    /// storage as Redis keys. This affects key size and debuggability.
693    ///
694    /// # Default
695    ///
696    /// [`CacheKeyFormat::Bitcode`]
697    ///
698    /// # Options
699    ///
700    /// | Format | Size | Human-readable |
701    /// |--------|------|----------------|
702    /// | [`Bitcode`](CacheKeyFormat::Bitcode) | Compact | No |
703    /// | [`UrlEncoded`](CacheKeyFormat::UrlEncoded) | Larger | Yes |
704    ///
705    /// [`CacheKey`]: hitbox::CacheKey
706    pub fn key_format(mut self, key_format: CacheKeyFormat) -> Self {
707        self.key_format = key_format;
708        self
709    }
710
711    /// Sets a custom label for this backend.
712    ///
713    /// The label identifies this backend in multi-tier cache compositions and
714    /// appears in metrics and debug output.
715    ///
716    /// # Default
717    ///
718    /// `"redis"`
719    pub fn label(mut self, label: impl Into<BackendLabel>) -> Self {
720        self.label = label.into();
721        self
722    }
723
724    /// Sets the compression strategy for cache values.
725    ///
726    /// Compression reduces network bandwidth and Redis memory usage at the cost
727    /// of CPU time. For Redis backends, compression is often beneficial since
728    /// network I/O is typically the bottleneck.
729    ///
730    /// # Default
731    ///
732    /// [`PassthroughCompressor`] (no compression)
733    ///
734    /// # Options
735    ///
736    /// | Compressor | Ratio | Speed | Feature flag |
737    /// |------------|-------|-------|--------------|
738    /// | [`PassthroughCompressor`] | None | Fastest | — |
739    /// | [`GzipCompressor`] | Good | Medium | `gzip` |
740    /// | [`ZstdCompressor`] | Best | Fast | `zstd` |
741    ///
742    /// # When to Use Compression
743    ///
744    /// - Cached values larger than ~1KB
745    /// - High network latency to Redis
746    /// - Redis memory is constrained
747    /// - Using Redis persistence (RDB/AOF)
748    ///
749    /// [`PassthroughCompressor`]: hitbox_backend::PassthroughCompressor
750    /// [`GzipCompressor`]: https://docs.rs/hitbox-backend/latest/hitbox_backend/struct.GzipCompressor.html
751    /// [`ZstdCompressor`]: https://docs.rs/hitbox-backend/latest/hitbox_backend/struct.ZstdCompressor.html
752    pub fn compressor<NewC>(self, compressor: NewC) -> RedisBackendBuilder<S, NewC>
753    where
754        NewC: Compressor,
755    {
756        RedisBackendBuilder {
757            mode: self.mode,
758            serializer: self.serializer,
759            key_format: self.key_format,
760            compressor,
761            label: self.label,
762            connection_timeout: self.connection_timeout,
763            response_timeout: self.response_timeout,
764            number_of_retries: self.number_of_retries,
765            username: self.username,
766            password: self.password,
767        }
768    }
769
770    /// Builds the [`RedisBackend`] with the configured settings.
771    ///
772    /// This method is synchronous - the actual Redis connection is established
773    /// lazily on first use (get/set/delete operation).
774    ///
775    /// # Errors
776    ///
777    /// Returns [`Error::MissingConnectionMode`] if no connection mode was specified.
778    /// Note: Connection errors will occur on first cache operation, not here.
779    ///
780    /// [`Error::MissingConnectionMode`]: crate::error::Error::MissingConnectionMode
781    pub fn build(self) -> Result<RedisBackend<S, C>, Error> {
782        let mode = self.mode.ok_or(Error::MissingConnectionMode)?;
783
784        Ok(RedisBackend {
785            mode,
786            connection_timeout: self.connection_timeout,
787            response_timeout: self.response_timeout,
788            number_of_retries: self.number_of_retries,
789            username: self.username,
790            password: self.password,
791            connection: OnceCell::new(),
792            serializer: self.serializer,
793            key_format: self.key_format,
794            compressor: self.compressor,
795            label: self.label,
796        })
797    }
798}
799
800#[async_trait]
801impl<S, C> Backend for RedisBackend<S, C>
802where
803    S: Format + Send + Sync,
804    C: Compressor + Send + Sync,
805{
806    async fn read(&self, key: &CacheKey) -> BackendResult<Option<CacheValue<Raw>>> {
807        let mut con = self.get_connection().await?.clone();
808        let cache_key = self.key_format.serialize(key)?;
809
810        // Pipeline: HMGET (data, stale) + PTTL with typed decoding
811        let ((data, stale_ms), pttl): ((Option<Vec<u8>>, Option<i64>), i64) = con
812            .query_pipeline(
813                redis::pipe()
814                    .cmd("HMGET")
815                    .arg(&cache_key)
816                    .arg("d")
817                    .arg("s")
818                    .cmd("PTTL")
819                    .arg(&cache_key),
820            )
821            .await
822            .map_err(Error::from)?;
823
824        // If data is None, key doesn't exist
825        let data = match data {
826            Some(data) => Bytes::from(data),
827            None => return Ok(None),
828        };
829
830        // Convert stale millis to DateTime
831        let stale = stale_ms.and_then(DateTime::from_timestamp_millis);
832
833        // Calculate expire from PTTL (milliseconds remaining)
834        // PTTL returns: -2 if key doesn't exist, -1 if no TTL, else milliseconds
835        let expire = (pttl > 0).then(|| Utc::now() + chrono::Duration::milliseconds(pttl));
836
837        Ok(Some(CacheValue::new(data, expire, stale)))
838    }
839
840    async fn write(&self, key: &CacheKey, value: CacheValue<Raw>) -> BackendResult<()> {
841        let mut con = self.get_connection().await?.clone();
842        let cache_key = self.key_format.serialize(key)?;
843
844        // Build HSET command with data field, optionally add stale field
845        let mut cmd = redis::cmd("HSET");
846        cmd.arg(&cache_key).arg("d").arg(value.data().as_ref());
847        if let Some(stale) = value.stale() {
848            cmd.arg("s").arg(stale.timestamp_millis());
849        }
850
851        // Pipeline: HSET + optional EXPIRE (computed from value.ttl())
852        let mut pipe = redis::pipe();
853        pipe.add_command(cmd).ignore();
854        if let Some(ttl_duration) = value.ttl() {
855            pipe.cmd("EXPIRE")
856                .arg(&cache_key)
857                .arg(ttl_duration.as_secs())
858                .ignore();
859        }
860
861        con.query_pipeline::<()>(&pipe).await.map_err(Error::from)?;
862        Ok(())
863    }
864
865    async fn remove(&self, key: &CacheKey) -> BackendResult<DeleteStatus> {
866        let mut con = self.get_connection().await?.clone();
867        let cache_key = self.key_format.serialize(key)?;
868
869        let deleted: i32 = con
870            .query_cmd(redis::cmd("DEL").arg(cache_key))
871            .await
872            .map_err(Error::from)?;
873
874        if deleted > 0 {
875            Ok(DeleteStatus::Deleted(deleted as u32))
876        } else {
877            Ok(DeleteStatus::Missing)
878        }
879    }
880
881    fn label(&self) -> BackendLabel {
882        self.label.clone()
883    }
884
885    fn value_format(&self) -> &dyn Format {
886        &self.serializer
887    }
888
889    fn key_format(&self) -> &CacheKeyFormat {
890        &self.key_format
891    }
892
893    fn compressor(&self) -> &dyn Compressor {
894        &self.compressor
895    }
896}
897
898// Explicit CacheBackend implementation using default trait methods
899impl<S, C> hitbox_backend::CacheBackend for RedisBackend<S, C>
900where
901    S: Format + Send + Sync,
902    C: Compressor + Send + Sync,
903{
904}