hitbox_redis/backend.rs
1//! Redis backend implementation.
2
3use std::time::Duration;
4
5use async_trait::async_trait;
6use bytes::Bytes;
7use chrono::{DateTime, Utc};
8use hitbox::{BackendLabel, CacheKey, CacheValue, Raw};
9use hitbox_backend::{
10 Backend, BackendResult, CacheKeyFormat, Compressor, DeleteStatus, PassthroughCompressor,
11 format::{BincodeFormat, Format},
12};
13use redis::Client;
14use redis::aio::ConnectionManager;
15#[cfg(feature = "cluster")]
16use redis::cluster_async::ClusterConnection;
17use tokio::sync::OnceCell;
18
19use crate::error::Error;
20
/// Connection settings for one standalone Redis node.
///
/// # When You'll Encounter This
///
/// Most callers never build this by hand. It shows up when:
/// - [`ConnectionMode::single`] constructs it internally
/// - You inspect the configuration for debugging or logging
///
/// # Examples
///
/// ```
/// use hitbox_redis::SingleConfig;
///
/// let config = SingleConfig::new("redis://localhost:6379/");
/// ```
#[derive(Debug, Clone)]
pub struct SingleConfig {
    /// Redis connection URL: `redis://[:<password>@]<host>[:<port>][/<database>]`.
    pub(crate) url: String,
    /// Base of the exponential backoff used for reconnection attempts. Default: `2.0`.
    pub(crate) exponent_base: f32,
}

impl SingleConfig {
    /// Backoff base used when the caller does not override it.
    const DEFAULT_EXPONENT_BASE: f32 = 2.0;

    /// Builds a single-node configuration for the given URL.
    ///
    /// # Arguments
    ///
    /// * `url` - Redis connection URL in the form
    ///   `redis://[:<password>@]<host>[:<port>][/<database>]`. The URL is stored
    ///   as given; it is not parsed or validated here.
    ///
    /// # Default
    ///
    /// * `exponent_base`: `2.0` (exponential backoff base for retries)
    ///
    /// # Examples
    ///
    /// ```
    /// use hitbox_redis::SingleConfig;
    ///
    /// let config = SingleConfig::new("redis://localhost:6379/0");
    /// ```
    pub fn new(url: impl Into<String>) -> Self {
        let url = url.into();
        Self {
            url,
            exponent_base: Self::DEFAULT_EXPONENT_BASE,
        }
    }
}
69
/// Connection settings for a Redis Cluster deployment.
///
/// # When You'll Encounter This
///
/// Most callers never build this by hand. It shows up when:
/// - [`ConnectionMode::cluster`] constructs it internally
/// - You inspect the configuration for debugging or logging
///
/// # Examples
///
/// ```
/// # #[cfg(feature = "cluster")]
/// # fn main() {
/// use hitbox_redis::ClusterConfig;
///
/// let config = ClusterConfig::new([
///     "redis://node1:6379",
///     "redis://node2:6379",
///     "redis://node3:6379",
/// ]);
/// # }
/// # #[cfg(not(feature = "cluster"))]
/// # fn main() {}
/// ```
#[cfg(feature = "cluster")]
#[cfg_attr(docsrs, doc(cfg(feature = "cluster")))]
#[derive(Debug, Clone)]
pub struct ClusterConfig {
    /// Seed node URLs used to bootstrap the cluster client.
    pub(crate) nodes: Vec<String>,
    /// Whether reads may be served by replica nodes. Default: `false`.
    pub(crate) read_from_replicas: bool,
}

