polars_redis/types/hash/reader.rs

1//! Hash reading operations for Redis.
2//!
3//! This module provides functionality for reading Redis hashes in batches,
4//! with support for projection pushdown (fetching only specific fields).
5//!
6//! # Connection Support
7//!
8//! The module supports both single-node Redis and Redis Cluster:
9//! - Single-node: Use functions with `ConnectionManager`
10//! - Cluster: Use functions with `ClusterConnection` (with `cluster` feature)
11//!
12//! For a unified interface, use the `RedisConn` enum with cluster-aware functions.
13
14use std::collections::HashMap;
15
16use redis::aio::ConnectionManager;
17#[cfg(feature = "cluster")]
18use redis::cluster_async::ClusterConnection;
19
20use crate::error::Result;
21use crate::parallel::ParallelFetch;
22
/// Result of fetching a single hash from Redis.
///
/// Derives `PartialEq`/`Eq` so fetched rows can be compared directly
/// (e.g. in tests or when de-duplicating batches).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HashData {
    /// The Redis key.
    pub key: String,
    /// Field-value pairs from the hash; `None` marks a requested field that had no value.
    pub fields: HashMap<String, Option<String>>,
    /// TTL in seconds (-1 = no expiry, -2 = key doesn't exist, None = TTL not requested).
    pub ttl: Option<i64>,
}
33
34/// Fetch all fields from multiple hashes using HGETALL.
35///
36/// Uses pipelining for efficiency. Returns data in the same order as the input keys.
37/// If a key doesn't exist or isn't a hash, returns an empty HashMap for that key.
38pub async fn fetch_hashes_all(
39    conn: &mut ConnectionManager,
40    keys: &[String],
41    include_ttl: bool,
42) -> Result<Vec<HashData>> {
43    if keys.is_empty() {
44        return Ok(Vec::new());
45    }
46
47    let mut pipe = redis::pipe();
48    for key in keys {
49        pipe.hgetall(key);
50    }
51
52    let results: Vec<HashMap<String, String>> = pipe.query_async(conn).await?;
53
54    // Optionally fetch TTLs
55    let ttls = if include_ttl {
56        fetch_ttls(conn, keys).await?
57    } else {
58        vec![None; keys.len()]
59    };
60
61    Ok(keys
62        .iter()
63        .zip(results)
64        .zip(ttls)
65        .map(|((key, fields), ttl)| HashData {
66            key: key.clone(),
67            fields: fields.into_iter().map(|(k, v)| (k, Some(v))).collect(),
68            ttl,
69        })
70        .collect())
71}
72
73/// Fetch specific fields from multiple hashes using HMGET.
74///
75/// This enables projection pushdown - only fetching the fields we need.
76/// Uses pipelining for efficiency. Returns data in the same order as the input keys.
77/// Missing fields are represented as None.
78pub async fn fetch_hashes_fields(
79    conn: &mut ConnectionManager,
80    keys: &[String],
81    fields: &[String],
82    include_ttl: bool,
83) -> Result<Vec<HashData>> {
84    if keys.is_empty() || fields.is_empty() {
85        return Ok(Vec::new());
86    }
87
88    let mut pipe = redis::pipe();
89    for key in keys {
90        pipe.cmd("HMGET").arg(key).arg(fields);
91    }
92
93    // HMGET returns Vec<Option<String>> for each key
94    let results: Vec<Vec<Option<String>>> = pipe.query_async(conn).await?;
95
96    // Optionally fetch TTLs
97    let ttls = if include_ttl {
98        fetch_ttls(conn, keys).await?
99    } else {
100        vec![None; keys.len()]
101    };
102
103    Ok(keys
104        .iter()
105        .zip(results)
106        .zip(ttls)
107        .map(|((key, values), ttl)| {
108            let field_map: HashMap<String, Option<String>> = fields
109                .iter()
110                .zip(values)
111                .map(|(field, value)| (field.clone(), value))
112                .collect();
113
114            HashData {
115                key: key.clone(),
116                fields: field_map,
117                ttl,
118            }
119        })
120        .collect())
121}
122
123/// Fetch TTLs for multiple keys using pipelining.
124///
125/// Returns a vector of Option<i64> where:
126/// - Some(ttl) where ttl >= 0: key has TTL in seconds
127/// - Some(-1): key exists but has no expiry
128/// - Some(-2): key doesn't exist
129async fn fetch_ttls(conn: &mut ConnectionManager, keys: &[String]) -> Result<Vec<Option<i64>>> {
130    if keys.is_empty() {
131        return Ok(Vec::new());
132    }
133
134    let mut pipe = redis::pipe();
135    for key in keys {
136        pipe.cmd("TTL").arg(key);
137    }
138
139    let results: Vec<i64> = pipe.query_async(conn).await?;
140    Ok(results.into_iter().map(Some).collect())
141}
142
143/// Fetch hash data with optional projection pushdown.
144///
145/// If `fields` is Some, uses HMGET to fetch only those fields.
146/// If `fields` is None, uses HGETALL to fetch all fields.
147pub async fn fetch_hashes(
148    conn: &mut ConnectionManager,
149    keys: &[String],
150    fields: Option<&[String]>,
151    include_ttl: bool,
152) -> Result<Vec<HashData>> {
153    match fields {
154        Some(f) => fetch_hashes_fields(conn, keys, f, include_ttl).await,
155        None => fetch_hashes_all(conn, keys, include_ttl).await,
156    }
157}
158
159// ============================================================================
160// Cluster support (with cluster feature)
161// ============================================================================
162
/// Number of hash slots Redis Cluster divides the keyspace into.
#[cfg(feature = "cluster")]
const CLUSTER_HASH_SLOTS: u16 = 16384;

