1use std::time::Duration;
2
3use bytes::Bytes;
4use redis::Client;
5use redis::sentinel::Sentinel;
6use tracing::{debug, info};
7
8use crate::Cache as CacheTrait;
9use crate::utils::namespaced;
10
11#[derive(Clone)]
12pub struct RedisCache {
13 manager: redis::aio::ConnectionManager,
14 prefix: String,
15}
16
17impl RedisCache {
18 pub async fn connect(url: &str, prefix: &str) -> anyhow::Result<Self> {
19 Self::connect_with_name(url, prefix, None).await
20 }
21
22 pub async fn connect_with_name(
23 url: &str,
24 prefix: &str,
25 connection_name: Option<&str>,
26 ) -> anyhow::Result<Self> {
27 let client = Client::open(url)?;
28 let mut manager = client.get_connection_manager().await?;
29
30 if let Some(name) = connection_name {
32 redis::cmd("CLIENT")
33 .arg("SETNAME")
34 .arg(name)
35 .query_async::<()>(&mut manager)
36 .await?;
37 }
38
39 Ok(Self {
40 manager,
41 prefix: prefix.to_owned(),
42 })
43 }
44
45 pub async fn connect_sentinel(
50 sentinel_urls: &[&str],
51 master_name: &str,
52 prefix: &str,
53 connection_name: Option<&str>,
54 ) -> anyhow::Result<Self> {
55 info!(
56 sentinel_urls = ?sentinel_urls,
57 master_name = %master_name,
58 "Connecting to Redis via Sentinel"
59 );
60
61 let mut sentinel = Sentinel::build(sentinel_urls.to_vec())?;
62 let client = sentinel.async_master_for(master_name, None).await?;
63
64 let mut manager = client.get_connection_manager().await?;
65
66 if let Some(name) = connection_name {
67 redis::cmd("CLIENT")
68 .arg("SETNAME")
69 .arg(name)
70 .query_async::<()>(&mut manager)
71 .await?;
72 }
73
74 info!(master_name = %master_name, "Redis Sentinel connected to master");
75
76 Ok(Self {
77 manager,
78 prefix: prefix.to_owned(),
79 })
80 }
81
82 #[inline]
88 fn namespaced(&self, key: &[u8]) -> Bytes {
89 namespaced(&self.prefix, key)
90 }
91
92 pub async fn scan_keys(&self) -> anyhow::Result<Vec<String>> {
95 let mut conn = self.manager.clone();
96 let pattern = format!("{}*", self.prefix);
97 let mut keys = Vec::new();
98 let mut cursor: i64 = 0;
99
100 info!(target: "mx-cache", pattern = %pattern, "starting Redis SCAN");
101
102 loop {
103 let (new_cursor, batch): (i64, Vec<String>) = redis::cmd("SCAN")
104 .arg(cursor)
105 .arg("MATCH")
106 .arg(&pattern)
107 .arg("COUNT")
108 .arg(1000)
109 .query_async(&mut conn)
110 .await?;
111
112 let batch_len = batch.len();
113 for key in batch {
114 if let Some(stripped) = key.strip_prefix(&self.prefix) {
115 keys.push(stripped.to_owned());
116 }
117 }
118
119 cursor = new_cursor;
120 debug!(target: "mx-cache", cursor = cursor, batch_size = batch_len, total = keys.len(), "SCAN iteration");
121 if cursor == 0 {
122 break;
123 }
124 }
125
126 info!(target: "mx-cache", total_found = keys.len(), "Redis SCAN complete");
127
128 Ok(keys)
129 }
130}
131
132impl std::fmt::Debug for RedisCache {
133 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
134 f.debug_struct("RedisCache")
135 .field("prefix", &self.prefix)
136 .finish()
137 }
138}
139
140impl CacheTrait for RedisCache {
141 fn set_nx_px(
142 &self,
143 key: &[u8],
144 value: &[u8],
145 ttl: Duration,
146 ) -> impl Future<Output = anyhow::Result<bool>> + Send {
147 let ttl_ms = ttl.as_millis() as u64;
148 let ns = self.namespaced(key);
150 let value = Bytes::copy_from_slice(value);
151 let mut manager = self.manager.clone();
152
153 async move {
154 let result: Option<String> = redis::cmd("SET")
155 .arg(ns.as_ref())
156 .arg(value.as_ref())
157 .arg("PX")
158 .arg(ttl_ms)
159 .arg("NX")
160 .query_async(&mut manager)
161 .await?;
162 Ok(result.is_some())
163 }
164 }
165
166 fn set(
167 &self,
168 key: &[u8],
169 value: &[u8],
170 ttl: Duration,
171 ) -> impl Future<Output = anyhow::Result<()>> + Send {
172 let ttl_ms = ttl.as_millis() as u64;
173 let ns = self.namespaced(key);
175 let value = Bytes::copy_from_slice(value);
176 let mut manager = self.manager.clone();
177
178 async move {
179 let _: () = redis::cmd("SET")
180 .arg(ns.as_ref())
181 .arg(value.as_ref())
182 .arg("PX")
183 .arg(ttl_ms)
184 .query_async(&mut manager)
185 .await?;
186 Ok(())
187 }
188 }
189
190 fn get(&self, key: &[u8]) -> impl Future<Output = anyhow::Result<Option<Vec<u8>>>> + Send {
191 let ns = self.namespaced(key);
193 let mut manager = self.manager.clone();
194
195 async move {
196 let result: Option<Vec<u8>> = redis::cmd("GET")
197 .arg(ns.as_ref())
198 .query_async(&mut manager)
199 .await?;
200 Ok(result)
201 }
202 }
203
204 fn del(&self, key: &[u8]) -> impl Future<Output = anyhow::Result<()>> + Send {
205 let ns = self.namespaced(key);
207 let mut manager = self.manager.clone();
208
209 async move {
210 let _: () = redis::cmd("DEL")
211 .arg(ns.as_ref())
212 .query_async(&mut manager)
213 .await?;
214 Ok(())
215 }
216 }
217}