1use std::collections::HashMap;
15
16use redis::aio::ConnectionManager;
17#[cfg(feature = "cluster")]
18use redis::cluster_async::ClusterConnection;
19
20use crate::error::Result;
21use crate::parallel::ParallelFetch;
22
/// The contents of a single Redis hash, as returned by the fetch helpers in this module.
///
/// Derives `PartialEq`/`Eq` so whole values can be compared directly (for example with
/// `assert_eq!`), and `Default` so an empty value can be constructed without spelling out
/// every field.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct HashData {
    /// The Redis key the hash was read from.
    pub key: String,
    /// Field name mapped to its value.
    ///
    /// A value of `None` marks a field that was requested but for which the server
    /// returned no value.
    pub fields: HashMap<String, Option<String>>,
    /// The integer reply of the `TTL` command for `key`, or `None` when the TTL was not
    /// requested.
    ///
    /// The raw reply is stored untouched; Redis documents `-1` for a key without an expiry
    /// and `-2` for a key that does not exist.
    pub ttl: Option<i64>,
}
33
34pub async fn fetch_hashes_all(
39 conn: &mut ConnectionManager,
40 keys: &[String],
41 include_ttl: bool,
42) -> Result<Vec<HashData>> {
43 if keys.is_empty() {
44 return Ok(Vec::new());
45 }
46
47 let mut pipe = redis::pipe();
48 for key in keys {
49 pipe.hgetall(key);
50 }
51
52 let results: Vec<HashMap<String, String>> = pipe.query_async(conn).await?;
53
54 let ttls = if include_ttl {
56 fetch_ttls(conn, keys).await?
57 } else {
58 vec![None; keys.len()]
59 };
60
61 Ok(keys
62 .iter()
63 .zip(results)
64 .zip(ttls)
65 .map(|((key, fields), ttl)| HashData {
66 key: key.clone(),
67 fields: fields.into_iter().map(|(k, v)| (k, Some(v))).collect(),
68 ttl,
69 })
70 .collect())
71}
72
73pub async fn fetch_hashes_fields(
79 conn: &mut ConnectionManager,
80 keys: &[String],
81 fields: &[String],
82 include_ttl: bool,
83) -> Result<Vec<HashData>> {
84 if keys.is_empty() || fields.is_empty() {
85 return Ok(Vec::new());
86 }
87
88 let mut pipe = redis::pipe();
89 for key in keys {
90 pipe.cmd("HMGET").arg(key).arg(fields);
91 }
92
93 let results: Vec<Vec<Option<String>>> = pipe.query_async(conn).await?;
95
96 let ttls = if include_ttl {
98 fetch_ttls(conn, keys).await?
99 } else {
100 vec![None; keys.len()]
101 };
102
103 Ok(keys
104 .iter()
105 .zip(results)
106 .zip(ttls)
107 .map(|((key, values), ttl)| {
108 let field_map: HashMap<String, Option<String>> = fields
109 .iter()
110 .zip(values)
111 .map(|(field, value)| (field.clone(), value))
112 .collect();
113
114 HashData {
115 key: key.clone(),
116 fields: field_map,
117 ttl,
118 }
119 })
120 .collect())
121}
122
123async fn fetch_ttls(conn: &mut ConnectionManager, keys: &[String]) -> Result<Vec<Option<i64>>> {
130 if keys.is_empty() {
131 return Ok(Vec::new());
132 }
133
134 let mut pipe = redis::pipe();
135 for key in keys {
136 pipe.cmd("TTL").arg(key);
137 }
138
139 let results: Vec<i64> = pipe.query_async(conn).await?;
140 Ok(results.into_iter().map(Some).collect())
141}
142
143pub async fn fetch_hashes(
148 conn: &mut ConnectionManager,
149 keys: &[String],
150 fields: Option<&[String]>,
151 include_ttl: bool,
152) -> Result<Vec<HashData>> {
153 match fields {
154 Some(f) => fetch_hashes_fields(conn, keys, f, include_ttl).await,
155 None => fetch_hashes_all(conn, keys, include_ttl).await,
156 }
157}
158
/// Cluster-connection variant of `fetch_hashes_all`: reads every field of each hash named
/// in `keys` with one pipelined `HGETALL` per key.
///
/// When `include_ttl` is `true`, a second pipeline (issued through `fetch_ttls_cluster`)
/// asks for the TTL of every key; otherwise each returned `ttl` is `None`. An empty `keys`
/// slice yields an empty vector without issuing any command.
///
/// # Errors
///
/// Propagates any error produced while running either pipeline.
#[cfg(feature = "cluster")]
pub async fn fetch_hashes_all_cluster(
    conn: &mut ClusterConnection,
    keys: &[String],
    include_ttl: bool,
) -> Result<Vec<HashData>> {
    if keys.is_empty() {
        return Ok(Vec::new());
    }

    let mut pipeline = redis::pipe();
    keys.iter().for_each(|key| {
        pipeline.hgetall(key);
    });
    let hashes: Vec<HashMap<String, String>> = pipeline.query_async(conn).await?;

    let ttls = if include_ttl {
        fetch_ttls_cluster(conn, keys).await?
    } else {
        vec![None; keys.len()]
    };

    let mut out = Vec::with_capacity(keys.len());
    for ((key, raw), ttl) in keys.iter().zip(hashes).zip(ttls) {
        let fields = raw
            .into_iter()
            .map(|(name, value)| (name, Some(value)))
            .collect();
        out.push(HashData {
            key: key.clone(),
            fields,
            ttl,
        });
    }
    Ok(out)
}
202
/// Cluster-connection variant of `fetch_hashes_fields`: reads only the named `fields` of
/// each hash in `keys` with one pipelined `HMGET` per key.
///
/// Each returned map contains an entry for every requested field; a field for which the
/// server returned no value is stored as `None`. When `include_ttl` is `true`, a second
/// pipeline (issued through `fetch_ttls_cluster`) asks for the TTL of every key.
///
/// Returns an empty vector, without issuing any command, when `keys` is empty **or**
/// `fields` is empty.
///
/// # Errors
///
/// Propagates any error produced while running either pipeline.
#[cfg(feature = "cluster")]
pub async fn fetch_hashes_fields_cluster(
    conn: &mut ClusterConnection,
    keys: &[String],
    fields: &[String],
    include_ttl: bool,
) -> Result<Vec<HashData>> {
    let nothing_to_fetch = keys.is_empty() || fields.is_empty();
    if nothing_to_fetch {
        return Ok(Vec::new());
    }

    let mut pipeline = redis::pipe();
    keys.iter().for_each(|key| {
        pipeline.cmd("HMGET").arg(key).arg(fields);
    });
    let rows: Vec<Vec<Option<String>>> = pipeline.query_async(conn).await?;

    let ttls = if include_ttl {
        fetch_ttls_cluster(conn, keys).await?
    } else {
        vec![None; keys.len()]
    };

    let mut out = Vec::with_capacity(keys.len());
    for ((key, row), ttl) in keys.iter().zip(rows).zip(ttls) {
        // Replies are positional: the i-th value belongs to the i-th requested field.
        let mut by_name: HashMap<String, Option<String>> = HashMap::new();
        for (name, value) in fields.iter().zip(row) {
            by_name.insert(name.clone(), value);
        }
        out.push(HashData {
            key: key.clone(),
            fields: by_name,
            ttl,
        });
    }
    Ok(out)
}
247
/// Cluster-connection variant of `fetch_ttls`: issues one pipelined `TTL` command per key
/// and returns the replies in key order, each wrapped in `Some`.
///
/// An empty `keys` slice yields an empty vector without issuing any command.
///
/// # Errors
///
/// Propagates any error produced while running the pipeline.
#[cfg(feature = "cluster")]
async fn fetch_ttls_cluster(
    conn: &mut ClusterConnection,
    keys: &[String],
) -> Result<Vec<Option<i64>>> {
    if keys.is_empty() {
        return Ok(Vec::new());
    }

    let mut pipeline = redis::pipe();
    keys.iter().for_each(|key| {
        pipeline.cmd("TTL").arg(key);
    });
    let raw: Vec<i64> = pipeline.query_async(conn).await?;

    let mut ttls = Vec::with_capacity(raw.len());
    ttls.extend(raw.into_iter().map(Some));
    Ok(ttls)
}
266
/// Cluster-connection variant of `fetch_hashes`: fetches hashes for `keys`, restricted to
/// `fields` when a field list is supplied.
///
/// `Some(list)` delegates to `fetch_hashes_fields_cluster`; `None` delegates to
/// `fetch_hashes_all_cluster`. `include_ttl` is forwarded unchanged in both cases.
///
/// # Errors
///
/// Returns whatever error the selected helper returns.
#[cfg(feature = "cluster")]
pub async fn fetch_hashes_cluster(
    conn: &mut ClusterConnection,
    keys: &[String],
    fields: Option<&[String]>,
    include_ttl: bool,
) -> Result<Vec<HashData>> {
    if let Some(wanted) = fields {
        return fetch_hashes_fields_cluster(conn, keys, wanted, include_ttl).await;
    }
    fetch_hashes_all_cluster(conn, keys, include_ttl).await
}
280
/// Settings describing which hash fields to fetch and whether to include each key's TTL.
#[derive(Debug, Clone)]
pub struct HashFetcher {
    /// Field names to request; `None` means no restriction was configured.
    fields: Option<Vec<String>>,
    /// Whether a TTL should be requested alongside the hash contents.
    include_ttl: bool,
}