/// CRC16/XMODEM (polynomial 0x1021, initial value 0), the checksum Redis
/// Cluster uses to assign keys to hash slots.
#[cfg(feature = "cluster")]
fn crc16_xmodem(data: &[u8]) -> u16 {
    let mut crc: u16 = 0;
    for &byte in data {
        crc ^= u16::from(byte) << 8;
        for _ in 0..8 {
            crc = if crc & 0x8000 != 0 {
                (crc << 1) ^ 0x1021
            } else {
                crc << 1
            };
        }
    }
    crc
}

/// Compute the Redis Cluster hash slot of `key`.
///
/// Honours hash tags. If the key contains a `{` and a later `}` with at least
/// one byte between them, only the bytes between the first `{` and the first
/// following `}` are hashed. So `{user1}:a` and `{user1}:b` share a slot.
#[cfg(feature = "cluster")]
fn cluster_hash_slot(key: &[u8]) -> u16 {
    let hashed = key
        .iter()
        .position(|&b| b == b'{')
        .and_then(|open| {
            let rest = &key[open + 1..];
            rest.iter()
                .position(|&b| b == b'}')
                .filter(|&close| close > 0)
                .map(|close| &rest[..close])
        })
        .unwrap_or(key);
    crc16_xmodem(hashed) % CLUSTER_HASH_SLOTS
}

/// Group key positions by hash slot.
///
/// Groups appear in order of each slot's first occurrence, and positions
/// within a group keep input order, so the grouping is deterministic.
#[cfg(feature = "cluster")]
fn group_key_indices_by_slot(keys: &[String]) -> Vec<Vec<usize>> {
    let mut group_of_slot: HashMap<u16, usize> = HashMap::new();
    let mut groups: Vec<Vec<usize>> = Vec::new();
    for (idx, key) in keys.iter().enumerate() {
        let slot = cluster_hash_slot(key.as_bytes());
        let group = *group_of_slot.entry(slot).or_insert_with(|| {
            groups.push(Vec::new());
            groups.len() - 1
        });
        groups[group].push(idx);
    }
    groups
}