#[cfg(feature = "cluster")]
impl ClusterConfig {
    /// Builds a cluster configuration from a list of seed nodes.
    ///
    /// # Arguments
    ///
    /// * `nodes` - Initial cluster node URLs, kept in iteration order. The
    ///   cluster client is expected to discover the remaining nodes itself.
    ///
    /// # Default
    ///
    /// * `read_from_replicas`: `false`
    ///
    /// # Examples
    ///
    /// ```
    /// # #[cfg(feature = "cluster")]
    /// # fn main() {
    /// use hitbox_redis::ClusterConfig;
    ///
    /// let config = ClusterConfig::new([
    ///     "redis://node1:6379",
    ///     "redis://node2:6379",
    ///     "redis://node3:6379",
    /// ]);
    /// # }
    /// # #[cfg(not(feature = "cluster"))]
    /// # fn main() {}
    /// ```
    pub fn new<I, S>(nodes: I) -> Self
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        let nodes: Vec<String> = nodes.into_iter().map(|node| node.into()).collect();
        Self {
            nodes,
            read_from_replicas: false,
        }
    }
}
144
145/// Redis connection mode.
146///
147/// Determines whether to connect to a single Redis instance or a Redis Cluster.
148///
149/// # Examples
150///
151/// Single-node connection:
152/// ```
153/// use hitbox_redis::ConnectionMode;
154///
155/// let mode = ConnectionMode::single("redis://localhost:6379/");
156/// ```
157///
158/// Cluster connection (requires `cluster` feature):
159///
160/// ```
161/// # #[cfg(feature = "cluster")]
162/// # fn main() {
163/// use hitbox_redis::ConnectionMode;
164///
165/// let mode = ConnectionMode::cluster([
166/// "redis://node1:6379",
167/// "redis://node2:6379",
168/// "redis://node3:6379",
169/// ]);
170/// # }
171/// # #[cfg(not(feature = "cluster"))]
172/// # fn main() {}
173/// ```
174#[derive(Debug, Clone)]
175pub enum ConnectionMode {
176 /// Single Redis node connection.
177 Single(SingleConfig),
178
179 /// Redis Cluster connection.
180 #[cfg(feature = "cluster")]
181 #[cfg_attr(docsrs, doc(cfg(feature = "cluster")))]
182 Cluster(ClusterConfig),
183}
184
185impl ConnectionMode {
186 /// Create a single-node connection mode.
187 ///
188 /// # Arguments
189 ///
190 /// * `url` - Redis connection URL in format `redis://[:<password>@]<host>[:<port>][/<database>]`
191 pub fn single(url: impl Into<String>) -> Self {
192 Self::Single(SingleConfig::new(url))
193 }
194
195 /// Create a cluster connection mode.
196 ///
197 /// # Arguments
198 ///
199 /// * `nodes` - List of initial cluster node URLs. The client will discover other nodes automatically.
200 #[cfg(feature = "cluster")]
201 #[cfg_attr(docsrs, doc(cfg(feature = "cluster")))]
202 pub fn cluster<I, S>(nodes: I) -> Self
203 where
204 I: IntoIterator<Item = S>,
205 S: Into<String>,
206 {
207 Self::Cluster(ClusterConfig::new(nodes))
208 }
209
210 /// Sets the exponential backoff base for retries (single-node only).
211 ///
212 /// The delay between reconnection attempts is calculated as `base^attempt` milliseconds.
213 ///
214 /// # Default
215 ///
216 /// `2.0`
217 ///
218 /// # Caveats
219 ///
220 /// This option only applies to single-node connections and is silently ignored
221 /// for cluster mode.
222 ///
223 /// # Examples
224 ///
225 /// ```
226 /// use hitbox_redis::ConnectionMode;
227 ///
228 /// // Use a slower backoff (3^attempt ms)
229 /// let mode = ConnectionMode::single("redis://localhost:6379/")
230 /// .exponent_base(3.0);
231 /// ```
232 #[allow(irrefutable_let_patterns)]
233 pub fn exponent_base(mut self, base: f32) -> Self {
234 if let Self::Single(ref mut config) = self {
235 config.exponent_base = base;
236 }
237 self
238 }
239
240 /// Enables reading from replica nodes (cluster only).
241 ///
242 /// When enabled, read operations may be served by replica nodes for better
243 /// read throughput and reduced load on primary nodes.
244 ///
245 /// # Default
246 ///
247 /// Disabled (reads only from primary nodes).
248 ///
249 /// # Caveats
250 ///
251 /// - Replicas may have slightly stale data due to replication lag
252 /// - This option only applies to cluster connections and is silently ignored
253 /// for single-node mode
254 ///
255 /// # Examples
256 ///
257 /// ```
258 /// # #[cfg(feature = "cluster")]
259 /// # fn main() {
260 /// use hitbox_redis::ConnectionMode;
261 ///
262 /// let mode = ConnectionMode::cluster([
263 /// "redis://node1:6379",
264 /// "redis://node2:6379",
265 /// ])
266 /// .read_from_replicas();
267 /// # }
268 /// # #[cfg(not(feature = "cluster"))]
269 /// # fn main() {}
270 /// ```
271 #[cfg(feature = "cluster")]
272 #[cfg_attr(docsrs, doc(cfg(feature = "cluster")))]
273 pub fn read_from_replicas(mut self) -> Self {
274 if let Self::Cluster(ref mut config) = self {
275 config.read_from_replicas = true;
276 }
277 self
278 }
279}
280
281/// Internal wrapper for Redis connection types.
282#[derive(Clone)]
283enum RedisConnection {
284 Single(ConnectionManager),
285 #[cfg(feature = "cluster")]
286 Cluster(ClusterConnection),
287}
288
289impl RedisConnection {
290 /// Execute a pipeline and return the result.
291 async fn query_pipeline<T: redis::FromRedisValue>(
292 &mut self,
293 pipe: &redis::Pipeline,
294 ) -> Result<T, redis::RedisError> {
295 match self {
296 Self::Single(conn) => pipe.query_async(conn).await,
297 #[cfg(feature = "cluster")]
298 Self::Cluster(conn) => pipe.query_async(conn).await,
299 }
300 }
301
302 /// Execute a single command.
303 async fn query_cmd<T: redis::FromRedisValue>(
304 &mut self,
305 cmd: &mut redis::Cmd,
306 ) -> Result<T, redis::RedisError> {
307 match self {
308 Self::Single(conn) => cmd.query_async(conn).await,
309 #[cfg(feature = "cluster")]
310 Self::Cluster(conn) => cmd.query_async(conn).await,
311 }
312 }
313}
314
315/// Redis cache backend for single-node or cluster deployments.
316///
317/// `RedisBackend` provides a cache backend using Redis as the storage layer.
318/// It supports both single-node Redis instances and Redis Cluster
319/// (with the `cluster` feature enabled).
320///
321/// Use [`RedisBackendBuilder`] to construct this type.
322///
323/// # Type Parameters
324///
325/// * `S` - Serialization format for cache values. Implements [`Format`].
326/// Default: [`BincodeFormat`] (compact binary, recommended for production).
327/// * `C` - Compression strategy for cache values. Implements [`Compressor`].
328/// Default: [`PassthroughCompressor`] (no compression).
329///
330/// # Examples
331///
332/// Basic single-node connection:
333///
334/// ```
335/// use hitbox_redis::{RedisBackend, ConnectionMode};
336///
337/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
338/// let backend = RedisBackend::builder()
339/// .connection(ConnectionMode::single("redis://localhost:6379/"))
340/// .build()?;
341/// # Ok(())
342/// # }
343/// ```
344///
345/// With all configuration options:
346///
347/// ```
348/// use std::time::Duration;
349/// use hitbox_redis::{RedisBackend, ConnectionMode};
350/// use hitbox_backend::CacheKeyFormat;
351/// use hitbox_backend::format::JsonFormat;
352///
353/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
354/// let backend = RedisBackend::builder()
355/// .connection(ConnectionMode::single("redis://localhost:6379/"))
356/// .username("cache_user") // Redis 6+ ACL
357/// .password("secret")
358/// .label("user-sessions")
359/// .key_format(CacheKeyFormat::UrlEncoded)
360/// .value_format(JsonFormat)
361/// .connection_timeout(Duration::from_secs(5))
362/// .response_timeout(Duration::from_secs(2))
363/// .retries(3)
364/// .build()?;
365/// # Ok(())
366/// # }
367/// ```
368///
369/// Cluster connection (requires `cluster` feature):
370///
371/// ```
372/// # #[cfg(feature = "cluster")]
373/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
374/// use hitbox_redis::{RedisBackend, ConnectionMode};
375///
376/// let backend = RedisBackend::builder()
377/// .connection(ConnectionMode::cluster([
378/// "redis://node1:6379",
379/// "redis://node2:6379",
380/// "redis://node3:6379",
381/// ]))
382/// .build()?;
383/// # Ok(())
384/// # }
385/// # #[cfg(not(feature = "cluster"))]
386/// # fn main() {}
387/// ```
388///
389/// # Performance
390///
391/// - **Read operations**: Single pipelined request (`HMGET` + `PTTL`)
392/// - **Write operations**: Single pipelined request (`HSET` + `EXPIRE`)
393/// - **Connection**: Established lazily on first use, multiplexed for concurrent access
394///
395/// # Caveats
396///
397/// - **Connection failure**: First cache operation will fail if Redis is unreachable
398/// - **Expire time approximation**: The `expire` timestamp returned on read is
399/// calculated as `now + PTTL`, which may drift by the network round-trip time
400///
401/// [`Format`]: hitbox_backend::format::Format
402/// [`BincodeFormat`]: hitbox_backend::format::BincodeFormat
403/// [`Compressor`]: hitbox_backend::Compressor
404/// [`PassthroughCompressor`]: hitbox_backend::PassthroughCompressor
405#[derive(Clone)]
406pub struct RedisBackend<S = BincodeFormat, C = PassthroughCompressor>
407where
408 S: Format,
409 C: Compressor,
410{
411 /// Connection mode (single node or cluster).
412 mode: ConnectionMode,
413 /// Timeout for establishing connections.
414 connection_timeout: Option<Duration>,
415 /// Timeout for waiting on Redis responses.
416 response_timeout: Option<Duration>,
417 /// Maximum number of retry attempts.
418 number_of_retries: Option<usize>,
419 /// Username for Redis authentication (Redis 6+ ACL).
420 username: Option<String>,
421 /// Password for Redis authentication.
422 password: Option<String>,
423
424 /// Lazy-initialized connection (established on first cache operation).
425 connection: OnceCell<RedisConnection>,
426
427 /// Format used to serialize cache values.
428 serializer: S,
429 /// Format used to serialize cache keys.
430 key_format: CacheKeyFormat,
431 /// Compressor used for cache values.
432 compressor: C,
433 /// Label identifying this backend in multi-tier compositions.
434 label: BackendLabel,
435}
436
437impl RedisBackend<BincodeFormat, PassthroughCompressor> {
438 /// Creates a new builder for `RedisBackend`.
439 ///
440 /// Use the builder to configure the connection mode, serialization format,
441 /// key format, compression, and label. See [`RedisBackend`] for examples.
442 #[must_use]
443 pub fn builder() -> RedisBackendBuilder<BincodeFormat, PassthroughCompressor> {
444 RedisBackendBuilder::default()
445 }
446}
447
448impl<S, C> RedisBackend<S, C>
449where
450 S: Format,
451 C: Compressor,
452{
453 /// Gets or initializes the Redis connection lazily.
454 ///
455 /// This method ensures the connection is established only once, even when
456 /// called concurrently from multiple tasks. Subsequent calls return the
457 /// cached connection.
458 async fn get_connection(&self) -> Result<&RedisConnection, Error> {
459 self.connection
460 .get_or_try_init(|| async {
461 match &self.mode {
462 ConnectionMode::Single(config) => {
463 // Parse URL and apply authentication if provided
464 let mut conn_info: redis::ConnectionInfo = config.url.as_str().parse()?;
465 let mut redis_info = conn_info.redis_settings().clone();
466 if let Some(ref username) = self.username {
467 redis_info = redis_info.set_username(username);
468 }
469 if let Some(ref password) = self.password {
470 redis_info = redis_info.set_password(password);
471 }
472 conn_info = conn_info.set_redis_settings(redis_info);
473
474 let client = Client::open(conn_info)?;
475
476 // Build ConnectionManagerConfig with options
477 let mut manager_config = redis::aio::ConnectionManagerConfig::new()
478 .set_exponent_base(config.exponent_base);
479
480 if let Some(timeout) = self.connection_timeout {
481 manager_config = manager_config.set_connection_timeout(Some(timeout));
482 }
483 if let Some(timeout) = self.response_timeout {
484 manager_config = manager_config.set_response_timeout(Some(timeout));
485 }
486 if let Some(retries) = self.number_of_retries {
487 manager_config = manager_config.set_number_of_retries(retries);
488 }
489
490 let conn = client
491 .get_connection_manager_with_config(manager_config)
492 .await?;
493 Ok(RedisConnection::Single(conn))
494 }
495 #[cfg(feature = "cluster")]
496 ConnectionMode::Cluster(config) => {
497 let mut builder = redis::cluster::ClusterClientBuilder::new(
498 config.nodes.iter().map(|s| s.as_str()),
499 );
500 if config.read_from_replicas {
501 builder = builder.read_from_replicas();
502 }
503 if let Some(ref username) = self.username {
504 builder = builder.username(username.clone());
505 }
506 if let Some(ref password) = self.password {
507 builder = builder.password(password.clone());
508 }
509 if let Some(timeout) = self.connection_timeout {
510 builder = builder.connection_timeout(timeout);
511 }
512 if let Some(timeout) = self.response_timeout {
513 builder = builder.response_timeout(timeout);
514 }
515 if let Some(retries) = self.number_of_retries {
516 builder = builder.retries(retries as u32);
517 }
518
519 let client = builder.build()?;
520 let conn = client.get_async_connection().await?;
521 Ok(RedisConnection::Cluster(conn))
522 }
523 }
524 })
525 .await
526 }
527}
528
529/// Builder for creating and configuring a [`RedisBackend`].
530///
531/// Use [`RedisBackend::builder`] to create a new builder instance.
532/// See [`RedisBackend`] for usage examples.
533pub struct RedisBackendBuilder<S = BincodeFormat, C = PassthroughCompressor>
534where
535 S: Format,
536 C: Compressor,
537{
538 mode: Option<ConnectionMode>,
539 serializer: S,
540 key_format: CacheKeyFormat,
541 compressor: C,
542 label: BackendLabel,
543 // Common connection options
544 connection_timeout: Option<Duration>,
545 response_timeout: Option<Duration>,
546 number_of_retries: Option<usize>,
547 // Authentication
548 username: Option<String>,
549 password: Option<String>,
550}
551
552impl Default for RedisBackendBuilder<BincodeFormat, PassthroughCompressor> {
553 fn default() -> Self {
554 Self {
555 mode: None,
556 serializer: BincodeFormat,
557 key_format: CacheKeyFormat::default(),
558 compressor: PassthroughCompressor,
559 label: BackendLabel::new_static("redis"),
560 connection_timeout: None,
561 response_timeout: None,
562 number_of_retries: None,
563 username: None,
564 password: None,
565 }
566 }
567}
568
569impl<S, C> RedisBackendBuilder<S, C>
570where
571 S: Format,
572 C: Compressor,
573{
574 /// Sets the Redis connection mode.
575 ///
576 /// This is required before calling [`build`].
577 ///
578 /// [`build`]: Self::build
579 pub fn connection(mut self, mode: ConnectionMode) -> Self {
580 self.mode = Some(mode);
581 self
582 }
583
584 /// Sets the connection timeout.
585 ///
586 /// This timeout applies when establishing a new connection to Redis.
587 /// If the connection cannot be established within this duration, the operation fails.
588 ///
589 /// # Default
590 ///
591 /// No timeout (waits indefinitely).
592 pub fn connection_timeout(mut self, timeout: Duration) -> Self {
593 self.connection_timeout = Some(timeout);
594 self
595 }
596
597 /// Sets the response timeout.
598 ///
599 /// This timeout applies when waiting for a response from Redis after sending a command.
600 /// If Redis doesn't respond within this duration, the operation fails.
601 ///
602 /// # Default
603 ///
604 /// No timeout (waits indefinitely).
605 ///
606 /// # Note
607 ///
608 /// If you use blocking commands (like `BLPOP`) or long-running commands,
609 /// ensure the timeout is long enough to accommodate them.
610 pub fn response_timeout(mut self, timeout: Duration) -> Self {
611 self.response_timeout = Some(timeout);
612 self
613 }
614
615 /// Sets the maximum number of connection retry attempts.
616 ///
617 /// When a connection fails, the client will retry up to this many times
618 /// with exponential backoff before giving up.
619 ///
620 /// # Default
621 ///
622 /// Uses the redis-rs default (typically 16 retries for single-node).
623 pub fn retries(mut self, count: usize) -> Self {
624 self.number_of_retries = Some(count);
625 self
626 }
627
628 /// Sets the username for Redis authentication.
629 ///
630 /// Used with Redis 6+ ACL system. For older Redis versions using only
631 /// password authentication, leave this unset.
632 ///
633 /// # Default
634 ///
635 /// None (no username).
636 pub fn username(mut self, username: impl Into<String>) -> Self {
637 self.username = Some(username.into());
638 self
639 }
640
641 /// Sets the password for Redis authentication.
642 ///
643 /// Works with both legacy Redis AUTH and Redis 6+ ACL authentication.
644 /// For ACL authentication, also set [`username`](Self::username).
645 ///
646 /// # Default
647 ///
648 /// None (no password).
649 pub fn password(mut self, password: impl Into<String>) -> Self {
650 self.password = Some(password.into());
651 self
652 }
653
654 /// Sets the cache value serialization format.
655 ///
656 /// The value format determines how cached data is serialized before storage.
657 ///
658 /// # Default
659 ///
660 /// [`BincodeFormat`] (compact binary, recommended for production)
661 ///
662 /// # Options
663 ///
664 /// | Format | Speed | Size | Human-readable |
665 /// |--------|-------|------|----------------|
666 /// | [`BincodeFormat`] | Fast | Compact | No |
667 /// | [`JsonFormat`](hitbox_backend::format::JsonFormat) | Slow | Large | Yes |
668 /// | [`RonFormat`](hitbox_backend::format::RonFormat) | Medium | Medium | Yes |
669 ///
670 /// [`BincodeFormat`]: hitbox_backend::format::BincodeFormat
671 pub fn value_format<NewS>(self, serializer: NewS) -> RedisBackendBuilder<NewS, C>
672 where
673 NewS: Format,
674 {
675 RedisBackendBuilder {
676 mode: self.mode,
677 serializer,
678 key_format: self.key_format,
679 compressor: self.compressor,
680 label: self.label,
681 connection_timeout: self.connection_timeout,
682 response_timeout: self.response_timeout,
683 number_of_retries: self.number_of_retries,
684 username: self.username,
685 password: self.password,
686 }
687 }
688
689 /// Sets the cache key serialization format.
690 ///
691 /// The key format determines how [`CacheKey`] values are serialized for
692 /// storage as Redis keys. This affects key size and debuggability.
693 ///
694 /// # Default
695 ///
696 /// [`CacheKeyFormat::Bitcode`]
697 ///
698 /// # Options
699 ///
700 /// | Format | Size | Human-readable |
701 /// |--------|------|----------------|
702 /// | [`Bitcode`](CacheKeyFormat::Bitcode) | Compact | No |
703 /// | [`UrlEncoded`](CacheKeyFormat::UrlEncoded) | Larger | Yes |
704 ///
705 /// [`CacheKey`]: hitbox::CacheKey
706 pub fn key_format(mut self, key_format: CacheKeyFormat) -> Self {
707 self.key_format = key_format;
708 self
709 }
710
711 /// Sets a custom label for this backend.
712 ///
713 /// The label identifies this backend in multi-tier cache compositions and
714 /// appears in metrics and debug output.
715 ///
716 /// # Default
717 ///
718 /// `"redis"`
719 pub fn label(mut self, label: impl Into<BackendLabel>) -> Self {
720 self.label = label.into();
721 self
722 }
723
724 /// Sets the compression strategy for cache values.
725 ///
726 /// Compression reduces network bandwidth and Redis memory usage at the cost
727 /// of CPU time. For Redis backends, compression is often beneficial since
728 /// network I/O is typically the bottleneck.
729 ///
730 /// # Default
731 ///
732 /// [`PassthroughCompressor`] (no compression)
733 ///
734 /// # Options
735 ///
736 /// | Compressor | Ratio | Speed | Feature flag |
737 /// |------------|-------|-------|--------------|
738 /// | [`PassthroughCompressor`] | None | Fastest | — |
739 /// | [`GzipCompressor`] | Good | Medium | `gzip` |
740 /// | [`ZstdCompressor`] | Best | Fast | `zstd` |
741 ///
742 /// # When to Use Compression
743 ///
744 /// - Cached values larger than ~1KB
745 /// - High network latency to Redis
746 /// - Redis memory is constrained
747 /// - Using Redis persistence (RDB/AOF)
748 ///
749 /// [`PassthroughCompressor`]: hitbox_backend::PassthroughCompressor
750 /// [`GzipCompressor`]: https://docs.rs/hitbox-backend/latest/hitbox_backend/struct.GzipCompressor.html
751 /// [`ZstdCompressor`]: https://docs.rs/hitbox-backend/latest/hitbox_backend/struct.ZstdCompressor.html
752 pub fn compressor<NewC>(self, compressor: NewC) -> RedisBackendBuilder<S, NewC>
753 where
754 NewC: Compressor,
755 {
756 RedisBackendBuilder {
757 mode: self.mode,
758 serializer: self.serializer,
759 key_format: self.key_format,
760 compressor,
761 label: self.label,
762 connection_timeout: self.connection_timeout,
763 response_timeout: self.response_timeout,
764 number_of_retries: self.number_of_retries,
765 username: self.username,
766 password: self.password,
767 }
768 }
769
770 /// Builds the [`RedisBackend`] with the configured settings.
771 ///
772 /// This method is synchronous - the actual Redis connection is established
773 /// lazily on first use (get/set/delete operation).
774 ///
775 /// # Errors
776 ///
777 /// Returns [`Error::MissingConnectionMode`] if no connection mode was specified.
778 /// Note: Connection errors will occur on first cache operation, not here.
779 ///
780 /// [`Error::MissingConnectionMode`]: crate::error::Error::MissingConnectionMode
781 pub fn build(self) -> Result<RedisBackend<S, C>, Error> {
782 let mode = self.mode.ok_or(Error::MissingConnectionMode)?;
783
784 Ok(RedisBackend {
785 mode,
786 connection_timeout: self.connection_timeout,
787 response_timeout: self.response_timeout,
788 number_of_retries: self.number_of_retries,
789 username: self.username,
790 password: self.password,
791 connection: OnceCell::new(),
792 serializer: self.serializer,
793 key_format: self.key_format,
794 compressor: self.compressor,
795 label: self.label,
796 })
797 }
798}
799
800#[async_trait]
801impl<S, C> Backend for RedisBackend<S, C>
802where
803 S: Format + Send + Sync,
804 C: Compressor + Send + Sync,
805{
806 async fn read(&self, key: &CacheKey) -> BackendResult<Option<CacheValue<Raw>>> {
807 let mut con = self.get_connection().await?.clone();
808 let cache_key = self.key_format.serialize(key)?;
809
810 // Pipeline: HMGET (data, stale) + PTTL with typed decoding
811 let ((data, stale_ms), pttl): ((Option<Vec<u8>>, Option<i64>), i64) = con
812 .query_pipeline(
813 redis::pipe()
814 .cmd("HMGET")
815 .arg(&cache_key)
816 .arg("d")
817 .arg("s")
818 .cmd("PTTL")
819 .arg(&cache_key),
820 )
821 .await
822 .map_err(Error::from)?;
823
824 // If data is None, key doesn't exist
825 let data = match data {
826 Some(data) => Bytes::from(data),
827 None => return Ok(None),
828 };
829
830 // Convert stale millis to DateTime
831 let stale = stale_ms.and_then(DateTime::from_timestamp_millis);
832
833 // Calculate expire from PTTL (milliseconds remaining)
834 // PTTL returns: -2 if key doesn't exist, -1 if no TTL, else milliseconds
835 let expire = (pttl > 0).then(|| Utc::now() + chrono::Duration::milliseconds(pttl));
836
837 Ok(Some(CacheValue::new(data, expire, stale)))
838 }
839
840 async fn write(&self, key: &CacheKey, value: CacheValue<Raw>) -> BackendResult<()> {
841 let mut con = self.get_connection().await?.clone();
842 let cache_key = self.key_format.serialize(key)?;
843
844 // Build HSET command with data field, optionally add stale field
845 let mut cmd = redis::cmd("HSET");
846 cmd.arg(&cache_key).arg("d").arg(value.data().as_ref());
847 if let Some(stale) = value.stale() {
848 cmd.arg("s").arg(stale.timestamp_millis());
849 }
850
851 // Pipeline: HSET + optional EXPIRE (computed from value.ttl())
852 let mut pipe = redis::pipe();
853 pipe.add_command(cmd).ignore();
854 if let Some(ttl_duration) = value.ttl() {
855 pipe.cmd("EXPIRE")
856 .arg(&cache_key)
857 .arg(ttl_duration.as_secs())
858 .ignore();
859 }
860
861 con.query_pipeline::<()>(&pipe).await.map_err(Error::from)?;
862 Ok(())
863 }
864
865 async fn remove(&self, key: &CacheKey) -> BackendResult<DeleteStatus> {
866 let mut con = self.get_connection().await?.clone();
867 let cache_key = self.key_format.serialize(key)?;
868
869 let deleted: i32 = con
870 .query_cmd(redis::cmd("DEL").arg(cache_key))
871 .await
872 .map_err(Error::from)?;
873
874 if deleted > 0 {
875 Ok(DeleteStatus::Deleted(deleted as u32))
876 } else {
877 Ok(DeleteStatus::Missing)
878 }
879 }
880
881 fn label(&self) -> BackendLabel {
882 self.label.clone()
883 }
884
885 fn value_format(&self) -> &dyn Format {
886 &self.serializer
887 }
888
889 fn key_format(&self) -> &CacheKeyFormat {
890 &self.key_format
891 }
892
893 fn compressor(&self) -> &dyn Compressor {
894 &self.compressor
895 }
896}
897
898// Explicit CacheBackend implementation using default trait methods
899impl<S, C> hitbox_backend::CacheBackend for RedisBackend<S, C>
900where
901 S: Format + Send + Sync,
902 C: Compressor + Send + Sync,
903{
904}