Skip to main content

hitbox_redis/
backend.rs

1//! Redis backend implementation.
2
3use std::time::Duration;
4
5use async_trait::async_trait;
6use bytes::Bytes;
7use chrono::{DateTime, Utc};
8use hitbox::{BackendLabel, CacheKey, CacheValue, Raw};
9use hitbox_backend::{
10    Backend, BackendError, BackendResult, CacheKeyFormat, Compressor, DeleteStatus,
11    PassthroughCompressor,
12    format::{BincodeFormat, Format},
13};
14use redis::Client;
15use redis::aio::ConnectionManager;
16#[cfg(feature = "cluster")]
17use redis::cluster_async::ClusterConnection;
18use tokio::sync::OnceCell;
19
20use crate::error::Error;
21
22/// Configuration for a single Redis node connection.
23///
24/// # When You'll Encounter This
25///
26/// You typically don't create this directly. It appears when:
27/// - Using [`ConnectionMode::single`] which creates this internally
28/// - Accessing configuration for debugging or logging
29///
30/// # Examples
31///
32/// ```
33/// use hitbox_redis::SingleConfig;
34///
35/// let config = SingleConfig::new("redis://localhost:6379/");
36/// ```
37#[derive(Debug, Clone)]
38pub struct SingleConfig {
39    /// Redis connection URL in format `redis://[:<password>@]<host>[:<port>][/<database>]`.
40    pub(crate) url: String,
41    /// Exponential backoff base for reconnection attempts. Default: `2.0`.
42    pub(crate) exponent_base: f32,
43}
44
45impl SingleConfig {
46    /// Creates a new single-node configuration.
47    ///
48    /// # Arguments
49    ///
50    /// * `url` - Redis connection URL in format `redis://[:<password>@]<host>[:<port>][/<database>]`
51    ///
52    /// # Default
53    ///
54    /// * `exponent_base`: `2.0` (exponential backoff base for retries)
55    ///
56    /// # Examples
57    ///
58    /// ```
59    /// use hitbox_redis::SingleConfig;
60    ///
61    /// let config = SingleConfig::new("redis://localhost:6379/0");
62    /// ```
63    pub fn new(url: impl Into<String>) -> Self {
64        Self {
65            url: url.into(),
66            exponent_base: 2.0,
67        }
68    }
69}
70
71/// Configuration for a Redis Cluster connection.
72///
73/// # When You'll Encounter This
74///
75/// You typically don't create this directly. It appears when:
76/// - Using [`ConnectionMode::cluster`] which creates this internally
77/// - Accessing configuration for debugging or logging
78///
79/// # Examples
80///
81/// ```
82/// # #[cfg(feature = "cluster")]
83/// # fn main() {
84/// use hitbox_redis::ClusterConfig;
85///
86/// let config = ClusterConfig::new([
87///     "redis://node1:6379",
88///     "redis://node2:6379",
89///     "redis://node3:6379",
90/// ]);
91/// # }
92/// # #[cfg(not(feature = "cluster"))]
93/// # fn main() {}
94/// ```
95#[cfg(feature = "cluster")]
96#[cfg_attr(docsrs, doc(cfg(feature = "cluster")))]
97#[derive(Debug, Clone)]
98pub struct ClusterConfig {
99    /// List of initial cluster node URLs. The client discovers other nodes automatically.
100    pub(crate) nodes: Vec<String>,
101    /// Whether to allow reading from replica nodes. Default: `false`.
102    pub(crate) read_from_replicas: bool,
103}
104
105#[cfg(feature = "cluster")]
106impl ClusterConfig {
107    /// Creates a new cluster configuration.
108    ///
109    /// # Arguments
110    ///
111    /// * `nodes` - List of initial cluster node URLs. The client discovers
112    ///   other nodes automatically via the `CLUSTER SLOTS` command.
113    ///
114    /// # Default
115    ///
116    /// * `read_from_replicas`: `false`
117    ///
118    /// # Examples
119    ///
120    /// ```
121    /// # #[cfg(feature = "cluster")]
122    /// # fn main() {
123    /// use hitbox_redis::ClusterConfig;
124    ///
125    /// let config = ClusterConfig::new([
126    ///     "redis://node1:6379",
127    ///     "redis://node2:6379",
128    ///     "redis://node3:6379",
129    /// ]);
130    /// # }
131    /// # #[cfg(not(feature = "cluster"))]
132    /// # fn main() {}
133    /// ```
134    pub fn new<I, S>(nodes: I) -> Self
135    where
136        I: IntoIterator<Item = S>,
137        S: Into<String>,
138    {
139        Self {
140            nodes: nodes.into_iter().map(Into::into).collect(),
141            read_from_replicas: false,
142        }
143    }
144}
145
146/// Redis connection mode.
147///
148/// Determines whether to connect to a single Redis instance or a Redis Cluster.
149///
150/// # Examples
151///
152/// Single-node connection:
153/// ```
154/// use hitbox_redis::ConnectionMode;
155///
156/// let mode = ConnectionMode::single("redis://localhost:6379/");
157/// ```
158///
159/// Cluster connection (requires `cluster` feature):
160///
161/// ```
162/// # #[cfg(feature = "cluster")]
163/// # fn main() {
164/// use hitbox_redis::ConnectionMode;
165///
166/// let mode = ConnectionMode::cluster([
167///     "redis://node1:6379",
168///     "redis://node2:6379",
169///     "redis://node3:6379",
170/// ]);
171/// # }
172/// # #[cfg(not(feature = "cluster"))]
173/// # fn main() {}
174/// ```
175#[derive(Debug, Clone)]
176pub enum ConnectionMode {
177    /// Single Redis node connection.
178    Single(SingleConfig),
179
180    /// Redis Cluster connection.
181    #[cfg(feature = "cluster")]
182    #[cfg_attr(docsrs, doc(cfg(feature = "cluster")))]
183    Cluster(ClusterConfig),
184}
185
186impl ConnectionMode {
187    /// Create a single-node connection mode.
188    ///
189    /// # Arguments
190    ///
191    /// * `url` - Redis connection URL in format `redis://[:<password>@]<host>[:<port>][/<database>]`
192    pub fn single(url: impl Into<String>) -> Self {
193        Self::Single(SingleConfig::new(url))
194    }
195
196    /// Create a cluster connection mode.
197    ///
198    /// # Arguments
199    ///
200    /// * `nodes` - List of initial cluster node URLs. The client will discover other nodes automatically.
201    #[cfg(feature = "cluster")]
202    #[cfg_attr(docsrs, doc(cfg(feature = "cluster")))]
203    pub fn cluster<I, S>(nodes: I) -> Self
204    where
205        I: IntoIterator<Item = S>,
206        S: Into<String>,
207    {
208        Self::Cluster(ClusterConfig::new(nodes))
209    }
210
211    /// Sets the exponential backoff base for retries (single-node only).
212    ///
213    /// The delay between reconnection attempts is calculated as `base^attempt` milliseconds.
214    ///
215    /// # Default
216    ///
217    /// `2.0`
218    ///
219    /// # Caveats
220    ///
221    /// This option only applies to single-node connections and is silently ignored
222    /// for cluster mode.
223    ///
224    /// # Examples
225    ///
226    /// ```
227    /// use hitbox_redis::ConnectionMode;
228    ///
229    /// // Use a slower backoff (3^attempt ms)
230    /// let mode = ConnectionMode::single("redis://localhost:6379/")
231    ///     .exponent_base(3.0);
232    /// ```
233    #[allow(irrefutable_let_patterns)]
234    pub fn exponent_base(mut self, base: f32) -> Self {
235        if let Self::Single(ref mut config) = self {
236            config.exponent_base = base;
237        }
238        self
239    }
240
241    /// Enables reading from replica nodes (cluster only).
242    ///
243    /// When enabled, read operations may be served by replica nodes for better
244    /// read throughput and reduced load on primary nodes.
245    ///
246    /// # Default
247    ///
248    /// Disabled (reads only from primary nodes).
249    ///
250    /// # Caveats
251    ///
252    /// - Replicas may have slightly stale data due to replication lag
253    /// - This option only applies to cluster connections and is silently ignored
254    ///   for single-node mode
255    ///
256    /// # Examples
257    ///
258    /// ```
259    /// # #[cfg(feature = "cluster")]
260    /// # fn main() {
261    /// use hitbox_redis::ConnectionMode;
262    ///
263    /// let mode = ConnectionMode::cluster([
264    ///     "redis://node1:6379",
265    ///     "redis://node2:6379",
266    /// ])
267    /// .read_from_replicas();
268    /// # }
269    /// # #[cfg(not(feature = "cluster"))]
270    /// # fn main() {}
271    /// ```
272    #[cfg(feature = "cluster")]
273    #[cfg_attr(docsrs, doc(cfg(feature = "cluster")))]
274    pub fn read_from_replicas(mut self) -> Self {
275        if let Self::Cluster(ref mut config) = self {
276            config.read_from_replicas = true;
277        }
278        self
279    }
280}
281
282/// Internal wrapper for Redis connection types.
283#[derive(Clone)]
284enum RedisConnection {
285    Single(ConnectionManager),
286    #[cfg(feature = "cluster")]
287    Cluster(ClusterConnection),
288}
289
290impl RedisConnection {
291    /// Execute a pipeline and return the result.
292    async fn query_pipeline<T: redis::FromRedisValue>(
293        &mut self,
294        pipe: &redis::Pipeline,
295    ) -> Result<T, redis::RedisError> {
296        match self {
297            Self::Single(conn) => pipe.query_async(conn).await,
298            #[cfg(feature = "cluster")]
299            Self::Cluster(conn) => pipe.query_async(conn).await,
300        }
301    }
302
303    /// Execute a single command.
304    async fn query_cmd<T: redis::FromRedisValue>(
305        &mut self,
306        cmd: &mut redis::Cmd,
307    ) -> Result<T, redis::RedisError> {
308        match self {
309            Self::Single(conn) => cmd.query_async(conn).await,
310            #[cfg(feature = "cluster")]
311            Self::Cluster(conn) => cmd.query_async(conn).await,
312        }
313    }
314}
315
316/// Redis cache backend for single-node or cluster deployments.
317///
318/// `RedisBackend` provides a cache backend using Redis as the storage layer.
319/// It supports both single-node Redis instances and Redis Cluster
320/// (with the `cluster` feature enabled).
321///
322/// Use [`RedisBackendBuilder`] to construct this type.
323///
324/// # Type Parameters
325///
326/// * `S` - Serialization format for cache values. Implements [`Format`].
327///   Default: [`BincodeFormat`] (compact binary, recommended for production).
328/// * `C` - Compression strategy for cache values. Implements [`Compressor`].
329///   Default: [`PassthroughCompressor`] (no compression).
330///
331/// # Examples
332///
333/// Basic single-node connection:
334///
335/// ```
336/// use hitbox_redis::{RedisBackend, ConnectionMode};
337///
338/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
339/// let backend = RedisBackend::builder()
340///     .connection(ConnectionMode::single("redis://localhost:6379/"))
341///     .build()?;
342/// # Ok(())
343/// # }
344/// ```
345///
346/// With all configuration options:
347///
348/// ```
349/// use std::time::Duration;
350/// use hitbox_redis::{RedisBackend, ConnectionMode};
351/// use hitbox_backend::CacheKeyFormat;
352/// use hitbox_backend::format::JsonFormat;
353///
354/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
355/// let backend = RedisBackend::builder()
356///     .connection(ConnectionMode::single("redis://localhost:6379/"))
357///     .username("cache_user")        // Redis 6+ ACL
358///     .password("secret")
359///     .label("user-sessions")
360///     .key_format(CacheKeyFormat::UrlEncoded)
361///     .value_format(JsonFormat)
362///     .connection_timeout(Duration::from_secs(5))
363///     .response_timeout(Duration::from_secs(2))
364///     .retries(3)
365///     .build()?;
366/// # Ok(())
367/// # }
368/// ```
369///
370/// Cluster connection (requires `cluster` feature):
371///
372/// ```
373/// # #[cfg(feature = "cluster")]
374/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
375/// use hitbox_redis::{RedisBackend, ConnectionMode};
376///
377/// let backend = RedisBackend::builder()
378///     .connection(ConnectionMode::cluster([
379///         "redis://node1:6379",
380///         "redis://node2:6379",
381///         "redis://node3:6379",
382///     ]))
383///     .build()?;
384/// # Ok(())
385/// # }
386/// # #[cfg(not(feature = "cluster"))]
387/// # fn main() {}
388/// ```
389///
390/// # Performance
391///
392/// - **Read operations**: Single pipelined request (`HMGET` + `PTTL`)
393/// - **Write operations**: Single pipelined request (`HSET` + `PEXPIRE`)
394/// - **Connection**: Established lazily on first use, multiplexed for concurrent access
395///
396/// # Caveats
397///
398/// - **Connection failure**: First cache operation will fail if Redis is unreachable
399/// - **Expire time approximation**: The `expire` timestamp returned on read is
400///   calculated as `now + PTTL`, which may drift by the network round-trip time
401///
402/// [`Format`]: hitbox_backend::format::Format
403/// [`BincodeFormat`]: hitbox_backend::format::BincodeFormat
404/// [`Compressor`]: hitbox_backend::Compressor
405/// [`PassthroughCompressor`]: hitbox_backend::PassthroughCompressor
406#[derive(Clone)]
407pub struct RedisBackend<S = BincodeFormat, C = PassthroughCompressor>
408where
409    S: Format,
410    C: Compressor,
411{
412    /// Connection mode (single node or cluster).
413    mode: ConnectionMode,
414    /// Timeout for establishing connections.
415    connection_timeout: Option<Duration>,
416    /// Timeout for waiting on Redis responses.
417    response_timeout: Option<Duration>,
418    /// Maximum number of retry attempts.
419    number_of_retries: Option<usize>,
420    /// Username for Redis authentication (Redis 6+ ACL).
421    username: Option<String>,
422    /// Password for Redis authentication.
423    password: Option<String>,
424
425    /// Lazy-initialized connection (established on first cache operation).
426    connection: OnceCell<RedisConnection>,
427
428    /// Format used to serialize cache values.
429    serializer: S,
430    /// Format used to serialize cache keys.
431    key_format: CacheKeyFormat,
432    /// Compressor used for cache values.
433    compressor: C,
434    /// Label identifying this backend in multi-tier compositions.
435    label: BackendLabel,
436}
437
438impl RedisBackend<BincodeFormat, PassthroughCompressor> {
439    /// Creates a new builder for `RedisBackend`.
440    ///
441    /// Use the builder to configure the connection mode, serialization format,
442    /// key format, compression, and label. See [`RedisBackend`] for examples.
443    #[must_use]
444    pub fn builder() -> RedisBackendBuilder<BincodeFormat, PassthroughCompressor> {
445        RedisBackendBuilder::default()
446    }
447}
448
449impl<S, C> RedisBackend<S, C>
450where
451    S: Format,
452    C: Compressor,
453{
454    /// Gets or initializes the Redis connection lazily.
455    ///
456    /// This method ensures the connection is established only once, even when
457    /// called concurrently from multiple tasks. Subsequent calls return the
458    /// cached connection.
459    async fn get_connection(&self) -> Result<&RedisConnection, Error> {
460        self.connection
461            .get_or_try_init(|| async {
462                match &self.mode {
463                    ConnectionMode::Single(config) => {
464                        // Parse URL and apply authentication if provided
465                        let mut conn_info: redis::ConnectionInfo = config.url.as_str().parse()?;
466                        let mut redis_info = conn_info.redis_settings().clone();
467                        if let Some(ref username) = self.username {
468                            redis_info = redis_info.set_username(username);
469                        }
470                        if let Some(ref password) = self.password {
471                            redis_info = redis_info.set_password(password);
472                        }
473                        conn_info = conn_info.set_redis_settings(redis_info);
474
475                        let client = Client::open(conn_info)?;
476
477                        // Build ConnectionManagerConfig with options
478                        let mut manager_config = redis::aio::ConnectionManagerConfig::new()
479                            .set_exponent_base(config.exponent_base);
480
481                        if let Some(timeout) = self.connection_timeout {
482                            manager_config = manager_config.set_connection_timeout(Some(timeout));
483                        }
484                        if let Some(timeout) = self.response_timeout {
485                            manager_config = manager_config.set_response_timeout(Some(timeout));
486                        }
487                        if let Some(retries) = self.number_of_retries {
488                            manager_config = manager_config.set_number_of_retries(retries);
489                        }
490
491                        let conn = client
492                            .get_connection_manager_with_config(manager_config)
493                            .await?;
494                        Ok(RedisConnection::Single(conn))
495                    }
496                    #[cfg(feature = "cluster")]
497                    ConnectionMode::Cluster(config) => {
498                        let mut builder = redis::cluster::ClusterClientBuilder::new(
499                            config.nodes.iter().map(|s| s.as_str()),
500                        );
501                        if config.read_from_replicas {
502                            builder = builder.read_from_replicas();
503                        }
504                        if let Some(ref username) = self.username {
505                            builder = builder.username(username.clone());
506                        }
507                        if let Some(ref password) = self.password {
508                            builder = builder.password(password.clone());
509                        }
510                        if let Some(timeout) = self.connection_timeout {
511                            builder = builder.connection_timeout(timeout);
512                        }
513                        if let Some(timeout) = self.response_timeout {
514                            builder = builder.response_timeout(timeout);
515                        }
516                        if let Some(retries) = self.number_of_retries {
517                            builder = builder.retries(retries as u32);
518                        }
519
520                        let client = builder.build()?;
521                        let conn = client.get_async_connection().await?;
522                        Ok(RedisConnection::Cluster(conn))
523                    }
524                }
525            })
526            .await
527    }
528}
529
530/// Builder for creating and configuring a [`RedisBackend`].
531///
532/// Use [`RedisBackend::builder`] to create a new builder instance.
533/// See [`RedisBackend`] for usage examples.
534pub struct RedisBackendBuilder<S = BincodeFormat, C = PassthroughCompressor>
535where
536    S: Format,
537    C: Compressor,
538{
539    mode: Option<ConnectionMode>,
540    serializer: S,
541    key_format: CacheKeyFormat,
542    compressor: C,
543    label: BackendLabel,
544    // Common connection options
545    connection_timeout: Option<Duration>,
546    response_timeout: Option<Duration>,
547    number_of_retries: Option<usize>,
548    // Authentication
549    username: Option<String>,
550    password: Option<String>,
551}
552
553impl Default for RedisBackendBuilder<BincodeFormat, PassthroughCompressor> {
554    fn default() -> Self {
555        Self {
556            mode: None,
557            serializer: BincodeFormat,
558            key_format: CacheKeyFormat::default(),
559            compressor: PassthroughCompressor,
560            label: BackendLabel::new_static("redis"),
561            connection_timeout: None,
562            response_timeout: None,
563            number_of_retries: None,
564            username: None,
565            password: None,
566        }
567    }
568}
569
570impl<S, C> RedisBackendBuilder<S, C>
571where
572    S: Format,
573    C: Compressor,
574{
575    /// Sets the Redis connection mode.
576    ///
577    /// This is required before calling [`build`].
578    ///
579    /// [`build`]: Self::build
580    pub fn connection(mut self, mode: ConnectionMode) -> Self {
581        self.mode = Some(mode);
582        self
583    }
584
585    /// Sets the connection timeout.
586    ///
587    /// This timeout applies when establishing a new connection to Redis.
588    /// If the connection cannot be established within this duration, the operation fails.
589    ///
590    /// # Default
591    ///
592    /// No timeout (waits indefinitely).
593    pub fn connection_timeout(mut self, timeout: Duration) -> Self {
594        self.connection_timeout = Some(timeout);
595        self
596    }
597
598    /// Sets the response timeout.
599    ///
600    /// This timeout applies when waiting for a response from Redis after sending a command.
601    /// If Redis doesn't respond within this duration, the operation fails.
602    ///
603    /// # Default
604    ///
605    /// No timeout (waits indefinitely).
606    ///
607    /// # Note
608    ///
609    /// If you use blocking commands (like `BLPOP`) or long-running commands,
610    /// ensure the timeout is long enough to accommodate them.
611    pub fn response_timeout(mut self, timeout: Duration) -> Self {
612        self.response_timeout = Some(timeout);
613        self
614    }
615
616    /// Sets the maximum number of connection retry attempts.
617    ///
618    /// When a connection fails, the client will retry up to this many times
619    /// with exponential backoff before giving up.
620    ///
621    /// # Default
622    ///
623    /// Uses the redis-rs default (typically 16 retries for single-node).
624    pub fn retries(mut self, count: usize) -> Self {
625        self.number_of_retries = Some(count);
626        self
627    }
628
629    /// Sets the username for Redis authentication.
630    ///
631    /// Used with Redis 6+ ACL system. For older Redis versions using only
632    /// password authentication, leave this unset.
633    ///
634    /// # Default
635    ///
636    /// None (no username).
637    pub fn username(mut self, username: impl Into<String>) -> Self {
638        self.username = Some(username.into());
639        self
640    }
641
642    /// Sets the password for Redis authentication.
643    ///
644    /// Works with both legacy Redis AUTH and Redis 6+ ACL authentication.
645    /// For ACL authentication, also set [`username`](Self::username).
646    ///
647    /// # Default
648    ///
649    /// None (no password).
650    pub fn password(mut self, password: impl Into<String>) -> Self {
651        self.password = Some(password.into());
652        self
653    }
654
655    /// Sets the cache value serialization format.
656    ///
657    /// The value format determines how cached data is serialized before storage.
658    ///
659    /// # Default
660    ///
661    /// [`BincodeFormat`] (compact binary, recommended for production)
662    ///
663    /// # Options
664    ///
665    /// | Format | Speed | Size | Human-readable |
666    /// |--------|-------|------|----------------|
667    /// | [`BincodeFormat`] | Fast | Compact | No |
668    /// | [`JsonFormat`](hitbox_backend::format::JsonFormat) | Slow | Large | Yes |
669    /// | [`RonFormat`](hitbox_backend::format::RonFormat) | Medium | Medium | Yes |
670    ///
671    /// [`BincodeFormat`]: hitbox_backend::format::BincodeFormat
672    pub fn value_format<NewS>(self, serializer: NewS) -> RedisBackendBuilder<NewS, C>
673    where
674        NewS: Format,
675    {
676        RedisBackendBuilder {
677            mode: self.mode,
678            serializer,
679            key_format: self.key_format,
680            compressor: self.compressor,
681            label: self.label,
682            connection_timeout: self.connection_timeout,
683            response_timeout: self.response_timeout,
684            number_of_retries: self.number_of_retries,
685            username: self.username,
686            password: self.password,
687        }
688    }
689
690    /// Sets the cache key serialization format.
691    ///
692    /// The key format determines how [`CacheKey`] values are serialized for
693    /// storage as Redis keys. This affects key size and debuggability.
694    ///
695    /// # Default
696    ///
697    /// [`CacheKeyFormat::Bitcode`]
698    ///
699    /// # Options
700    ///
701    /// | Format | Size | Human-readable |
702    /// |--------|------|----------------|
703    /// | [`Bitcode`](CacheKeyFormat::Bitcode) | Compact | No |
704    /// | [`UrlEncoded`](CacheKeyFormat::UrlEncoded) | Larger | Yes |
705    ///
706    /// [`CacheKey`]: hitbox::CacheKey
707    pub fn key_format(mut self, key_format: CacheKeyFormat) -> Self {
708        self.key_format = key_format;
709        self
710    }
711
712    /// Sets a custom label for this backend.
713    ///
714    /// The label identifies this backend in multi-tier cache compositions and
715    /// appears in metrics and debug output.
716    ///
717    /// # Default
718    ///
719    /// `"redis"`
720    pub fn label(mut self, label: impl Into<BackendLabel>) -> Self {
721        self.label = label.into();
722        self
723    }
724
725    /// Sets the compression strategy for cache values.
726    ///
727    /// Compression reduces network bandwidth and Redis memory usage at the cost
728    /// of CPU time. For Redis backends, compression is often beneficial since
729    /// network I/O is typically the bottleneck.
730    ///
731    /// # Default
732    ///
733    /// [`PassthroughCompressor`] (no compression)
734    ///
735    /// # Options
736    ///
737    /// | Compressor | Ratio | Speed | Feature flag |
738    /// |------------|-------|-------|--------------|
739    /// | [`PassthroughCompressor`] | None | Fastest | — |
740    /// | [`GzipCompressor`] | Good | Medium | `gzip` |
741    /// | [`ZstdCompressor`] | Best | Fast | `zstd` |
742    ///
743    /// # When to Use Compression
744    ///
745    /// - Cached values larger than ~1KB
746    /// - High network latency to Redis
747    /// - Redis memory is constrained
748    /// - Using Redis persistence (RDB/AOF)
749    ///
750    /// [`PassthroughCompressor`]: hitbox_backend::PassthroughCompressor
751    /// [`GzipCompressor`]: https://docs.rs/hitbox-backend/latest/hitbox_backend/struct.GzipCompressor.html
752    /// [`ZstdCompressor`]: https://docs.rs/hitbox-backend/latest/hitbox_backend/struct.ZstdCompressor.html
753    pub fn compressor<NewC>(self, compressor: NewC) -> RedisBackendBuilder<S, NewC>
754    where
755        NewC: Compressor,
756    {
757        RedisBackendBuilder {
758            mode: self.mode,
759            serializer: self.serializer,
760            key_format: self.key_format,
761            compressor,
762            label: self.label,
763            connection_timeout: self.connection_timeout,
764            response_timeout: self.response_timeout,
765            number_of_retries: self.number_of_retries,
766            username: self.username,
767            password: self.password,
768        }
769    }
770
771    /// Builds the [`RedisBackend`] with the configured settings.
772    ///
773    /// This method is synchronous - the actual Redis connection is established
774    /// lazily on first use (get/set/delete operation).
775    ///
776    /// # Errors
777    ///
778    /// Returns [`Error::MissingConnectionMode`] if no connection mode was specified.
779    /// Note: Connection errors will occur on first cache operation, not here.
780    ///
781    /// [`Error::MissingConnectionMode`]: crate::error::Error::MissingConnectionMode
782    pub fn build(self) -> Result<RedisBackend<S, C>, Error> {
783        let mode = self.mode.ok_or(Error::MissingConnectionMode)?;
784
785        Ok(RedisBackend {
786            mode,
787            connection_timeout: self.connection_timeout,
788            response_timeout: self.response_timeout,
789            number_of_retries: self.number_of_retries,
790            username: self.username,
791            password: self.password,
792            connection: OnceCell::new(),
793            serializer: self.serializer,
794            key_format: self.key_format,
795            compressor: self.compressor,
796            label: self.label,
797        })
798    }
799}
800
801#[async_trait]
802impl<S, C> Backend for RedisBackend<S, C>
803where
804    S: Format + Send + Sync,
805    C: Compressor + Send + Sync,
806{
807    async fn read(&self, key: &CacheKey) -> BackendResult<Option<CacheValue<Raw>>> {
808        let mut con = self.get_connection().await?.clone();
809        let cache_key = self.key_format.serialize(key)?;
810
811        // Pipeline: HMGET (data, stale) + PTTL with typed decoding
812        let ((data, stale_ms), pttl): ((Option<Vec<u8>>, Option<i64>), i64) = con
813            .query_pipeline(
814                redis::pipe()
815                    .cmd("HMGET")
816                    .arg(&cache_key)
817                    .arg("d")
818                    .arg("s")
819                    .cmd("PTTL")
820                    .arg(&cache_key),
821            )
822            .await
823            .map_err(Error::from)?;
824
825        // If data is None, key doesn't exist
826        let data = match data {
827            Some(data) => Bytes::from(data),
828            None => return Ok(None),
829        };
830
831        // Convert stale millis to DateTime
832        let stale = stale_ms.and_then(DateTime::from_timestamp_millis);
833
834        // Calculate expire from PTTL (milliseconds remaining)
835        // PTTL returns: -2 if key doesn't exist, -1 if no TTL, else milliseconds
836        let expire = (pttl > 0).then(|| Utc::now() + chrono::Duration::milliseconds(pttl));
837
838        Ok(Some(CacheValue::new(data, expire, stale)))
839    }
840
841    async fn write(&self, key: &CacheKey, value: CacheValue<Raw>) -> BackendResult<()> {
842        let mut con = self.get_connection().await?.clone();
843        let cache_key = self.key_format.serialize(key)?;
844
845        // Build HSET command with data field, optionally add stale field
846        let mut cmd = redis::cmd("HSET");
847        cmd.arg(&cache_key).arg("d").arg(value.data().as_ref());
848        if let Some(stale) = value.stale() {
849            cmd.arg("s").arg(stale.timestamp_millis());
850        }
851
852        // Pipeline: HSET + optional PEXPIRE (computed from value.ttl())
853        let mut pipe = redis::pipe();
854        pipe.add_command(cmd).ignore();
855        if let Some(ttl_duration) = value.ttl() {
856            pipe.cmd("PEXPIRE")
857                .arg(&cache_key)
858                .arg(u64::try_from(ttl_duration.as_millis()).map_err(|_| {
859                    BackendError::InternalError(Box::new(std::io::Error::new(
860                        std::io::ErrorKind::InvalidInput,
861                        format!(
862                            "TTL overflow: {}ms exceeds u64 range",
863                            ttl_duration.as_millis()
864                        ),
865                    )))
866                })?)
867                .ignore();
868        }
869
870        con.query_pipeline::<()>(&pipe).await.map_err(Error::from)?;
871        Ok(())
872    }
873
874    async fn remove(&self, key: &CacheKey) -> BackendResult<DeleteStatus> {
875        let mut con = self.get_connection().await?.clone();
876        let cache_key = self.key_format.serialize(key)?;
877
878        let deleted: i32 = con
879            .query_cmd(redis::cmd("DEL").arg(cache_key))
880            .await
881            .map_err(Error::from)?;
882
883        if deleted > 0 {
884            Ok(DeleteStatus::Deleted(deleted as u32))
885        } else {
886            Ok(DeleteStatus::Missing)
887        }
888    }
889
890    fn label(&self) -> BackendLabel {
891        self.label.clone()
892    }
893
894    fn value_format(&self) -> &dyn Format {
895        &self.serializer
896    }
897
898    fn key_format(&self) -> &CacheKeyFormat {
899        &self.key_format
900    }
901
902    fn compressor(&self) -> &dyn Compressor {
903        &self.compressor
904    }
905}
906
907// Explicit CacheBackend implementation using default trait methods
908impl<S, C> hitbox_backend::CacheBackend for RedisBackend<S, C>
909where
910    S: Format + Send + Sync,
911    C: Compressor + Send + Sync,
912{
913}