impl HashFetcher {
    /// Builds a fetcher from an explicit field selection and TTL flag.
    pub fn new(fields: Option<Vec<String>>, include_ttl: bool) -> Self {
        HashFetcher {
            fields,
            include_ttl,
        }
    }

    /// Builds a fetcher with no field restriction and TTL retrieval disabled.
    pub fn all_fields() -> Self {
        HashFetcher {
            fields: None,
            include_ttl: false,
        }
    }

    /// Builds a fetcher restricted to `fields`, with TTL retrieval disabled.
    pub fn with_fields(fields: Vec<String>) -> Self {
        HashFetcher {
            fields: Some(fields),
            include_ttl: false,
        }
    }

    /// Consumes the fetcher and returns it with TTL retrieval enabled.
    pub fn with_ttl(self) -> Self {
        HashFetcher {
            include_ttl: true,
            ..self
        }
    }
}
339
340impl ParallelFetch for HashFetcher {
341 type Output = HashData;
342
343 async fn fetch(
344 &self,
345 mut conn: ConnectionManager,
346 keys: Vec<String>,
347 ) -> Result<Vec<Self::Output>> {
348 fetch_hashes(&mut conn, &keys, self.fields.as_deref(), self.include_ttl).await
349 }
350}
351
/// Settings describing which hash fields to fetch over a cluster connection and whether to
/// include each key's TTL.
#[cfg(feature = "cluster")]
#[derive(Debug, Clone)]
pub struct ClusterHashFetcher {
    /// Field names to request; `None` means no restriction was configured.
    fields: Option<Vec<String>>,
    /// Whether a TTL should be requested alongside the hash contents.
    include_ttl: bool,
}

