Skip to main content

mx_cache/
redis_cache.rs

1use std::time::Duration;
2
3use bytes::Bytes;
4use redis::Client;
5use redis::sentinel::Sentinel;
6use tracing::{debug, info};
7
8use crate::Cache as CacheTrait;
9use crate::utils::namespaced;
10
11#[derive(Clone)]
12pub struct RedisCache {
13    manager: redis::aio::ConnectionManager,
14    prefix: String,
15}
16
17impl RedisCache {
18    pub async fn connect(url: &str, prefix: &str) -> anyhow::Result<Self> {
19        Self::connect_with_name(url, prefix, None).await
20    }
21
22    pub async fn connect_with_name(
23        url: &str,
24        prefix: &str,
25        connection_name: Option<&str>,
26    ) -> anyhow::Result<Self> {
27        let client = Client::open(url)?;
28        let mut manager = client.get_connection_manager().await?;
29
30        // Set connection name if provided
31        if let Some(name) = connection_name {
32            redis::cmd("CLIENT")
33                .arg("SETNAME")
34                .arg(name)
35                .query_async::<()>(&mut manager)
36                .await?;
37        }
38
39        Ok(Self {
40            manager,
41            prefix: prefix.to_owned(),
42        })
43    }
44
45    /// Connects to Redis via Sentinel, resolving the current master.
46    ///
47    /// Uses `Sentinel::async_master_for()` to discover the master address,
48    /// then creates a standard `ConnectionManager` from the resolved `Client`.
49    pub async fn connect_sentinel(
50        sentinel_urls: &[&str],
51        master_name: &str,
52        prefix: &str,
53        connection_name: Option<&str>,
54    ) -> anyhow::Result<Self> {
55        info!(
56            sentinel_urls = ?sentinel_urls,
57            master_name = %master_name,
58            "Connecting to Redis via Sentinel"
59        );
60
61        let mut sentinel = Sentinel::build(sentinel_urls.to_vec())?;
62        let client = sentinel.async_master_for(master_name, None).await?;
63
64        let mut manager = client.get_connection_manager().await?;
65
66        if let Some(name) = connection_name {
67            redis::cmd("CLIENT")
68                .arg("SETNAME")
69                .arg(name)
70                .query_async::<()>(&mut manager)
71                .await?;
72        }
73
74        info!(master_name = %master_name, "Redis Sentinel connected to master");
75
76        Ok(Self {
77            manager,
78            prefix: prefix.to_owned(),
79        })
80    }
81
82    /// Creates a namespaced key by prefixing with the cache prefix.
83    ///
84    /// Uses a stack buffer for small keys (<=96 bytes) to avoid heap allocation
85    /// during key construction. Returns `Bytes` which is cheap to clone into
86    /// async blocks (just an atomic ref count increment after the initial copy).
87    #[inline]
88    fn namespaced(&self, key: &[u8]) -> Bytes {
89        namespaced(&self.prefix, key)
90    }
91
92    /// Scans all keys matching the cache prefix.
93    /// Returns keys with the prefix stripped.
94    pub async fn scan_keys(&self) -> anyhow::Result<Vec<String>> {
95        let mut conn = self.manager.clone();
96        let pattern = format!("{}*", self.prefix);
97        let mut keys = Vec::new();
98        let mut cursor: i64 = 0;
99
100        info!(target: "mx-cache", pattern = %pattern, "starting Redis SCAN");
101
102        loop {
103            let (new_cursor, batch): (i64, Vec<String>) = redis::cmd("SCAN")
104                .arg(cursor)
105                .arg("MATCH")
106                .arg(&pattern)
107                .arg("COUNT")
108                .arg(1000)
109                .query_async(&mut conn)
110                .await?;
111
112            let batch_len = batch.len();
113            for key in batch {
114                if let Some(stripped) = key.strip_prefix(&self.prefix) {
115                    keys.push(stripped.to_owned());
116                }
117            }
118
119            cursor = new_cursor;
120            debug!(target: "mx-cache", cursor = cursor, batch_size = batch_len, total = keys.len(), "SCAN iteration");
121            if cursor == 0 {
122                break;
123            }
124        }
125
126        info!(target: "mx-cache", total_found = keys.len(), "Redis SCAN complete");
127
128        Ok(keys)
129    }
130}
131
132impl std::fmt::Debug for RedisCache {
133    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
134        f.debug_struct("RedisCache")
135            .field("prefix", &self.prefix)
136            .finish()
137    }
138}
139
140impl CacheTrait for RedisCache {
141    fn set_nx_px(
142        &self,
143        key: &[u8],
144        value: &[u8],
145        ttl: Duration,
146    ) -> impl Future<Output = anyhow::Result<bool>> + Send {
147        let ttl_ms = ttl.as_millis() as u64;
148        // Bytes is cheap to clone (atomic ref count) - no allocation when moved into async block
149        let ns = self.namespaced(key);
150        let value = Bytes::copy_from_slice(value);
151        let mut manager = self.manager.clone();
152
153        async move {
154            let result: Option<String> = redis::cmd("SET")
155                .arg(ns.as_ref())
156                .arg(value.as_ref())
157                .arg("PX")
158                .arg(ttl_ms)
159                .arg("NX")
160                .query_async(&mut manager)
161                .await?;
162            Ok(result.is_some())
163        }
164    }
165
166    fn set(
167        &self,
168        key: &[u8],
169        value: &[u8],
170        ttl: Duration,
171    ) -> impl Future<Output = anyhow::Result<()>> + Send {
172        let ttl_ms = ttl.as_millis() as u64;
173        // Bytes is cheap to clone (atomic ref count) - no allocation when moved into async block
174        let ns = self.namespaced(key);
175        let value = Bytes::copy_from_slice(value);
176        let mut manager = self.manager.clone();
177
178        async move {
179            let _: () = redis::cmd("SET")
180                .arg(ns.as_ref())
181                .arg(value.as_ref())
182                .arg("PX")
183                .arg(ttl_ms)
184                .query_async(&mut manager)
185                .await?;
186            Ok(())
187        }
188    }
189
190    fn get(&self, key: &[u8]) -> impl Future<Output = anyhow::Result<Option<Vec<u8>>>> + Send {
191        // Bytes is cheap to clone (atomic ref count) - no allocation when moved into async block
192        let ns = self.namespaced(key);
193        let mut manager = self.manager.clone();
194
195        async move {
196            let result: Option<Vec<u8>> = redis::cmd("GET")
197                .arg(ns.as_ref())
198                .query_async(&mut manager)
199                .await?;
200            Ok(result)
201        }
202    }
203
204    fn del(&self, key: &[u8]) -> impl Future<Output = anyhow::Result<()>> + Send {
205        // Bytes is cheap to clone (atomic ref count) - no allocation when moved into async block
206        let ns = self.namespaced(key);
207        let mut manager = self.manager.clone();
208
209        async move {
210            let _: () = redis::cmd("DEL")
211                .arg(ns.as_ref())
212                .query_async(&mut manager)
213                .await?;
214            Ok(())
215        }
216    }
217}