/// Send one per-key command for every key and return the decoded replies in
/// the same order as `keys`.
///
/// redis-rs' async cluster client routes a whole pipeline to a single slot
/// and rejects pipelines whose keys hash to different slots (CrossSlot). A
/// single pipeline over an arbitrary batch of keys therefore cannot be used.
/// Instead, keys are pipelined per hash slot, one round trip per distinct
/// slot, issued sequentially. Keys sharing a hash tag batch well; unrelated
/// keys degrade towards one round trip each.
#[cfg(feature = "cluster")]
async fn query_per_slot<T, F>(
    conn: &mut ClusterConnection,
    keys: &[String],
    add_command: F,
) -> Result<Vec<T>>
where
    T: redis::FromRedisValue,
    F: Fn(&mut redis::Pipeline, &str),
{
    let mut replies: Vec<Option<T>> = std::iter::repeat_with(|| None).take(keys.len()).collect();

    for group in group_key_indices_by_slot(keys) {
        let mut pipe = redis::pipe();
        for &idx in &group {
            add_command(&mut pipe, &keys[idx]);
        }
        let decoded: Vec<T> = pipe.query_async(conn).await?;
        // Scatter the slot-local replies back to their original positions.
        for (idx, reply) in group.into_iter().zip(decoded) {
            replies[idx] = Some(reply);
        }
    }

    Ok(replies
        .into_iter()
        .map(|reply| reply.expect("redis returns exactly one reply per pipelined command"))
        .collect())
}

/// Fetch all fields from multiple hashes using HGETALL on a cluster.
///
/// Returns exactly one [`HashData`] per input key, in input order. Commands
/// are pipelined per hash slot, because cross-slot pipelines are rejected by
/// the async cluster client. A missing key yields an empty `fields` map; a
/// non-hash key makes Redis reply WRONGTYPE, which is propagated.
///
/// TTLs, when requested, are read in a second pass after the field data.
#[cfg(feature = "cluster")]
pub async fn fetch_hashes_all_cluster(
    conn: &mut ClusterConnection,
    keys: &[String],
    include_ttl: bool,
) -> Result<Vec<HashData>> {
    if keys.is_empty() {
        return Ok(Vec::new());
    }

    let results: Vec<HashMap<String, String>> = query_per_slot(conn, keys, |pipe, key| {
        pipe.hgetall(key);
    })
    .await?;

    // Optionally fetch TTLs; otherwise mark them as "not requested".
    let ttls = if include_ttl {
        fetch_ttls_cluster(conn, keys).await?
    } else {
        vec![None; keys.len()]
    };

    Ok(keys
        .iter()
        .zip(results)
        .zip(ttls)
        .map(|((key, fields), ttl)| HashData {
            key: key.clone(),
            // HGETALL only returns fields that exist, so every value is present.
            fields: fields.into_iter().map(|(k, v)| (k, Some(v))).collect(),
            ttl,
        })
        .collect())
}

/// Fetch specific fields from multiple hashes using HMGET on a cluster.
///
/// Returns exactly one [`HashData`] per input key, in input order. Every
/// requested field name appears in each map, with `None` for absent values.
/// An empty `fields` slice yields an empty map per key. HMGET needs at least
/// one field, so it is not sent in that case. Commands are pipelined per hash
/// slot.
#[cfg(feature = "cluster")]
pub async fn fetch_hashes_fields_cluster(
    conn: &mut ClusterConnection,
    keys: &[String],
    fields: &[String],
    include_ttl: bool,
) -> Result<Vec<HashData>> {
    if keys.is_empty() {
        return Ok(Vec::new());
    }

    // An empty projection must still produce one row per key.
    let results: Vec<Vec<Option<String>>> = if fields.is_empty() {
        vec![Vec::new(); keys.len()]
    } else {
        query_per_slot(conn, keys, |pipe, key| {
            pipe.cmd("HMGET").arg(key).arg(fields);
        })
        .await?
    };

    let ttls = if include_ttl {
        fetch_ttls_cluster(conn, keys).await?
    } else {
        vec![None; keys.len()]
    };

    Ok(keys
        .iter()
        .zip(results)
        .zip(ttls)
        .map(|((key, values), ttl)| {
            // HMGET replies positionally, so pair each value with its field name.
            let field_map: HashMap<String, Option<String>> = fields
                .iter()
                .zip(values)
                .map(|(field, value)| (field.clone(), value))
                .collect();

            HashData {
                key: key.clone(),
                fields: field_map,
                ttl,
            }
        })
        .collect())
}