#[cfg(feature = "cluster")]
impl ClusterHashFetcher {
    /// Builds a fetcher from an explicit field selection and TTL flag.
    pub fn new(fields: Option<Vec<String>>, include_ttl: bool) -> Self {
        ClusterHashFetcher {
            fields,
            include_ttl,
        }
    }

    /// Builds a fetcher with no field restriction and TTL retrieval disabled.
    pub fn all_fields() -> Self {
        ClusterHashFetcher {
            fields: None,
            include_ttl: false,
        }
    }

    /// Builds a fetcher restricted to `fields`, with TTL retrieval disabled.
    pub fn with_fields(fields: Vec<String>) -> Self {
        ClusterHashFetcher {
            fields: Some(fields),
            include_ttl: false,
        }
    }

    /// Consumes the fetcher and returns it with TTL retrieval enabled.
    pub fn with_ttl(self) -> Self {
        ClusterHashFetcher {
            include_ttl: true,
            ..self
        }
    }

    /// Fetches the hashes for `keys` over `conn`, applying this fetcher's field selection
    /// and TTL flag by delegating to `fetch_hashes_cluster`.
    ///
    /// # Errors
    ///
    /// Returns whatever error `fetch_hashes_cluster` returns.
    pub async fn fetch(
        &self,
        conn: &mut ClusterConnection,
        keys: Vec<String>,
    ) -> Result<Vec<HashData>> {
        let wanted = self.fields.as_deref();
        fetch_hashes_cluster(conn, keys.as_slice(), wanted, self.include_ttl).await
    }
}
397
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for an owned field value that is present.
    fn present(value: &str) -> Option<String> {
        Some(value.to_string())
    }

    /// A fully populated hash keeps its key, both field values, and an absent TTL.
    #[test]
    fn test_hash_data_creation() {
        let fields = HashMap::from([
            ("name".to_string(), present("Alice")),
            ("age".to_string(), present("30")),
        ]);

        let data = HashData {
            key: "user:1".to_string(),
            fields,
            ttl: None,
        };

        assert_eq!(data.key, "user:1");
        assert_eq!(data.fields.get("name"), Some(&present("Alice")));
        assert_eq!(data.fields.get("age"), Some(&present("30")));
        assert_eq!(data.ttl, None);
    }

    /// A field with no value is kept in the map as an explicit `None` entry.
    #[test]
    fn test_hash_data_with_missing_field() {
        let fields = HashMap::from([
            ("name".to_string(), present("Alice")),
            ("email".to_string(), None),
        ]);

        let data = HashData {
            key: "user:1".to_string(),
            fields,
            ttl: Some(3600),
        };

        assert_eq!(data.fields.get("name"), Some(&present("Alice")));
        assert_eq!(data.fields.get("email"), Some(&None));
        assert_eq!(data.ttl, Some(3600));
    }

    /// A TTL of `-1` is stored as-is rather than being mapped to `None`.
    #[test]
    fn test_hash_data_with_no_expiry() {
        let data = HashData {
            key: "user:1".to_string(),
            fields: HashMap::new(),
            ttl: Some(-1),
        };

        assert_eq!(data.ttl, Some(-1));
    }
}