hitbox_redis/backend.rs
1//! Redis backend implementation.
2
3use std::time::Duration;
4
5use async_trait::async_trait;
6use bytes::Bytes;
7use chrono::{DateTime, Utc};
8use hitbox::{BackendLabel, CacheKey, CacheValue, Raw};
9use hitbox_backend::{
10 Backend, BackendError, BackendResult, CacheKeyFormat, Compressor, DeleteStatus,
11 PassthroughCompressor,
12 format::{BincodeFormat, Format},
13};
14use redis::Client;
15use redis::aio::ConnectionManager;
16#[cfg(feature = "cluster")]
17use redis::cluster_async::ClusterConnection;
18use tokio::sync::OnceCell;
19
20use crate::error::Error;
21
22/// Configuration for a single Redis node connection.
23///
24/// # When You'll Encounter This
25///
26/// You typically don't create this directly. It appears when:
27/// - Using [`ConnectionMode::single`] which creates this internally
28/// - Accessing configuration for debugging or logging
29///
30/// # Examples
31///
32/// ```
33/// use hitbox_redis::SingleConfig;
34///
35/// let config = SingleConfig::new("redis://localhost:6379/");
36/// ```
37#[derive(Debug, Clone)]
38pub struct SingleConfig {
39 /// Redis connection URL in format `redis://[:<password>@]<host>[:<port>][/<database>]`.
40 pub(crate) url: String,
41 /// Exponential backoff base for reconnection attempts. Default: `2.0`.
42 pub(crate) exponent_base: f32,
43}
44
45impl SingleConfig {
46 /// Creates a new single-node configuration.
47 ///
48 /// # Arguments
49 ///
50 /// * `url` - Redis connection URL in format `redis://[:<password>@]<host>[:<port>][/<database>]`
51 ///
52 /// # Default
53 ///
54 /// * `exponent_base`: `2.0` (exponential backoff base for retries)
55 ///
56 /// # Examples
57 ///
58 /// ```
59 /// use hitbox_redis::SingleConfig;
60 ///
61 /// let config = SingleConfig::new("redis://localhost:6379/0");
62 /// ```
63 pub fn new(url: impl Into<String>) -> Self {
64 Self {
65 url: url.into(),
66 exponent_base: 2.0,
67 }
68 }
69}
70
71/// Configuration for a Redis Cluster connection.
72///
73/// # When You'll Encounter This
74///
75/// You typically don't create this directly. It appears when:
76/// - Using [`ConnectionMode::cluster`] which creates this internally
77/// - Accessing configuration for debugging or logging
78///
79/// # Examples
80///
81/// ```
82/// # #[cfg(feature = "cluster")]
83/// # fn main() {
84/// use hitbox_redis::ClusterConfig;
85///
86/// let config = ClusterConfig::new([
87/// "redis://node1:6379",
88/// "redis://node2:6379",
89/// "redis://node3:6379",
90/// ]);
91/// # }
92/// # #[cfg(not(feature = "cluster"))]
93/// # fn main() {}
94/// ```
95#[cfg(feature = "cluster")]
96#[cfg_attr(docsrs, doc(cfg(feature = "cluster")))]
97#[derive(Debug, Clone)]
98pub struct ClusterConfig {
99 /// List of initial cluster node URLs. The client discovers other nodes automatically.
100 pub(crate) nodes: Vec<String>,
101 /// Whether to allow reading from replica nodes. Default: `false`.
102 pub(crate) read_from_replicas: bool,
103}
104
105#[cfg(feature = "cluster")]
106impl ClusterConfig {
107 /// Creates a new cluster configuration.
108 ///
109 /// # Arguments
110 ///
111 /// * `nodes` - List of initial cluster node URLs. The client discovers
112 /// other nodes automatically via the `CLUSTER SLOTS` command.
113 ///
114 /// # Default
115 ///
116 /// * `read_from_replicas`: `false`
117 ///
118 /// # Examples
119 ///
120 /// ```
121 /// # #[cfg(feature = "cluster")]
122 /// # fn main() {
123 /// use hitbox_redis::ClusterConfig;
124 ///
125 /// let config = ClusterConfig::new([
126 /// "redis://node1:6379",
127 /// "redis://node2:6379",
128 /// "redis://node3:6379",
129 /// ]);
130 /// # }
131 /// # #[cfg(not(feature = "cluster"))]
132 /// # fn main() {}
133 /// ```
134 pub fn new<I, S>(nodes: I) -> Self
135 where
136 I: IntoIterator<Item = S>,
137 S: Into<String>,
138 {
139 Self {
140 nodes: nodes.into_iter().map(Into::into).collect(),
141 read_from_replicas: false,
142 }
143 }
144}
145
146/// Redis connection mode.
147///
148/// Determines whether to connect to a single Redis instance or a Redis Cluster.
149///
150/// # Examples
151///
152/// Single-node connection:
153/// ```
154/// use hitbox_redis::ConnectionMode;
155///
156/// let mode = ConnectionMode::single("redis://localhost:6379/");
157/// ```
158///
159/// Cluster connection (requires `cluster` feature):
160///
161/// ```
162/// # #[cfg(feature = "cluster")]
163/// # fn main() {
164/// use hitbox_redis::ConnectionMode;
165///
166/// let mode = ConnectionMode::cluster([
167/// "redis://node1:6379",
168/// "redis://node2:6379",
169/// "redis://node3:6379",
170/// ]);
171/// # }
172/// # #[cfg(not(feature = "cluster"))]
173/// # fn main() {}
174/// ```
175#[derive(Debug, Clone)]
176pub enum ConnectionMode {
177 /// Single Redis node connection.
178 Single(SingleConfig),
179
180 /// Redis Cluster connection.
181 #[cfg(feature = "cluster")]
182 #[cfg_attr(docsrs, doc(cfg(feature = "cluster")))]
183 Cluster(ClusterConfig),
184}
185
186impl ConnectionMode {
187 /// Create a single-node connection mode.
188 ///
189 /// # Arguments
190 ///
191 /// * `url` - Redis connection URL in format `redis://[:<password>@]<host>[:<port>][/<database>]`
192 pub fn single(url: impl Into<String>) -> Self {
193 Self::Single(SingleConfig::new(url))
194 }
195
196 /// Create a cluster connection mode.
197 ///
198 /// # Arguments
199 ///
200 /// * `nodes` - List of initial cluster node URLs. The client will discover other nodes automatically.
201 #[cfg(feature = "cluster")]
202 #[cfg_attr(docsrs, doc(cfg(feature = "cluster")))]
203 pub fn cluster<I, S>(nodes: I) -> Self
204 where
205 I: IntoIterator<Item = S>,
206 S: Into<String>,
207 {
208 Self::Cluster(ClusterConfig::new(nodes))
209 }
210
211 /// Sets the exponential backoff base for retries (single-node only).
212 ///
213 /// The delay between reconnection attempts is calculated as `base^attempt` milliseconds.
214 ///
215 /// # Default
216 ///
217 /// `2.0`
218 ///
219 /// # Caveats
220 ///
221 /// This option only applies to single-node connections and is silently ignored
222 /// for cluster mode.
223 ///
224 /// # Examples
225 ///
226 /// ```
227 /// use hitbox_redis::ConnectionMode;
228 ///
229 /// // Use a slower backoff (3^attempt ms)
230 /// let mode = ConnectionMode::single("redis://localhost:6379/")
231 /// .exponent_base(3.0);
232 /// ```
233 #[allow(irrefutable_let_patterns)]
234 pub fn exponent_base(mut self, base: f32) -> Self {
235 if let Self::Single(ref mut config) = self {
236 config.exponent_base = base;
237 }
238 self
239 }
240
241 /// Enables reading from replica nodes (cluster only).
242 ///
243 /// When enabled, read operations may be served by replica nodes for better
244 /// read throughput and reduced load on primary nodes.
245 ///
246 /// # Default
247 ///
248 /// Disabled (reads only from primary nodes).
249 ///
250 /// # Caveats
251 ///
252 /// - Replicas may have slightly stale data due to replication lag
253 /// - This option only applies to cluster connections and is silently ignored
254 /// for single-node mode
255 ///
256 /// # Examples
257 ///
258 /// ```
259 /// # #[cfg(feature = "cluster")]
260 /// # fn main() {
261 /// use hitbox_redis::ConnectionMode;
262 ///
263 /// let mode = ConnectionMode::cluster([
264 /// "redis://node1:6379",
265 /// "redis://node2:6379",
266 /// ])
267 /// .read_from_replicas();
268 /// # }
269 /// # #[cfg(not(feature = "cluster"))]
270 /// # fn main() {}
271 /// ```
272 #[cfg(feature = "cluster")]
273 #[cfg_attr(docsrs, doc(cfg(feature = "cluster")))]
274 pub fn read_from_replicas(mut self) -> Self {
275 if let Self::Cluster(ref mut config) = self {
276 config.read_from_replicas = true;
277 }
278 self
279 }
280}
281
282/// Internal wrapper for Redis connection types.
283#[derive(Clone)]
284enum RedisConnection {
285 Single(ConnectionManager),
286 #[cfg(feature = "cluster")]
287 Cluster(ClusterConnection),
288}
289
290impl RedisConnection {
291 /// Execute a pipeline and return the result.
292 async fn query_pipeline<T: redis::FromRedisValue>(
293 &mut self,
294 pipe: &redis::Pipeline,
295 ) -> Result<T, redis::RedisError> {
296 match self {
297 Self::Single(conn) => pipe.query_async(conn).await,
298 #[cfg(feature = "cluster")]
299 Self::Cluster(conn) => pipe.query_async(conn).await,
300 }
301 }
302
303 /// Execute a single command.
304 async fn query_cmd<T: redis::FromRedisValue>(
305 &mut self,
306 cmd: &mut redis::Cmd,
307 ) -> Result<T, redis::RedisError> {
308 match self {
309 Self::Single(conn) => cmd.query_async(conn).await,
310 #[cfg(feature = "cluster")]
311 Self::Cluster(conn) => cmd.query_async(conn).await,
312 }
313 }
314}
315
316/// Redis cache backend for single-node or cluster deployments.
317///
318/// `RedisBackend` provides a cache backend using Redis as the storage layer.
319/// It supports both single-node Redis instances and Redis Cluster
320/// (with the `cluster` feature enabled).
321///
322/// Use [`RedisBackendBuilder`] to construct this type.
323///
324/// # Type Parameters
325///
326/// * `S` - Serialization format for cache values. Implements [`Format`].
327/// Default: [`BincodeFormat`] (compact binary, recommended for production).
328/// * `C` - Compression strategy for cache values. Implements [`Compressor`].
329/// Default: [`PassthroughCompressor`] (no compression).
330///
331/// # Examples
332///
333/// Basic single-node connection:
334///
335/// ```
336/// use hitbox_redis::{RedisBackend, ConnectionMode};
337///
338/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
339/// let backend = RedisBackend::builder()
340/// .connection(ConnectionMode::single("redis://localhost:6379/"))
341/// .build()?;
342/// # Ok(())
343/// # }
344/// ```
345///
346/// With all configuration options:
347///
348/// ```
349/// use std::time::Duration;
350/// use hitbox_redis::{RedisBackend, ConnectionMode};
351/// use hitbox_backend::CacheKeyFormat;
352/// use hitbox_backend::format::JsonFormat;
353///
354/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
355/// let backend = RedisBackend::builder()
356/// .connection(ConnectionMode::single("redis://localhost:6379/"))
357/// .username("cache_user") // Redis 6+ ACL
358/// .password("secret")
359/// .label("user-sessions")
360/// .key_format(CacheKeyFormat::UrlEncoded)
361/// .value_format(JsonFormat)
362/// .connection_timeout(Duration::from_secs(5))
363/// .response_timeout(Duration::from_secs(2))
364/// .retries(3)
365/// .build()?;
366/// # Ok(())
367/// # }
368/// ```
369///
370/// Cluster connection (requires `cluster` feature):
371///
372/// ```
373/// # #[cfg(feature = "cluster")]
374/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
375/// use hitbox_redis::{RedisBackend, ConnectionMode};
376///
377/// let backend = RedisBackend::builder()
378/// .connection(ConnectionMode::cluster([
379/// "redis://node1:6379",
380/// "redis://node2:6379",
381/// "redis://node3:6379",
382/// ]))
383/// .build()?;
384/// # Ok(())
385/// # }
386/// # #[cfg(not(feature = "cluster"))]
387/// # fn main() {}
388/// ```
389///
390/// # Performance
391///
392/// - **Read operations**: Single pipelined request (`HMGET` + `PTTL`)
393/// - **Write operations**: Single pipelined request (`HSET` + `PEXPIRE`)
394/// - **Connection**: Established lazily on first use, multiplexed for concurrent access
395///
396/// # Caveats
397///
398/// - **Connection failure**: First cache operation will fail if Redis is unreachable
399/// - **Expire time approximation**: The `expire` timestamp returned on read is
400/// calculated as `now + PTTL`, which may drift by the network round-trip time
401///
402/// [`Format`]: hitbox_backend::format::Format
403/// [`BincodeFormat`]: hitbox_backend::format::BincodeFormat
404/// [`Compressor`]: hitbox_backend::Compressor
405/// [`PassthroughCompressor`]: hitbox_backend::PassthroughCompressor
406#[derive(Clone)]
407pub struct RedisBackend<S = BincodeFormat, C = PassthroughCompressor>
408where
409 S: Format,
410 C: Compressor,
411{
412 /// Connection mode (single node or cluster).
413 mode: ConnectionMode,
414 /// Timeout for establishing connections.
415 connection_timeout: Option<Duration>,
416 /// Timeout for waiting on Redis responses.
417 response_timeout: Option<Duration>,
418 /// Maximum number of retry attempts.
419 number_of_retries: Option<usize>,
420 /// Username for Redis authentication (Redis 6+ ACL).
421 username: Option<String>,
422 /// Password for Redis authentication.
423 password: Option<String>,
424
425 /// Lazy-initialized connection (established on first cache operation).
426 connection: OnceCell<RedisConnection>,
427
428 /// Format used to serialize cache values.
429 serializer: S,
430 /// Format used to serialize cache keys.
431 key_format: CacheKeyFormat,
432 /// Compressor used for cache values.
433 compressor: C,
434 /// Label identifying this backend in multi-tier compositions.
435 label: BackendLabel,
436}
437
438impl RedisBackend<BincodeFormat, PassthroughCompressor> {
439 /// Creates a new builder for `RedisBackend`.
440 ///
441 /// Use the builder to configure the connection mode, serialization format,
442 /// key format, compression, and label. See [`RedisBackend`] for examples.
443 #[must_use]
444 pub fn builder() -> RedisBackendBuilder<BincodeFormat, PassthroughCompressor> {
445 RedisBackendBuilder::default()
446 }
447}
448
449impl<S, C> RedisBackend<S, C>
450where
451 S: Format,
452 C: Compressor,
453{
454 /// Gets or initializes the Redis connection lazily.
455 ///
456 /// This method ensures the connection is established only once, even when
457 /// called concurrently from multiple tasks. Subsequent calls return the
458 /// cached connection.
459 async fn get_connection(&self) -> Result<&RedisConnection, Error> {
460 self.connection
461 .get_or_try_init(|| async {
462 match &self.mode {
463 ConnectionMode::Single(config) => {
464 // Parse URL and apply authentication if provided
465 let mut conn_info: redis::ConnectionInfo = config.url.as_str().parse()?;
466 let mut redis_info = conn_info.redis_settings().clone();
467 if let Some(ref username) = self.username {
468 redis_info = redis_info.set_username(username);
469 }
470 if let Some(ref password) = self.password {
471 redis_info = redis_info.set_password(password);
472 }
473 conn_info = conn_info.set_redis_settings(redis_info);
474
475 let client = Client::open(conn_info)?;
476
477 // Build ConnectionManagerConfig with options
478 let mut manager_config = redis::aio::ConnectionManagerConfig::new()
479 .set_exponent_base(config.exponent_base);
480
481 if let Some(timeout) = self.connection_timeout {
482 manager_config = manager_config.set_connection_timeout(Some(timeout));
483 }
484 if let Some(timeout) = self.response_timeout {
485 manager_config = manager_config.set_response_timeout(Some(timeout));
486 }
487 if let Some(retries) = self.number_of_retries {
488 manager_config = manager_config.set_number_of_retries(retries);
489 }
490
491 let conn = client
492 .get_connection_manager_with_config(manager_config)
493 .await?;
494 Ok(RedisConnection::Single(conn))
495 }
496 #[cfg(feature = "cluster")]
497 ConnectionMode::Cluster(config) => {
498 let mut builder = redis::cluster::ClusterClientBuilder::new(
499 config.nodes.iter().map(|s| s.as_str()),
500 );
501 if config.read_from_replicas {
502 builder = builder.read_from_replicas();
503 }
504 if let Some(ref username) = self.username {
505 builder = builder.username(username.clone());
506 }
507 if let Some(ref password) = self.password {
508 builder = builder.password(password.clone());
509 }
510 if let Some(timeout) = self.connection_timeout {
511 builder = builder.connection_timeout(timeout);
512 }
513 if let Some(timeout) = self.response_timeout {
514 builder = builder.response_timeout(timeout);
515 }
516 if let Some(retries) = self.number_of_retries {
517 builder = builder.retries(retries as u32);
518 }
519
520 let client = builder.build()?;
521 let conn = client.get_async_connection().await?;
522 Ok(RedisConnection::Cluster(conn))
523 }
524 }
525 })
526 .await
527 }
528}
529
530/// Builder for creating and configuring a [`RedisBackend`].
531///
532/// Use [`RedisBackend::builder`] to create a new builder instance.
533/// See [`RedisBackend`] for usage examples.
534pub struct RedisBackendBuilder<S = BincodeFormat, C = PassthroughCompressor>
535where
536 S: Format,
537 C: Compressor,
538{
539 mode: Option<ConnectionMode>,
540 serializer: S,
541 key_format: CacheKeyFormat,
542 compressor: C,
543 label: BackendLabel,
544 // Common connection options
545 connection_timeout: Option<Duration>,
546 response_timeout: Option<Duration>,
547 number_of_retries: Option<usize>,
548 // Authentication
549 username: Option<String>,
550 password: Option<String>,
551}
552
553impl Default for RedisBackendBuilder<BincodeFormat, PassthroughCompressor> {
554 fn default() -> Self {
555 Self {
556 mode: None,
557 serializer: BincodeFormat,
558 key_format: CacheKeyFormat::default(),
559 compressor: PassthroughCompressor,
560 label: BackendLabel::new_static("redis"),
561 connection_timeout: None,
562 response_timeout: None,
563 number_of_retries: None,
564 username: None,
565 password: None,
566 }
567 }
568}
569
570impl<S, C> RedisBackendBuilder<S, C>
571where
572 S: Format,
573 C: Compressor,
574{
575 /// Sets the Redis connection mode.
576 ///
577 /// This is required before calling [`build`].
578 ///
579 /// [`build`]: Self::build
580 pub fn connection(mut self, mode: ConnectionMode) -> Self {
581 self.mode = Some(mode);
582 self
583 }
584
585 /// Sets the connection timeout.
586 ///
587 /// This timeout applies when establishing a new connection to Redis.
588 /// If the connection cannot be established within this duration, the operation fails.
589 ///
590 /// # Default
591 ///
592 /// No timeout (waits indefinitely).
593 pub fn connection_timeout(mut self, timeout: Duration) -> Self {
594 self.connection_timeout = Some(timeout);
595 self
596 }
597
598 /// Sets the response timeout.
599 ///
600 /// This timeout applies when waiting for a response from Redis after sending a command.
601 /// If Redis doesn't respond within this duration, the operation fails.
602 ///
603 /// # Default
604 ///
605 /// No timeout (waits indefinitely).
606 ///
607 /// # Note
608 ///
609 /// If you use blocking commands (like `BLPOP`) or long-running commands,
610 /// ensure the timeout is long enough to accommodate them.
611 pub fn response_timeout(mut self, timeout: Duration) -> Self {
612 self.response_timeout = Some(timeout);
613 self
614 }
615
616 /// Sets the maximum number of connection retry attempts.
617 ///
618 /// When a connection fails, the client will retry up to this many times
619 /// with exponential backoff before giving up.
620 ///
621 /// # Default
622 ///
623 /// Uses the redis-rs default (typically 16 retries for single-node).
624 pub fn retries(mut self, count: usize) -> Self {
625 self.number_of_retries = Some(count);
626 self
627 }
628
629 /// Sets the username for Redis authentication.
630 ///
631 /// Used with Redis 6+ ACL system. For older Redis versions using only
632 /// password authentication, leave this unset.
633 ///
634 /// # Default
635 ///
636 /// None (no username).
637 pub fn username(mut self, username: impl Into<String>) -> Self {
638 self.username = Some(username.into());
639 self
640 }
641
642 /// Sets the password for Redis authentication.
643 ///
644 /// Works with both legacy Redis AUTH and Redis 6+ ACL authentication.
645 /// For ACL authentication, also set [`username`](Self::username).
646 ///
647 /// # Default
648 ///
649 /// None (no password).
650 pub fn password(mut self, password: impl Into<String>) -> Self {
651 self.password = Some(password.into());
652 self
653 }
654
655 /// Sets the cache value serialization format.
656 ///
657 /// The value format determines how cached data is serialized before storage.
658 ///
659 /// # Default
660 ///
661 /// [`BincodeFormat`] (compact binary, recommended for production)
662 ///
663 /// # Options
664 ///
665 /// | Format | Speed | Size | Human-readable |
666 /// |--------|-------|------|----------------|
667 /// | [`BincodeFormat`] | Fast | Compact | No |
668 /// | [`JsonFormat`](hitbox_backend::format::JsonFormat) | Slow | Large | Yes |
669 /// | [`RonFormat`](hitbox_backend::format::RonFormat) | Medium | Medium | Yes |
670 ///
671 /// [`BincodeFormat`]: hitbox_backend::format::BincodeFormat
672 pub fn value_format<NewS>(self, serializer: NewS) -> RedisBackendBuilder<NewS, C>
673 where
674 NewS: Format,
675 {
676 RedisBackendBuilder {
677 mode: self.mode,
678 serializer,
679 key_format: self.key_format,
680 compressor: self.compressor,
681 label: self.label,
682 connection_timeout: self.connection_timeout,
683 response_timeout: self.response_timeout,
684 number_of_retries: self.number_of_retries,
685 username: self.username,
686 password: self.password,
687 }
688 }
689
690 /// Sets the cache key serialization format.
691 ///
692 /// The key format determines how [`CacheKey`] values are serialized for
693 /// storage as Redis keys. This affects key size and debuggability.
694 ///
695 /// # Default
696 ///
697 /// [`CacheKeyFormat::Bitcode`]
698 ///
699 /// # Options
700 ///
701 /// | Format | Size | Human-readable |
702 /// |--------|------|----------------|
703 /// | [`Bitcode`](CacheKeyFormat::Bitcode) | Compact | No |
704 /// | [`UrlEncoded`](CacheKeyFormat::UrlEncoded) | Larger | Yes |
705 ///
706 /// [`CacheKey`]: hitbox::CacheKey
707 pub fn key_format(mut self, key_format: CacheKeyFormat) -> Self {
708 self.key_format = key_format;
709 self
710 }
711
712 /// Sets a custom label for this backend.
713 ///
714 /// The label identifies this backend in multi-tier cache compositions and
715 /// appears in metrics and debug output.
716 ///
717 /// # Default
718 ///
719 /// `"redis"`
720 pub fn label(mut self, label: impl Into<BackendLabel>) -> Self {
721 self.label = label.into();
722 self
723 }
724
725 /// Sets the compression strategy for cache values.
726 ///
727 /// Compression reduces network bandwidth and Redis memory usage at the cost
728 /// of CPU time. For Redis backends, compression is often beneficial since
729 /// network I/O is typically the bottleneck.
730 ///
731 /// # Default
732 ///
733 /// [`PassthroughCompressor`] (no compression)
734 ///
735 /// # Options
736 ///
737 /// | Compressor | Ratio | Speed | Feature flag |
738 /// |------------|-------|-------|--------------|
739 /// | [`PassthroughCompressor`] | None | Fastest | — |
740 /// | [`GzipCompressor`] | Good | Medium | `gzip` |
741 /// | [`ZstdCompressor`] | Best | Fast | `zstd` |
742 ///
743 /// # When to Use Compression
744 ///
745 /// - Cached values larger than ~1KB
746 /// - High network latency to Redis
747 /// - Redis memory is constrained
748 /// - Using Redis persistence (RDB/AOF)
749 ///
750 /// [`PassthroughCompressor`]: hitbox_backend::PassthroughCompressor
751 /// [`GzipCompressor`]: https://docs.rs/hitbox-backend/latest/hitbox_backend/struct.GzipCompressor.html
752 /// [`ZstdCompressor`]: https://docs.rs/hitbox-backend/latest/hitbox_backend/struct.ZstdCompressor.html
753 pub fn compressor<NewC>(self, compressor: NewC) -> RedisBackendBuilder<S, NewC>
754 where
755 NewC: Compressor,
756 {
757 RedisBackendBuilder {
758 mode: self.mode,
759 serializer: self.serializer,
760 key_format: self.key_format,
761 compressor,
762 label: self.label,
763 connection_timeout: self.connection_timeout,
764 response_timeout: self.response_timeout,
765 number_of_retries: self.number_of_retries,
766 username: self.username,
767 password: self.password,
768 }
769 }
770
771 /// Builds the [`RedisBackend`] with the configured settings.
772 ///
773 /// This method is synchronous - the actual Redis connection is established
774 /// lazily on first use (get/set/delete operation).
775 ///
776 /// # Errors
777 ///
778 /// Returns [`Error::MissingConnectionMode`] if no connection mode was specified.
779 /// Note: Connection errors will occur on first cache operation, not here.
780 ///
781 /// [`Error::MissingConnectionMode`]: crate::error::Error::MissingConnectionMode
782 pub fn build(self) -> Result<RedisBackend<S, C>, Error> {
783 let mode = self.mode.ok_or(Error::MissingConnectionMode)?;
784
785 Ok(RedisBackend {
786 mode,
787 connection_timeout: self.connection_timeout,
788 response_timeout: self.response_timeout,
789 number_of_retries: self.number_of_retries,
790 username: self.username,
791 password: self.password,
792 connection: OnceCell::new(),
793 serializer: self.serializer,
794 key_format: self.key_format,
795 compressor: self.compressor,
796 label: self.label,
797 })
798 }
799}
800
801#[async_trait]
802impl<S, C> Backend for RedisBackend<S, C>
803where
804 S: Format + Send + Sync,
805 C: Compressor + Send + Sync,
806{
807 async fn read(&self, key: &CacheKey) -> BackendResult<Option<CacheValue<Raw>>> {
808 let mut con = self.get_connection().await?.clone();
809 let cache_key = self.key_format.serialize(key)?;
810
811 // Pipeline: HMGET (data, stale) + PTTL with typed decoding
812 let ((data, stale_ms), pttl): ((Option<Vec<u8>>, Option<i64>), i64) = con
813 .query_pipeline(
814 redis::pipe()
815 .cmd("HMGET")
816 .arg(&cache_key)
817 .arg("d")
818 .arg("s")
819 .cmd("PTTL")
820 .arg(&cache_key),
821 )
822 .await
823 .map_err(Error::from)?;
824
825 // If data is None, key doesn't exist
826 let data = match data {
827 Some(data) => Bytes::from(data),
828 None => return Ok(None),
829 };
830
831 // Convert stale millis to DateTime
832 let stale = stale_ms.and_then(DateTime::from_timestamp_millis);
833
834 // Calculate expire from PTTL (milliseconds remaining)
835 // PTTL returns: -2 if key doesn't exist, -1 if no TTL, else milliseconds
836 let expire = (pttl > 0).then(|| Utc::now() + chrono::Duration::milliseconds(pttl));
837
838 Ok(Some(CacheValue::new(data, expire, stale)))
839 }
840
841 async fn write(&self, key: &CacheKey, value: CacheValue<Raw>) -> BackendResult<()> {
842 let mut con = self.get_connection().await?.clone();
843 let cache_key = self.key_format.serialize(key)?;
844
845 // Build HSET command with data field, optionally add stale field
846 let mut cmd = redis::cmd("HSET");
847 cmd.arg(&cache_key).arg("d").arg(value.data().as_ref());
848 if let Some(stale) = value.stale() {
849 cmd.arg("s").arg(stale.timestamp_millis());
850 }
851
852 // Pipeline: HSET + optional PEXPIRE (computed from value.ttl())
853 let mut pipe = redis::pipe();
854 pipe.add_command(cmd).ignore();
855 if let Some(ttl_duration) = value.ttl() {
856 pipe.cmd("PEXPIRE")
857 .arg(&cache_key)
858 .arg(u64::try_from(ttl_duration.as_millis()).map_err(|_| {
859 BackendError::InternalError(Box::new(std::io::Error::new(
860 std::io::ErrorKind::InvalidInput,
861 format!(
862 "TTL overflow: {}ms exceeds u64 range",
863 ttl_duration.as_millis()
864 ),
865 )))
866 })?)
867 .ignore();
868 }
869
870 con.query_pipeline::<()>(&pipe).await.map_err(Error::from)?;
871 Ok(())
872 }
873
874 async fn remove(&self, key: &CacheKey) -> BackendResult<DeleteStatus> {
875 let mut con = self.get_connection().await?.clone();
876 let cache_key = self.key_format.serialize(key)?;
877
878 let deleted: i32 = con
879 .query_cmd(redis::cmd("DEL").arg(cache_key))
880 .await
881 .map_err(Error::from)?;
882
883 if deleted > 0 {
884 Ok(DeleteStatus::Deleted(deleted as u32))
885 } else {
886 Ok(DeleteStatus::Missing)
887 }
888 }
889
890 fn label(&self) -> BackendLabel {
891 self.label.clone()
892 }
893
894 fn value_format(&self) -> &dyn Format {
895 &self.serializer
896 }
897
898 fn key_format(&self) -> &CacheKeyFormat {
899 &self.key_format
900 }
901
902 fn compressor(&self) -> &dyn Compressor {
903 &self.compressor
904 }
905}
906
907// Explicit CacheBackend implementation using default trait methods
908impl<S, C> hitbox_backend::CacheBackend for RedisBackend<S, C>
909where
910 S: Format + Send + Sync,
911 C: Compressor + Send + Sync,
912{
913}