/// Fetch TTLs for multiple keys on a cluster, pipelined per hash slot.
///
/// One `Some(ttl)` per key in input order (-1 = no expiry, -2 = missing key).
#[cfg(feature = "cluster")]
async fn fetch_ttls_cluster(
    conn: &mut ClusterConnection,
    keys: &[String],
) -> Result<Vec<Option<i64>>> {
    if keys.is_empty() {
        return Ok(Vec::new());
    }

    let ttls: Vec<i64> = query_per_slot(conn, keys, |pipe, key| {
        pipe.cmd("TTL").arg(key);
    })
    .await?;
    Ok(ttls.into_iter().map(Some).collect())
}

/// Fetch hash data from a cluster with optional projection pushdown.
///
/// `Some(fields)` uses HMGET for just those fields; `None` uses HGETALL.
#[cfg(feature = "cluster")]
pub async fn fetch_hashes_cluster(
    conn: &mut ClusterConnection,
    keys: &[String],
    fields: Option<&[String]>,
    include_ttl: bool,
) -> Result<Vec<HashData>> {
    match fields {
        Some(f) => fetch_hashes_fields_cluster(conn, keys, f, include_ttl).await,
        None => fetch_hashes_all_cluster(conn, keys, include_ttl).await,
    }
}
280
281// ============================================================================
282// Parallel fetch implementation
283// ============================================================================
284
/// Fetcher for parallel hash operations.
///
/// Implements the `ParallelFetch` trait to enable parallel batch fetching
/// of Redis hashes across multiple worker tasks.
///
/// `HashFetcher::default()` is equivalent to [`HashFetcher::all_fields`]:
/// all fields, no TTL.
///
/// # Example
///
/// ```ignore
/// use polars_redis::parallel::{ParallelFetcher, ParallelConfig};
/// use polars_redis::types::hash::reader::HashFetcher;
///
/// let fetcher = HashFetcher::new(None, false); // All fields, no TTL
/// let mut parallel = ParallelFetcher::new(conn, fetcher, ParallelConfig::new(4));
/// parallel.start();
/// parallel.submit(keys).await?;
/// ```
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct HashFetcher {
    /// Fields to fetch (None = all fields via HGETALL).
    fields: Option<Vec<String>>,
    /// Whether to include TTL information.
    include_ttl: bool,
}

impl HashFetcher {
    /// Create a new hash fetcher.
    ///
    /// # Arguments
    ///
    /// * `fields` - Specific fields to fetch, or None for all fields
    /// * `include_ttl` - Whether to fetch TTL for each key
    #[must_use]
    pub fn new(fields: Option<Vec<String>>, include_ttl: bool) -> Self {
        Self {
            fields,
            include_ttl,
        }
    }

    /// Create a fetcher for all fields (no TTL).
    #[must_use]
    pub fn all_fields() -> Self {
        Self::new(None, false)
    }

    /// Create a fetcher with projection (no TTL).
    #[must_use]
    pub fn with_fields(fields: Vec<String>) -> Self {
        Self::new(Some(fields), false)
    }

    /// Enable TTL fetching. Consumes and returns `self`, so the result must be used.
    #[must_use]
    pub fn with_ttl(mut self) -> Self {
        self.include_ttl = true;
        self
    }
}
339
340impl ParallelFetch for HashFetcher {
341    type Output = HashData;
342
343    async fn fetch(
344        &self,
345        mut conn: ConnectionManager,
346        keys: Vec<String>,
347    ) -> Result<Vec<Self::Output>> {
348        fetch_hashes(&mut conn, &keys, self.fields.as_deref(), self.include_ttl).await
349    }
350}
351
/// Fetcher for parallel hash operations on a cluster.
///
/// `ClusterHashFetcher::default()` is equivalent to
/// [`ClusterHashFetcher::all_fields`]: all fields, no TTL.
#[cfg(feature = "cluster")]
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ClusterHashFetcher {
    /// Fields to fetch (None = all fields via HGETALL).
    fields: Option<Vec<String>>,
    /// Whether to include TTL information.
    include_ttl: bool,
}

#[cfg(feature = "cluster")]
impl ClusterHashFetcher {
    /// Create a new cluster hash fetcher.
    ///
    /// * `fields` - Specific fields to fetch, or None for all fields
    /// * `include_ttl` - Whether to fetch TTL for each key
    #[must_use]
    pub fn new(fields: Option<Vec<String>>, include_ttl: bool) -> Self {
        Self {
            fields,
            include_ttl,
        }
    }

    /// Create a fetcher for all fields (no TTL).
    #[must_use]
    pub fn all_fields() -> Self {
        Self::new(None, false)
    }

    /// Create a fetcher with projection (no TTL).
    #[must_use]
    pub fn with_fields(fields: Vec<String>) -> Self {
        Self::new(Some(fields), false)
    }

    /// Enable TTL fetching. Consumes and returns `self`, so the result must be used.
    #[must_use]
    pub fn with_ttl(mut self) -> Self {
        self.include_ttl = true;
        self
    }

    /// Fetch hashes for `keys` from the cluster, applying this fetcher's
    /// projection and TTL settings.
    pub async fn fetch(
        &self,
        conn: &mut ClusterConnection,
        keys: Vec<String>,
    ) -> Result<Vec<HashData>> {
        fetch_hashes_cluster(conn, &keys, self.fields.as_deref(), self.include_ttl).await
    }
}
397
#[cfg(test)]
mod tests {
    use super::*;

    /// Build an owned field map from borrowed `(field, value)` pairs.
    fn owned_fields(pairs: &[(&str, Option<&str>)]) -> HashMap<String, Option<String>> {
        pairs
            .iter()
            .map(|&(name, value)| (name.to_owned(), value.map(str::to_owned)))
            .collect()
    }

    #[test]
    fn test_hash_data_creation() {
        let data = HashData {
            key: "user:1".to_string(),
            fields: owned_fields(&[("name", Some("Alice")), ("age", Some("30"))]),
            ttl: None,
        };

        let alice = Some("Alice".to_string());
        let thirty = Some("30".to_string());
        assert_eq!(data.key, "user:1");
        assert_eq!(data.fields.get("name"), Some(&alice));
        assert_eq!(data.fields.get("age"), Some(&thirty));
        assert_eq!(data.ttl, None);
    }

    #[test]
    fn test_hash_data_with_missing_field() {
        // "email" is present as a field name but carries no value.
        let data = HashData {
            key: "user:1".to_string(),
            fields: owned_fields(&[("name", Some("Alice")), ("email", None)]),
            ttl: Some(3600), // one hour
        };

        assert_eq!(data.fields.get("name"), Some(&Some("Alice".to_string())));
        assert_eq!(data.fields.get("email"), Some(&None));
        assert_eq!(data.ttl, Some(3600));
    }

    #[test]
    fn test_hash_data_with_no_expiry() {
        // -1 is the TTL value used for a key without an expiry.
        let data = HashData {
            key: "user:1".to_string(),
            fields: HashMap::new(),
            ttl: Some(-1),
        };

        assert_eq!(data.ttl, Some(-1));
    }
}