polars_redis/
lib.rs

1//! # polars-redis
2//!
3//! Query Redis like a database. Transform with Polars. Write back without ETL.
4//!
5//! This crate provides a Redis IO plugin for [Polars](https://pola.rs/), enabling you to
6//! scan Redis data structures as Arrow RecordBatches with support for projection pushdown
7//! and batched iteration.
8//!
9//! ## Supported Redis Types
10//!
11//! | Type | Read | Write | Description |
12//! |------|------|-------|-------------|
13//! | Hash | Yes | Yes | Field-level projection pushdown |
14//! | JSON | Yes | Yes | RedisJSON documents |
15//! | String | Yes | Yes | Simple key-value pairs |
16//! | Set | Yes | Yes | Unique members |
17//! | List | Yes | Yes | Ordered elements |
18//! | Sorted Set | Yes | Yes | Members with scores |
19//! | Stream | Yes | No | Timestamped entries |
20//! | TimeSeries | Yes | No | Server-side aggregation |
21//!
22//! ## Quick Start
23//!
24//! ### Reading Hashes
25//!
26//! ```no_run
27//! use polars_redis::{HashBatchIterator, HashSchema, BatchConfig, RedisType};
28//!
29//! // Define schema for hash fields
30//! let schema = HashSchema::new(vec![
31//!     ("name".to_string(), RedisType::Utf8),
32//!     ("age".to_string(), RedisType::Int64),
33//!     ("active".to_string(), RedisType::Boolean),
34//! ])
35//! .with_key(true)
36//! .with_key_column_name("_key".to_string());
37//!
38//! // Configure batch iteration
39//! let config = BatchConfig::new("user:*".to_string())
40//!     .with_batch_size(1000)
41//!     .with_count_hint(100);
42//!
43//! // Create iterator
44//! let mut iterator = HashBatchIterator::new(
45//!     "redis://localhost:6379",
46//!     schema,
47//!     config,
48//!     None, // projection
49//! ).unwrap();
50//!
51//! // Iterate over batches
52//! while let Some(batch) = iterator.next_batch().unwrap() {
53//!     println!("Got {} rows", batch.num_rows());
54//! }
55//! ```
56//!
57//! ### Writing Hashes
58//!
59//! ```no_run
60//! use polars_redis::{write_hashes, WriteMode};
61//!
62//! let keys = vec!["user:1".to_string(), "user:2".to_string()];
63//! let fields = vec!["name".to_string(), "age".to_string()];
64//! let values = vec![
65//!     vec![Some("Alice".to_string()), Some("30".to_string())],
66//!     vec![Some("Bob".to_string()), Some("25".to_string())],
67//! ];
68//!
69//! let result = write_hashes(
70//!     "redis://localhost:6379",
71//!     keys,
72//!     fields,
73//!     values,
74//!     Some(3600), // TTL in seconds
75//!     WriteMode::Replace,
76//! ).unwrap();
77//!
78//! println!("Wrote {} keys", result.keys_written);
79//! ```
80//!
81//! ### Schema Inference
82//!
83//! ```no_run
84//! use polars_redis::infer_hash_schema;
85//!
86//! // Sample keys to infer schema
87//! let schema = infer_hash_schema(
88//!     "redis://localhost:6379",
89//!     "user:*",
90//!     100,  // sample size
91//!     true, // type inference
92//! ).unwrap();
93//!
94//! for (name, dtype) in schema.fields {
95//!     println!("{}: {:?}", name, dtype);
96//! }
97//! ```
98//!
99//! ## Python Bindings
100//!
101//! This crate also provides Python bindings via PyO3 when built with the `python` feature.
102//! The Python package `polars-redis` wraps these bindings with a high-level API.
103//!
104//! ## Features
105//!
106//! - `python` - Enable Python bindings (PyO3)
107//! - `json` - Enable RedisJSON support (enabled by default)
108//! - `search` - Enable RediSearch support (enabled by default)
109//! - `cluster` - Enable Redis Cluster support
110
111#[cfg(feature = "python")]
112use arrow::datatypes::DataType;
113#[cfg(feature = "python")]
114use pyo3::prelude::*;
115#[cfg(feature = "python")]
116use std::collections::HashMap;
117
118pub mod cache;
119#[cfg(feature = "cluster")]
120pub mod cluster;
121mod connection;
122mod error;
123mod infer;
124pub mod options;
125pub mod parallel;
126pub mod pubsub;
127#[cfg(feature = "search")]
128pub mod query_builder;
129mod scanner;
130mod schema;
131#[cfg(feature = "search")]
132pub mod search;
133mod types;
134mod write;
135
136#[cfg(feature = "cluster")]
137pub use cluster::{ClusterKeyScanner, DirectClusterKeyScanner};
138pub use connection::{ConnectionConfig, RedisConn, RedisConnection};
139pub use error::{Error, Result};
140pub use infer::{
141    FieldInferenceInfo, InferredSchema, InferredSchemaWithConfidence, infer_hash_schema,
142    infer_hash_schema_with_confidence, infer_json_schema,
143};
144pub use options::{
145    HashScanOptions, JsonScanOptions, KeyColumn, ParallelStrategy, RowIndex, RowIndexColumn,
146    ScanOptions, StreamScanOptions, StringScanOptions, TimeSeriesScanOptions, TtlColumn,
147    get_default_batch_size, get_default_count_hint, get_default_timeout_ms,
148};
149pub use parallel::{FetchResult, KeyBatch, ParallelConfig, ParallelFetch};
150#[cfg(feature = "search")]
151pub use query_builder::{Predicate, PredicateBuilder, Value};
152pub use schema::{HashSchema, RedisType};
153pub use types::hash::{BatchConfig, HashBatchIterator, HashFetcher};
154#[cfg(feature = "cluster")]
155pub use types::hash::{ClusterHashBatchIterator, ClusterHashFetcher};
156#[cfg(feature = "search")]
157pub use types::hash::{HashSearchIterator, SearchBatchConfig};
158#[cfg(feature = "cluster")]
159pub use types::json::ClusterJsonBatchIterator;
160pub use types::json::{JsonBatchIterator, JsonSchema};
161#[cfg(feature = "cluster")]
162pub use types::list::ClusterListBatchIterator;
163pub use types::list::{ListBatchIterator, ListSchema};
164#[cfg(feature = "cluster")]
165pub use types::set::ClusterSetBatchIterator;
166pub use types::set::{SetBatchIterator, SetSchema};
167#[cfg(feature = "cluster")]
168pub use types::stream::ClusterStreamBatchIterator;
169pub use types::stream::{StreamBatchIterator, StreamSchema};
170#[cfg(feature = "cluster")]
171pub use types::string::ClusterStringBatchIterator;
172pub use types::string::{StringBatchIterator, StringSchema};
173#[cfg(feature = "cluster")]
174pub use types::timeseries::ClusterTimeSeriesBatchIterator;
175pub use types::timeseries::{TimeSeriesBatchIterator, TimeSeriesSchema};
176#[cfg(feature = "cluster")]
177pub use types::zset::ClusterZSetBatchIterator;
178pub use types::zset::{ZSetBatchIterator, ZSetSchema};
179pub use write::{
180    KeyError, WriteMode, WriteResult, WriteResultDetailed, write_hashes, write_hashes_detailed,
181    write_json, write_lists, write_sets, write_strings, write_zsets,
182};
183
184/// Serialize an Arrow RecordBatch to IPC format bytes.
185///
186/// This is useful for passing data to Python or other Arrow consumers.
187///
188/// # Example
189/// ```ignore
190/// let batch = iterator.next_batch()?;
191/// let ipc_bytes = polars_redis::batch_to_ipc(&batch)?;
192/// // Send ipc_bytes to Python, which can read it with pl.read_ipc()
193/// ```
194pub fn batch_to_ipc(batch: &arrow::array::RecordBatch) -> Result<Vec<u8>> {
195    let mut buf = Vec::new();
196    {
197        let mut writer = arrow::ipc::writer::FileWriter::try_new(&mut buf, batch.schema().as_ref())
198            .map_err(|e| Error::Runtime(format!("Failed to create IPC writer: {}", e)))?;
199
200        writer
201            .write(batch)
202            .map_err(|e| Error::Runtime(format!("Failed to write batch: {}", e)))?;
203
204        writer
205            .finish()
206            .map_err(|e| Error::Runtime(format!("Failed to finish IPC: {}", e)))?;
207    }
208    Ok(buf)
209}
210
211// ============================================================================
212// Python bindings (only when "python" feature is enabled)
213// ============================================================================
214
#[cfg(feature = "python")]
/// Python module definition for `polars_redis._internal`.
///
/// Registers each `#[pyclass]` and `#[pyfunction]` listed below on the module
/// object `m`. The first registration that fails aborts the function and its
/// error is returned to the caller through `?`.
#[pymodule(name = "_internal")]
fn polars_redis_internal(m: &Bound<'_, PyModule>) -> PyResult<()> {
    // Classes that are always registered when the `python` feature is on.
    m.add_class::<RedisScanner>()?;
    m.add_class::<PyHashBatchIterator>()?;
    m.add_class::<PyJsonBatchIterator>()?;
    m.add_class::<PyStringBatchIterator>()?;
    m.add_class::<PySetBatchIterator>()?;
    m.add_class::<PyListBatchIterator>()?;
    m.add_class::<PyZSetBatchIterator>()?;
    m.add_class::<PyStreamBatchIterator>()?;
    m.add_class::<PyTimeSeriesBatchIterator>()?;
    // The next two registrations exist only when the `search` feature is enabled.
    #[cfg(feature = "search")]
    m.add_class::<PyHashSearchIterator>()?;
    #[cfg(feature = "search")]
    m.add_function(wrap_pyfunction!(py_aggregate, m)?)?;
    // Free functions: key scanning and the `py_infer_*` family.
    m.add_function(wrap_pyfunction!(scan_keys, m)?)?;
    m.add_function(wrap_pyfunction!(py_infer_hash_schema, m)?)?;
    m.add_function(wrap_pyfunction!(py_infer_json_schema, m)?)?;
    m.add_function(wrap_pyfunction!(py_infer_hash_schema_with_overwrite, m)?)?;
    m.add_function(wrap_pyfunction!(py_infer_hash_schema_with_confidence, m)?)?;
    m.add_function(wrap_pyfunction!(py_infer_json_schema_with_overwrite, m)?)?;
    // The `py_write_*` family.
    m.add_function(wrap_pyfunction!(py_write_hashes, m)?)?;
    m.add_function(wrap_pyfunction!(py_write_hashes_detailed, m)?)?;
    m.add_function(wrap_pyfunction!(py_write_json, m)?)?;
    m.add_function(wrap_pyfunction!(py_write_strings, m)?)?;
    m.add_function(wrap_pyfunction!(py_write_sets, m)?)?;
    m.add_function(wrap_pyfunction!(py_write_lists, m)?)?;
    m.add_function(wrap_pyfunction!(py_write_zsets, m)?)?;
    // The `py_cache_*` family (raw byte storage in Redis).
    m.add_function(wrap_pyfunction!(py_cache_set, m)?)?;
    m.add_function(wrap_pyfunction!(py_cache_get, m)?)?;
    m.add_function(wrap_pyfunction!(py_cache_delete, m)?)?;
    m.add_function(wrap_pyfunction!(py_cache_exists, m)?)?;
    m.add_function(wrap_pyfunction!(py_cache_ttl, m)?)?;
    // Cluster iterator classes exist only when the `cluster` feature is enabled.
    #[cfg(feature = "cluster")]
    m.add_class::<PyClusterHashBatchIterator>()?;
    #[cfg(feature = "cluster")]
    m.add_class::<PyClusterJsonBatchIterator>()?;
    #[cfg(feature = "cluster")]
    m.add_class::<PyClusterStringBatchIterator>()?;
    Ok(())
}
258
#[cfg(feature = "python")]
/// Scan settings exposed to Python.
///
/// The code in this block only stores the constructor arguments and hands them
/// back through read-only getters; it performs no Redis I/O itself.
#[pyclass]
pub struct RedisScanner {
    // Redis connection URL, kept exactly as passed in.
    connection_url: String,
    // Key pattern to match.
    pattern: String,
    // Requested batch size.
    batch_size: usize,
    // Requested count hint.
    count_hint: usize,
}

#[cfg(feature = "python")]
#[pymethods]
impl RedisScanner {
    /// Build a scanner configuration.
    ///
    /// `batch_size` defaults to 1000 and `count_hint` to 100 when omitted from Python.
    #[new]
    #[pyo3(signature = (connection_url, pattern, batch_size = 1000, count_hint = 100))]
    fn new(connection_url: String, pattern: String, batch_size: usize, count_hint: usize) -> Self {
        RedisScanner {
            connection_url,
            pattern,
            batch_size,
            count_hint,
        }
    }

    /// The connection URL given at construction.
    #[getter]
    fn connection_url(&self) -> &str {
        self.connection_url.as_str()
    }

    /// The key pattern given at construction.
    #[getter]
    fn pattern(&self) -> &str {
        self.pattern.as_str()
    }

    /// The batch size given at construction.
    #[getter]
    fn batch_size(&self) -> usize {
        let Self { batch_size, .. } = self;
        *batch_size
    }

    /// The count hint given at construction.
    #[getter]
    fn count_hint(&self) -> usize {
        let Self { count_hint, .. } = self;
        *count_hint
    }
}
304
#[cfg(feature = "python")]
/// Python wrapper for [`HashBatchIterator`].
///
/// Used by the Python IO plugin to iterate over Redis hash data; each batch is
/// handed to Python as an Arrow RecordBatch serialized to IPC bytes.
#[pyclass]
pub struct PyHashBatchIterator {
    inner: HashBatchIterator,
}

#[cfg(feature = "python")]
#[pymethods]
impl PyHashBatchIterator {
    /// Create a new PyHashBatchIterator.
    ///
    /// # Arguments
    /// * `url` - Redis connection URL
    /// * `pattern` - Key pattern to match
    /// * `schema` - List of (field_name, type_name) tuples; type names are
    ///   matched case-insensitively (`utf8`/`str`/`string`, `int64`/`int`/`integer`,
    ///   `float64`/`float`/`double`, `bool`/`boolean`)
    /// * `batch_size` - Keys per batch
    /// * `count_hint` - SCAN COUNT hint
    /// * `projection` - Optional list of columns to fetch
    /// * `include_key` - Whether to include the Redis key as a column
    /// * `key_column_name` - Name of the key column
    /// * `include_ttl` - Whether to include the TTL as a column
    /// * `ttl_column_name` - Name of the TTL column
    /// * `include_row_index` - Whether to include the row index as a column
    /// * `row_index_column_name` - Name of the row index column
    /// * `max_rows` - Optional maximum rows to return
    /// * `parallel` - Optional number of parallel workers for fetching
    ///
    /// # Errors
    /// Raises `ValueError` for an unrecognized type name and `RuntimeError` if
    /// the underlying iterator cannot be created.
    #[new]
    #[pyo3(signature = (
        url,
        pattern,
        schema,
        batch_size = 1000,
        count_hint = 100,
        projection = None,
        include_key = true,
        key_column_name = "_key".to_string(),
        include_ttl = false,
        ttl_column_name = "_ttl".to_string(),
        include_row_index = false,
        row_index_column_name = "_index".to_string(),
        max_rows = None,
        parallel = None
    ))]
    // The parameter list mirrors the Python keyword arguments one-to-one, so it is long by design.
    #[allow(clippy::too_many_arguments)]
    fn new(
        url: String,
        pattern: String,
        schema: Vec<(String, String)>,
        batch_size: usize,
        count_hint: usize,
        projection: Option<Vec<String>>,
        include_key: bool,
        key_column_name: String,
        include_ttl: bool,
        ttl_column_name: String,
        include_row_index: bool,
        row_index_column_name: String,
        max_rows: Option<usize>,
        parallel: Option<usize>,
    ) -> PyResult<Self> {
        // Translate the Python-side type names into RedisType, failing on the first unknown name.
        let mut field_types: Vec<(String, RedisType)> = Vec::with_capacity(schema.len());
        for (name, type_str) in schema {
            let redis_type = match type_str.to_lowercase().as_str() {
                "utf8" | "str" | "string" => RedisType::Utf8,
                "int64" | "int" | "integer" => RedisType::Int64,
                "float64" | "float" | "double" => RedisType::Float64,
                "bool" | "boolean" => RedisType::Boolean,
                _ => {
                    return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(format!(
                        "Unknown type '{}' for field '{}'. Supported: utf8, int64, float64, bool",
                        type_str, name
                    )));
                }
            };
            field_types.push((name, redis_type));
        }

        let hash_schema = HashSchema::new(field_types)
            .with_key(include_key)
            .with_key_column_name(key_column_name)
            .with_ttl(include_ttl)
            .with_ttl_column_name(ttl_column_name)
            .with_row_index(include_row_index)
            .with_row_index_column_name(row_index_column_name);

        let config = BatchConfig::new(pattern)
            .with_batch_size(batch_size)
            .with_count_hint(count_hint);

        // Optional settings are applied only when the caller supplied them.
        let config = match max_rows {
            Some(max) => config.with_max_rows(max),
            None => config,
        };
        let config = match parallel {
            Some(workers) => config.with_parallel(ParallelStrategy::batches(workers)),
            None => config,
        };

        let inner = HashBatchIterator::new(&url, hash_schema, config, projection)
            .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;

        Ok(Self { inner })
    }

    /// Get the next batch as Arrow IPC bytes.
    ///
    /// Returns `None` when iteration is complete; otherwise returns a Python
    /// `bytes` object holding the RecordBatch in Arrow IPC file format.
    /// Serialization is delegated to the crate-level `batch_to_ipc` helper so
    /// the Rust and Python paths share one implementation.
    ///
    /// # Errors
    /// Raises `RuntimeError` if fetching the batch or serializing it fails.
    fn next_batch_ipc(&mut self, py: Python<'_>) -> PyResult<Option<Py<PyAny>>> {
        let batch = self
            .inner
            .next_batch()
            .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;

        match batch {
            Some(record_batch) => {
                let ipc_bytes = batch_to_ipc(&record_batch).map_err(|e| {
                    PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string())
                })?;

                Ok(Some(pyo3::types::PyBytes::new(py, &ipc_bytes).into()))
            }
            None => Ok(None),
        }
    }

    /// Check if iteration is complete.
    fn is_done(&self) -> bool {
        self.inner.is_done()
    }

    /// Get the number of rows yielded so far.
    fn rows_yielded(&self) -> usize {
        self.inner.rows_yielded()
    }
}
472
473// ============================================================================
474// Cache functions for DataFrame caching
475// ============================================================================
476
#[cfg(feature = "python")]
/// Store bytes in Redis with optional TTL.
///
/// Issues `SETEX key ttl data` when a TTL is given and `SET key data` otherwise.
///
/// # Arguments
/// * `url` - Redis connection URL
/// * `key` - Redis key
/// * `data` - Bytes to store
/// * `ttl` - Optional TTL in seconds
///
/// # Returns
/// Number of bytes written (the length of `data`).
///
/// # Errors
/// Raises `ConnectionError` if the client or connection cannot be established,
/// and `RuntimeError` if the runtime cannot be built or the command fails.
#[pyfunction]
#[pyo3(signature = (url, key, data, ttl = None))]
fn py_cache_set(url: &str, key: &str, data: &[u8], ttl: Option<i64>) -> PyResult<usize> {
    // Map any displayable error to a Python RuntimeError.
    fn runtime_err<E: std::fmt::Display>(err: E) -> PyErr {
        PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(err.to_string())
    }
    // Map any displayable error to a Python ConnectionError.
    fn connection_err<E: std::fmt::Display>(err: E) -> PyErr {
        PyErr::new::<pyo3::exceptions::PyConnectionError, _>(err.to_string())
    }

    // One blocking request needs no worker-thread pool; a current-thread runtime
    // avoids spawning and joining a full multi-threaded scheduler on every call.
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .map_err(runtime_err)?;

    rt.block_on(async {
        let client = redis::Client::open(url).map_err(connection_err)?;

        let mut conn = client
            .get_multiplexed_async_connection()
            .await
            .map_err(connection_err)?;

        // Build the command prefix; the payload is always the final argument.
        let mut command = match ttl {
            Some(seconds) => {
                let mut setex = redis::cmd("SETEX");
                setex.arg(key).arg(seconds);
                setex
            }
            None => {
                let mut set = redis::cmd("SET");
                set.arg(key);
                set
            }
        };
        command.arg(data);

        command
            .query_async::<()>(&mut conn)
            .await
            .map_err(runtime_err)?;

        Ok(data.len())
    })
}
525
#[cfg(feature = "python")]
/// Retrieve bytes from Redis.
///
/// # Arguments
/// * `url` - Redis connection URL
/// * `key` - Redis key
///
/// # Returns
/// A Python `bytes` object if `GET` returned a value, None otherwise.
///
/// # Errors
/// Raises `ConnectionError` if the client or connection cannot be established,
/// and `RuntimeError` if the runtime cannot be built or the command fails.
#[pyfunction]
fn py_cache_get(py: Python<'_>, url: &str, key: &str) -> PyResult<Option<Py<PyAny>>> {
    // Map any displayable error to a Python RuntimeError.
    fn runtime_err<E: std::fmt::Display>(err: E) -> PyErr {
        PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(err.to_string())
    }
    // Map any displayable error to a Python ConnectionError.
    fn connection_err<E: std::fmt::Display>(err: E) -> PyErr {
        PyErr::new::<pyo3::exceptions::PyConnectionError, _>(err.to_string())
    }

    // One blocking request needs no worker-thread pool; a current-thread runtime
    // avoids spawning and joining a full multi-threaded scheduler on every call.
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .map_err(runtime_err)?;

    rt.block_on(async {
        let client = redis::Client::open(url).map_err(connection_err)?;

        let mut conn = client
            .get_multiplexed_async_connection()
            .await
            .map_err(connection_err)?;

        let payload: Option<Vec<u8>> = redis::cmd("GET")
            .arg(key)
            .query_async(&mut conn)
            .await
            .map_err(runtime_err)?;

        // A missing key comes back as None and is passed through unchanged.
        Ok(payload.map(|bytes| -> Py<PyAny> { pyo3::types::PyBytes::new(py, &bytes).into() }))
    })
}
561
#[cfg(feature = "python")]
/// Delete a key from Redis.
///
/// # Arguments
/// * `url` - Redis connection URL
/// * `key` - Redis key
///
/// # Returns
/// True if `DEL` reported at least one removed key, False otherwise.
///
/// # Errors
/// Raises `ConnectionError` if the client or connection cannot be established,
/// and `RuntimeError` if the runtime cannot be built or the command fails.
#[pyfunction]
fn py_cache_delete(url: &str, key: &str) -> PyResult<bool> {
    // Map any displayable error to a Python RuntimeError.
    fn runtime_err<E: std::fmt::Display>(err: E) -> PyErr {
        PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(err.to_string())
    }
    // Map any displayable error to a Python ConnectionError.
    fn connection_err<E: std::fmt::Display>(err: E) -> PyErr {
        PyErr::new::<pyo3::exceptions::PyConnectionError, _>(err.to_string())
    }

    // One blocking request needs no worker-thread pool; a current-thread runtime
    // avoids spawning and joining a full multi-threaded scheduler on every call.
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .map_err(runtime_err)?;

    rt.block_on(async {
        let client = redis::Client::open(url).map_err(connection_err)?;

        let mut conn = client
            .get_multiplexed_async_connection()
            .await
            .map_err(connection_err)?;

        let removed: i64 = redis::cmd("DEL")
            .arg(key)
            .query_async(&mut conn)
            .await
            .map_err(runtime_err)?;

        Ok(removed > 0)
    })
}
594
#[cfg(feature = "python")]
/// Check if a key exists in Redis.
///
/// # Arguments
/// * `url` - Redis connection URL
/// * `key` - Redis key
///
/// # Returns
/// True if `EXISTS` reported a positive count, False otherwise.
///
/// # Errors
/// Raises `ConnectionError` if the client or connection cannot be established,
/// and `RuntimeError` if the runtime cannot be built or the command fails.
#[pyfunction]
fn py_cache_exists(url: &str, key: &str) -> PyResult<bool> {
    // Map any displayable error to a Python RuntimeError.
    fn runtime_err<E: std::fmt::Display>(err: E) -> PyErr {
        PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(err.to_string())
    }
    // Map any displayable error to a Python ConnectionError.
    fn connection_err<E: std::fmt::Display>(err: E) -> PyErr {
        PyErr::new::<pyo3::exceptions::PyConnectionError, _>(err.to_string())
    }

    // One blocking request needs no worker-thread pool; a current-thread runtime
    // avoids spawning and joining a full multi-threaded scheduler on every call.
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .map_err(runtime_err)?;

    rt.block_on(async {
        let client = redis::Client::open(url).map_err(connection_err)?;

        let mut conn = client
            .get_multiplexed_async_connection()
            .await
            .map_err(connection_err)?;

        let found: i64 = redis::cmd("EXISTS")
            .arg(key)
            .query_async(&mut conn)
            .await
            .map_err(runtime_err)?;

        Ok(found > 0)
    })
}
627
#[cfg(feature = "python")]
/// Get the TTL of a key in Redis.
///
/// # Arguments
/// * `url` - Redis connection URL
/// * `key` - Redis key
///
/// # Returns
/// TTL in seconds, or None if key doesn't exist or has no TTL.
///
/// # Errors
/// Raises `ConnectionError` if the client or connection cannot be established,
/// and `RuntimeError` if the runtime cannot be built or the command fails.
#[pyfunction]
fn py_cache_ttl(url: &str, key: &str) -> PyResult<Option<i64>> {
    // Map any displayable error to a Python RuntimeError.
    fn runtime_err<E: std::fmt::Display>(err: E) -> PyErr {
        PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(err.to_string())
    }
    // Map any displayable error to a Python ConnectionError.
    fn connection_err<E: std::fmt::Display>(err: E) -> PyErr {
        PyErr::new::<pyo3::exceptions::PyConnectionError, _>(err.to_string())
    }

    // One blocking request needs no worker-thread pool; a current-thread runtime
    // avoids spawning and joining a full multi-threaded scheduler on every call.
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .map_err(runtime_err)?;

    rt.block_on(async {
        let client = redis::Client::open(url).map_err(connection_err)?;

        let mut conn = client
            .get_multiplexed_async_connection()
            .await
            .map_err(connection_err)?;

        let ttl: i64 = redis::cmd("TTL")
            .arg(key)
            .query_async(&mut conn)
            .await
            .map_err(runtime_err)?;

        // TTL returns -2 if key doesn't exist, -1 if no TTL; both map to None.
        Ok((ttl >= 0).then_some(ttl))
    })
}
661
662// ============================================================================
663// Cluster Python bindings (with cluster + python features)
664// ============================================================================
665
#[cfg(all(feature = "python", feature = "cluster"))]
/// Python wrapper for [`ClusterHashBatchIterator`].
///
/// Used by the Python IO plugin to iterate over Redis Cluster hash data; each
/// batch is handed to Python as an Arrow RecordBatch serialized to IPC bytes.
#[pyclass]
pub struct PyClusterHashBatchIterator {
    inner: ClusterHashBatchIterator,
}

#[cfg(all(feature = "python", feature = "cluster"))]
#[pymethods]
impl PyClusterHashBatchIterator {
    /// Create a new PyClusterHashBatchIterator.
    ///
    /// # Arguments
    /// * `nodes` - List of cluster node URLs
    /// * `pattern` - Key pattern to match
    /// * `schema` - List of (field_name, type_name) tuples; type names are
    ///   matched case-insensitively (`utf8`/`str`/`string`, `int64`/`int`/`integer`,
    ///   `float64`/`float`/`double`, `bool`/`boolean`)
    /// * `batch_size` - Keys per batch
    /// * `count_hint` - SCAN COUNT hint
    /// * `projection` - Optional list of columns to fetch
    /// * `include_key` - Whether to include the Redis key as a column
    /// * `key_column_name` - Name of the key column
    /// * `include_ttl` - Whether to include the TTL as a column
    /// * `ttl_column_name` - Name of the TTL column
    /// * `include_row_index` - Whether to include the row index as a column
    /// * `row_index_column_name` - Name of the row index column
    /// * `max_rows` - Optional maximum rows to return
    /// * `parallel` - Optional number of parallel workers for fetching
    ///
    /// # Errors
    /// Raises `ValueError` for an unrecognized type name and `RuntimeError` if
    /// the underlying iterator cannot be created.
    #[new]
    #[pyo3(signature = (
        nodes,
        pattern,
        schema,
        batch_size = 1000,
        count_hint = 100,
        projection = None,
        include_key = true,
        key_column_name = "_key".to_string(),
        include_ttl = false,
        ttl_column_name = "_ttl".to_string(),
        include_row_index = false,
        row_index_column_name = "_index".to_string(),
        max_rows = None,
        parallel = None
    ))]
    // The parameter list mirrors the Python keyword arguments one-to-one, so it is long by design.
    #[allow(clippy::too_many_arguments)]
    fn new(
        nodes: Vec<String>,
        pattern: String,
        schema: Vec<(String, String)>,
        batch_size: usize,
        count_hint: usize,
        projection: Option<Vec<String>>,
        include_key: bool,
        key_column_name: String,
        include_ttl: bool,
        ttl_column_name: String,
        include_row_index: bool,
        row_index_column_name: String,
        max_rows: Option<usize>,
        parallel: Option<usize>,
    ) -> PyResult<Self> {
        // Translate the Python-side type names into RedisType, failing on the first unknown name.
        let mut field_types: Vec<(String, RedisType)> = Vec::with_capacity(schema.len());
        for (name, type_str) in schema {
            let redis_type = match type_str.to_lowercase().as_str() {
                "utf8" | "str" | "string" => RedisType::Utf8,
                "int64" | "int" | "integer" => RedisType::Int64,
                "float64" | "float" | "double" => RedisType::Float64,
                "bool" | "boolean" => RedisType::Boolean,
                _ => {
                    return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(format!(
                        "Unknown type '{}' for field '{}'. Supported: utf8, int64, float64, bool",
                        type_str, name
                    )));
                }
            };
            field_types.push((name, redis_type));
        }

        let hash_schema = HashSchema::new(field_types)
            .with_key(include_key)
            .with_key_column_name(key_column_name)
            .with_ttl(include_ttl)
            .with_ttl_column_name(ttl_column_name)
            .with_row_index(include_row_index)
            .with_row_index_column_name(row_index_column_name);

        let config = BatchConfig::new(pattern)
            .with_batch_size(batch_size)
            .with_count_hint(count_hint);

        // Optional settings are applied only when the caller supplied them.
        let config = match max_rows {
            Some(max) => config.with_max_rows(max),
            None => config,
        };
        let config = match parallel {
            Some(workers) => config.with_parallel(ParallelStrategy::batches(workers)),
            None => config,
        };

        let inner = ClusterHashBatchIterator::new(&nodes, hash_schema, config, projection)
            .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;

        Ok(Self { inner })
    }

    /// Get the next batch as Arrow IPC bytes.
    ///
    /// Returns `None` when iteration is complete; otherwise returns a Python
    /// `bytes` object holding the RecordBatch in Arrow IPC file format.
    /// Serialization is delegated to the crate-level `batch_to_ipc` helper so
    /// the Rust and Python paths share one implementation.
    ///
    /// # Errors
    /// Raises `RuntimeError` if fetching the batch or serializing it fails.
    fn next_batch_ipc(&mut self, py: Python<'_>) -> PyResult<Option<Py<PyAny>>> {
        let batch = self
            .inner
            .next_batch()
            .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;

        match batch {
            Some(record_batch) => {
                let ipc_bytes = batch_to_ipc(&record_batch).map_err(|e| {
                    PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string())
                })?;

                Ok(Some(pyo3::types::PyBytes::new(py, &ipc_bytes).into()))
            }
            None => Ok(None),
        }
    }

    /// Check if iteration is complete.
    fn is_done(&self) -> bool {
        self.inner.is_done()
    }

    /// Get the number of rows yielded so far.
    fn rows_yielded(&self) -> usize {
        self.inner.rows_yielded()
    }

    /// Get the node count reported by the underlying cluster iterator.
    fn node_count(&self) -> usize {
        self.inner.node_count()
    }
}
830
#[cfg(all(feature = "python", feature = "cluster"))]
/// Python-facing handle that owns a `ClusterJsonBatchIterator`.
///
/// Batches produced by the wrapped iterator are handed to Python as Arrow IPC
/// file bytes through `next_batch_ipc`.
#[pyclass]
pub struct PyClusterJsonBatchIterator {
    inner: ClusterJsonBatchIterator,
}

#[cfg(all(feature = "python", feature = "cluster"))]
#[pymethods]
impl PyClusterJsonBatchIterator {
    /// Construct the wrapper from Python-supplied arguments.
    ///
    /// # Arguments
    /// * `nodes` - Node list forwarded to `ClusterJsonBatchIterator::new`
    /// * `pattern` - Pattern handed to `BatchConfig::new`
    /// * `schema` - List of (field_name, type_name) tuples; type names are
    ///   matched case-insensitively against utf8/str/string, int64/int/integer,
    ///   float64/float/double and bool/boolean
    /// * `batch_size` - Forwarded to `BatchConfig::with_batch_size`
    /// * `count_hint` - Forwarded to `BatchConfig::with_count_hint`
    /// * `projection` - Optional column list forwarded to the inner iterator
    /// * `include_key` / `key_column_name` - Key column toggle and name
    /// * `include_ttl` / `ttl_column_name` - TTL column toggle and name
    /// * `include_row_index` / `row_index_column_name` - Row index column toggle and name
    /// * `max_rows` - Optional row limit; only applied when provided
    /// * `parallel` - Optional worker count; when provided it is wrapped in
    ///   `ParallelStrategy::batches`
    ///
    /// # Errors
    /// Raises `ValueError` for an unrecognised type name and `RuntimeError`
    /// when `ClusterJsonBatchIterator::new` fails.
    #[new]
    #[pyo3(signature = (
        nodes,
        pattern,
        schema,
        batch_size = 1000,
        count_hint = 100,
        projection = None,
        include_key = true,
        key_column_name = "_key".to_string(),
        include_ttl = false,
        ttl_column_name = "_ttl".to_string(),
        include_row_index = false,
        row_index_column_name = "_index".to_string(),
        max_rows = None,
        parallel = None
    ))]
    #[allow(clippy::too_many_arguments)]
    fn new(
        nodes: Vec<String>,
        pattern: String,
        schema: Vec<(String, String)>,
        batch_size: usize,
        count_hint: usize,
        projection: Option<Vec<String>>,
        include_key: bool,
        key_column_name: String,
        include_ttl: bool,
        ttl_column_name: String,
        include_row_index: bool,
        row_index_column_name: String,
        max_rows: Option<usize>,
        parallel: Option<usize>,
    ) -> PyResult<Self> {
        // Translate each Python type name into an Arrow data type, stopping at
        // the first name that is not recognised.
        let mut field_types: Vec<(String, DataType)> = Vec::with_capacity(schema.len());
        for (name, type_str) in schema {
            let lowered = type_str.to_lowercase();
            let dtype = match lowered.as_str() {
                "utf8" | "str" | "string" => DataType::Utf8,
                "int64" | "int" | "integer" => DataType::Int64,
                "float64" | "float" | "double" => DataType::Float64,
                "bool" | "boolean" => DataType::Boolean,
                _ => {
                    return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(format!(
                        "Unknown type '{}' for field '{}'. Supported: utf8, int64, float64, bool",
                        type_str, name
                    )));
                }
            };
            field_types.push((name, dtype));
        }

        let row_schema = JsonSchema::new(field_types)
            .with_key(include_key)
            .with_key_column_name(key_column_name)
            .with_ttl(include_ttl)
            .with_ttl_column_name(ttl_column_name)
            .with_row_index(include_row_index)
            .with_row_index_column_name(row_index_column_name);

        let base_config = BatchConfig::new(pattern)
            .with_batch_size(batch_size)
            .with_count_hint(count_hint);

        // Optional settings are layered on only when the caller supplied them.
        let limited_config = match max_rows {
            Some(max) => base_config.with_max_rows(max),
            None => base_config,
        };
        let config = match parallel {
            Some(workers) => limited_config.with_parallel(ParallelStrategy::batches(workers)),
            None => limited_config,
        };

        ClusterJsonBatchIterator::new(&nodes, row_schema, config, projection)
            .map(|inner| Self { inner })
            .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))
    }

    /// Fetch the next batch and return it as Arrow IPC file bytes.
    ///
    /// Returns `None` once the wrapped iterator yields no further batch.
    /// Any failure (fetching or serialising) is raised as `RuntimeError`.
    fn next_batch_ipc(&mut self, py: Python<'_>) -> PyResult<Option<Py<PyAny>>> {
        let runtime_err = |msg: String| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(msg);

        let maybe_batch = self
            .inner
            .next_batch()
            .map_err(|e| runtime_err(e.to_string()))?;
        let Some(record_batch) = maybe_batch else {
            return Ok(None);
        };

        let mut ipc_bytes = Vec::new();
        let batch_schema = record_batch.schema();
        let mut writer =
            arrow::ipc::writer::FileWriter::try_new(&mut ipc_bytes, batch_schema.as_ref())
                .map_err(|e| runtime_err(format!("Failed to create IPC writer: {}", e)))?;
        writer
            .write(&record_batch)
            .map_err(|e| runtime_err(format!("Failed to write batch: {}", e)))?;
        writer
            .finish()
            .map_err(|e| runtime_err(format!("Failed to finish IPC: {}", e)))?;
        // Release the writer's mutable borrow before reading the buffer.
        drop(writer);

        Ok(Some(pyo3::types::PyBytes::new(py, &ipc_bytes).into()))
    }

    /// Report whether the wrapped iterator considers iteration finished.
    fn is_done(&self) -> bool {
        self.inner.is_done()
    }

    /// Return the row count reported by the wrapped iterator.
    fn rows_yielded(&self) -> usize {
        self.inner.rows_yielded()
    }

    /// Return the node count reported by the wrapped iterator.
    fn node_count(&self) -> usize {
        self.inner.node_count()
    }
}
974
#[cfg(all(feature = "python", feature = "cluster"))]
/// Python-facing handle that owns a `ClusterStringBatchIterator`.
///
/// Batches produced by the wrapped iterator are handed to Python as Arrow IPC
/// file bytes through `next_batch_ipc`.
#[pyclass]
pub struct PyClusterStringBatchIterator {
    inner: ClusterStringBatchIterator,
}

#[cfg(all(feature = "python", feature = "cluster"))]
#[pymethods]
impl PyClusterStringBatchIterator {
    /// Construct the wrapper from Python-supplied arguments.
    ///
    /// # Arguments
    /// * `nodes` - Node list forwarded to `ClusterStringBatchIterator::new`
    /// * `pattern` - Pattern handed to `BatchConfig::new`
    /// * `value_type` - Type name for the value column, matched
    ///   case-insensitively against utf8/str/string, int64/int/integer,
    ///   float64/float/double, bool/boolean, date and datetime
    /// * `batch_size` - Forwarded to `BatchConfig::with_batch_size`
    /// * `count_hint` - Forwarded to `BatchConfig::with_count_hint`
    /// * `include_key` / `key_column_name` - Key column toggle and name
    /// * `value_column_name` - Name of the value column
    /// * `include_ttl` / `ttl_column_name` - TTL column toggle and name
    /// * `max_rows` - Optional row limit; only applied when provided
    /// * `parallel` - Optional worker count; when provided it is wrapped in
    ///   `ParallelStrategy::batches`
    ///
    /// # Errors
    /// Raises `ValueError` for an unrecognised `value_type` and `RuntimeError`
    /// when `ClusterStringBatchIterator::new` fails.
    #[new]
    #[pyo3(signature = (
        nodes,
        pattern,
        value_type = "utf8".to_string(),
        batch_size = 1000,
        count_hint = 100,
        include_key = true,
        key_column_name = "_key".to_string(),
        value_column_name = "value".to_string(),
        include_ttl = false,
        ttl_column_name = "_ttl".to_string(),
        max_rows = None,
        parallel = None
    ))]
    #[allow(clippy::too_many_arguments)]
    fn new(
        nodes: Vec<String>,
        pattern: String,
        value_type: String,
        batch_size: usize,
        count_hint: usize,
        include_key: bool,
        key_column_name: String,
        value_column_name: String,
        include_ttl: bool,
        ttl_column_name: String,
        max_rows: Option<usize>,
        parallel: Option<usize>,
    ) -> PyResult<Self> {
        use arrow::datatypes::TimeUnit;

        // Matching is done on a lowercased copy; the error message reports the
        // name exactly as the caller wrote it.
        let lowered = value_type.to_lowercase();
        let dtype = match lowered.as_str() {
            "utf8" | "str" | "string" => DataType::Utf8,
            "int64" | "int" | "integer" => DataType::Int64,
            "float64" | "float" | "double" => DataType::Float64,
            "bool" | "boolean" => DataType::Boolean,
            "date" => DataType::Date32,
            "datetime" => DataType::Timestamp(TimeUnit::Microsecond, None),
            _ => {
                return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(format!(
                    "Unknown value type '{}'. Supported: utf8, int64, float64, bool, date, datetime",
                    value_type
                )));
            }
        };

        let value_schema = StringSchema::new(dtype)
            .with_key(include_key)
            .with_key_column_name(key_column_name)
            .with_value_column_name(value_column_name)
            .with_ttl(include_ttl)
            .with_ttl_column_name(ttl_column_name);

        let base_config = BatchConfig::new(pattern)
            .with_batch_size(batch_size)
            .with_count_hint(count_hint);

        // Optional settings are layered on only when the caller supplied them.
        let limited_config = match max_rows {
            Some(max) => base_config.with_max_rows(max),
            None => base_config,
        };
        let config = match parallel {
            Some(workers) => limited_config.with_parallel(ParallelStrategy::batches(workers)),
            None => limited_config,
        };

        ClusterStringBatchIterator::new(&nodes, value_schema, config)
            .map(|inner| Self { inner })
            .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))
    }

    /// Fetch the next batch and return it as Arrow IPC file bytes.
    ///
    /// Returns `None` once the wrapped iterator yields no further batch.
    /// Any failure (fetching or serialising) is raised as `RuntimeError`.
    fn next_batch_ipc(&mut self, py: Python<'_>) -> PyResult<Option<Py<PyAny>>> {
        let runtime_err = |msg: String| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(msg);

        let maybe_batch = self
            .inner
            .next_batch()
            .map_err(|e| runtime_err(e.to_string()))?;
        let Some(record_batch) = maybe_batch else {
            return Ok(None);
        };

        let mut ipc_bytes = Vec::new();
        let batch_schema = record_batch.schema();
        let mut writer =
            arrow::ipc::writer::FileWriter::try_new(&mut ipc_bytes, batch_schema.as_ref())
                .map_err(|e| runtime_err(format!("Failed to create IPC writer: {}", e)))?;
        writer
            .write(&record_batch)
            .map_err(|e| runtime_err(format!("Failed to write batch: {}", e)))?;
        writer
            .finish()
            .map_err(|e| runtime_err(format!("Failed to finish IPC: {}", e)))?;
        // Release the writer's mutable borrow before reading the buffer.
        drop(writer);

        Ok(Some(pyo3::types::PyBytes::new(py, &ipc_bytes).into()))
    }

    /// Report whether the wrapped iterator considers iteration finished.
    fn is_done(&self) -> bool {
        self.inner.is_done()
    }

    /// Return the row count reported by the wrapped iterator.
    fn rows_yielded(&self) -> usize {
        self.inner.rows_yielded()
    }

    /// Return the node count reported by the wrapped iterator.
    fn node_count(&self) -> usize {
        self.inner.node_count()
    }
}
1111
#[cfg(all(feature = "python", feature = "search"))]
/// Python-facing handle that owns a `HashSearchIterator`.
///
/// Used to walk RediSearch `FT.SEARCH` results and hand them to Python as
/// Arrow RecordBatches (serialised as IPC bytes), so filtering happens on the
/// server.
#[pyclass]
pub struct PyHashSearchIterator {
    inner: HashSearchIterator,
}

#[cfg(all(feature = "python", feature = "search"))]
#[pymethods]
impl PyHashSearchIterator {
    /// Construct the wrapper from Python-supplied arguments.
    ///
    /// # Arguments
    /// * `url` - Redis connection URL
    /// * `index` - RediSearch index name
    /// * `query` - RediSearch query string (e.g., "@age:[30 +inf]", "*" for all)
    /// * `schema` - List of (field_name, type_name) tuples; type names are
    ///   matched case-insensitively against utf8/str/string, int64/int/integer,
    ///   float64/float/double and bool/boolean
    /// * `batch_size` - Documents per batch
    /// * `projection` - Optional list of columns to fetch
    /// * `include_key` - Whether to include the Redis key as a column
    /// * `key_column_name` - Name of the key column
    /// * `include_ttl` - Whether to include the TTL as a column
    /// * `ttl_column_name` - Name of the TTL column
    /// * `include_row_index` - Whether to include the row index as a column
    /// * `row_index_column_name` - Name of the row index column
    /// * `max_rows` - Optional maximum rows to return
    /// * `sort_by` - Optional field to sort by
    /// * `sort_ascending` - Sort direction (default: true); only consulted
    ///   when `sort_by` is provided
    ///
    /// # Errors
    /// Raises `ValueError` for an unrecognised type name and `RuntimeError`
    /// when `HashSearchIterator::new` fails.
    #[new]
    #[pyo3(signature = (
        url,
        index,
        query,
        schema,
        batch_size = 1000,
        projection = None,
        include_key = true,
        key_column_name = "_key".to_string(),
        include_ttl = false,
        ttl_column_name = "_ttl".to_string(),
        include_row_index = false,
        row_index_column_name = "_index".to_string(),
        max_rows = None,
        sort_by = None,
        sort_ascending = true
    ))]
    #[allow(clippy::too_many_arguments)]
    fn new(
        url: String,
        index: String,
        query: String,
        schema: Vec<(String, String)>,
        batch_size: usize,
        projection: Option<Vec<String>>,
        include_key: bool,
        key_column_name: String,
        include_ttl: bool,
        ttl_column_name: String,
        include_row_index: bool,
        row_index_column_name: String,
        max_rows: Option<usize>,
        sort_by: Option<String>,
        sort_ascending: bool,
    ) -> PyResult<Self> {
        // Translate each Python type name into a RedisType, stopping at the
        // first name that is not recognised.
        let mut field_types: Vec<(String, RedisType)> = Vec::with_capacity(schema.len());
        for (name, type_str) in schema {
            let lowered = type_str.to_lowercase();
            let redis_type = match lowered.as_str() {
                "utf8" | "str" | "string" => RedisType::Utf8,
                "int64" | "int" | "integer" => RedisType::Int64,
                "float64" | "float" | "double" => RedisType::Float64,
                "bool" | "boolean" => RedisType::Boolean,
                _ => {
                    return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(format!(
                        "Unknown type '{}' for field '{}'. Supported: utf8, int64, float64, bool",
                        type_str, name
                    )));
                }
            };
            field_types.push((name, redis_type));
        }

        let row_schema = HashSchema::new(field_types)
            .with_key(include_key)
            .with_key_column_name(key_column_name)
            .with_ttl(include_ttl)
            .with_ttl_column_name(ttl_column_name)
            .with_row_index(include_row_index)
            .with_row_index_column_name(row_index_column_name);

        let base_config = SearchBatchConfig::new(index, query).with_batch_size(batch_size);

        // Optional settings are layered on only when the caller supplied them.
        let limited_config = match max_rows {
            Some(max) => base_config.with_max_rows(max),
            None => base_config,
        };
        let config = match sort_by {
            Some(field) => limited_config.with_sort_by(field, sort_ascending),
            None => limited_config,
        };

        HashSearchIterator::new(&url, row_schema, config, projection)
            .map(|inner| Self { inner })
            .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))
    }

    /// Fetch the next batch and return it as Arrow IPC file bytes.
    ///
    /// Returns `None` once the wrapped iterator yields no further batch.
    /// Any failure (fetching or serialising) is raised as `RuntimeError`.
    fn next_batch_ipc(&mut self, py: Python<'_>) -> PyResult<Option<Py<PyAny>>> {
        let runtime_err = |msg: String| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(msg);

        let maybe_batch = self
            .inner
            .next_batch()
            .map_err(|e| runtime_err(e.to_string()))?;
        let Some(record_batch) = maybe_batch else {
            return Ok(None);
        };

        // Serialise the batch into an in-memory Arrow IPC file.
        let mut ipc_bytes = Vec::new();
        let batch_schema = record_batch.schema();
        let mut writer =
            arrow::ipc::writer::FileWriter::try_new(&mut ipc_bytes, batch_schema.as_ref())
                .map_err(|e| runtime_err(format!("Failed to create IPC writer: {}", e)))?;
        writer
            .write(&record_batch)
            .map_err(|e| runtime_err(format!("Failed to write batch: {}", e)))?;
        writer
            .finish()
            .map_err(|e| runtime_err(format!("Failed to finish IPC: {}", e)))?;
        // Release the writer's mutable borrow before reading the buffer.
        drop(writer);

        Ok(Some(pyo3::types::PyBytes::new(py, &ipc_bytes).into()))
    }

    /// Report whether the wrapped iterator considers iteration finished.
    fn is_done(&self) -> bool {
        self.inner.is_done()
    }

    /// Return the row count reported by the wrapped iterator.
    fn rows_yielded(&self) -> usize {
        self.inner.rows_yielded()
    }

    /// Return the total number of matching documents (available after first batch).
    fn total_results(&self) -> Option<usize> {
        self.inner.total_results()
    }
}
1285
#[cfg(all(feature = "python", feature = "search"))]
/// Run FT.AGGREGATE and return the aggregated rows as a list of dictionaries.
///
/// A fresh Tokio runtime is created for the call and blocked on until the
/// aggregation has completed.
///
/// # Arguments
/// * `url` - Redis connection URL
/// * `index` - RediSearch index name
/// * `query` - RediSearch query string (e.g., "@status:active", "*" for all)
/// * `group_by` - List of field names to group by
/// * `reduce` - List of reduce operations as (function, args, alias) tuples
/// * `apply` - Optional list of apply expressions as (expression, alias) tuples
/// * `filter` - Optional post-aggregation filter expression
/// * `sort_by` - Optional list of sort specifications as (field, ascending) tuples
/// * `limit` - Optional maximum number of results
/// * `offset` - Offset for pagination (default: 0)
/// * `load` - Optional list of fields to load from documents
///
/// # Returns
/// A list of dictionaries, where each dictionary represents an aggregated row.
///
/// # Errors
/// Raises `ConnectionError` when the connection cannot be created or the
/// connection manager cannot be obtained, and `RuntimeError` when the runtime
/// cannot be created or the aggregation itself fails.
///
/// # Example
/// ```python
/// result = py_aggregate(
///     "redis://localhost:6379",
///     "users_idx",
///     "*",
///     group_by=["city"],
///     reduce=[("COUNT", [], "user_count"), ("AVG", ["age"], "avg_age")],
///     sort_by=[("user_count", False)],
///     limit=10,
/// )
/// for row in result:
///     print(f"{row['city']}: {row['user_count']} users, avg age {row['avg_age']}")
/// ```
#[pyfunction]
#[pyo3(signature = (
    url,
    index,
    query,
    group_by = vec![],
    reduce = vec![],
    apply = None,
    filter = None,
    sort_by = None,
    limit = None,
    offset = 0,
    load = None
))]
#[allow(clippy::too_many_arguments)]
fn py_aggregate(
    url: &str,
    index: &str,
    query: &str,
    group_by: Vec<String>,
    reduce: Vec<(String, Vec<String>, String)>,
    apply: Option<Vec<(String, String)>>,
    filter: Option<String>,
    sort_by: Option<Vec<(String, bool)>>,
    limit: Option<usize>,
    offset: usize,
    load: Option<Vec<String>>,
) -> PyResult<Vec<std::collections::HashMap<String, String>>> {
    use crate::connection::RedisConnection;
    use crate::search::{AggregateConfig, ApplyExpr, ReduceOp, SortBy, aggregate};

    // Two exception flavours are used below: connection setup problems versus
    // everything else.
    let connection_err = |msg: String| PyErr::new::<pyo3::exceptions::PyConnectionError, _>(msg);
    let runtime_err = |msg: String| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(msg);

    let runtime = tokio::runtime::Runtime::new().map_err(|e| runtime_err(e.to_string()))?;

    runtime.block_on(async {
        let client = RedisConnection::new(url).map_err(|e| connection_err(e.to_string()))?;

        let mut manager = client
            .get_connection_manager()
            .await
            .map_err(|e| connection_err(e.to_string()))?;

        // (function, args, alias) tuples -> reduce operations.
        let reduce_ops: Vec<ReduceOp> = reduce
            .into_iter()
            .map(|(func, args, alias)| ReduceOp::new(func, args, alias))
            .collect();

        // (expression, alias) tuples -> apply expressions; absent means none.
        let apply_exprs: Vec<ApplyExpr> = apply
            .into_iter()
            .flatten()
            .map(|(expr, alias)| ApplyExpr::new(expr, alias))
            .collect();

        // (field, ascending) tuples -> sort specifications; absent means none.
        let sort_specs: Vec<SortBy> = sort_by
            .into_iter()
            .flatten()
            .map(|(field, ascending)| {
                if ascending {
                    SortBy::asc(field)
                } else {
                    SortBy::desc(field)
                }
            })
            .collect();

        let base_config = AggregateConfig::new(index, query)
            .with_group_by(group_by)
            .with_reduce(reduce_ops)
            .with_apply(apply_exprs)
            .with_sort_by(sort_specs)
            .with_offset(offset);

        // Optional settings are layered on only when the caller supplied them.
        let filtered_config = match filter {
            Some(expr) => base_config.with_filter(expr),
            None => base_config,
        };
        let limited_config = match limit {
            Some(max) => filtered_config.with_limit(max),
            None => filtered_config,
        };
        let config = match load {
            Some(fields) => limited_config.with_load(fields),
            None => limited_config,
        };

        let outcome = aggregate(&mut manager, &config)
            .await
            .map_err(|e| runtime_err(e.to_string()))?;

        Ok(outcome.rows)
    })
}
1416
#[cfg(feature = "python")]
/// Python wrapper for TimeSeriesBatchIterator.
///
/// This class is used by the Python IO plugin to iterate over RedisTimeSeries data
/// and yield Arrow RecordBatches.
#[pyclass]
pub struct PyTimeSeriesBatchIterator {
    inner: TimeSeriesBatchIterator,
}

#[cfg(feature = "python")]
#[pymethods]
impl PyTimeSeriesBatchIterator {
    /// Create a new PyTimeSeriesBatchIterator.
    ///
    /// # Arguments
    /// * `url` - Redis connection URL
    /// * `pattern` - Key pattern to match
    /// * `batch_size` - Keys per batch
    /// * `count_hint` - SCAN COUNT hint
    /// * `start` - Start timestamp for TS.RANGE (default: "-" for oldest)
    /// * `end` - End timestamp for TS.RANGE (default: "+" for newest)
    /// * `count_per_series` - Max samples per time series (optional)
    /// * `aggregation` - Aggregation type (avg, sum, min, max, etc.); requires
    ///   `bucket_size_ms`
    /// * `bucket_size_ms` - Bucket size in milliseconds for aggregation; ignored
    ///   when no `aggregation` is given
    /// * `include_key` - Whether to include the Redis key as a column
    /// * `key_column_name` - Name of the key column
    /// * `include_timestamp` - Whether to include the timestamp as a column
    /// * `timestamp_column_name` - Name of the timestamp column
    /// * `value_column_name` - Name of the value column
    /// * `include_row_index` - Whether to include the row index as a column
    /// * `row_index_column_name` - Name of the row index column
    /// * `label_columns` - Label names to include as columns
    /// * `max_rows` - Optional maximum rows to return
    ///
    /// # Errors
    /// Raises `ValueError` when `aggregation` is given without
    /// `bucket_size_ms`, and `RuntimeError` when `TimeSeriesBatchIterator::new`
    /// fails.
    #[new]
    #[pyo3(signature = (
        url,
        pattern,
        batch_size = 1000,
        count_hint = 100,
        start = "-".to_string(),
        end = "+".to_string(),
        count_per_series = None,
        aggregation = None,
        bucket_size_ms = None,
        include_key = true,
        key_column_name = "_key".to_string(),
        include_timestamp = true,
        timestamp_column_name = "_ts".to_string(),
        value_column_name = "value".to_string(),
        include_row_index = false,
        row_index_column_name = "_index".to_string(),
        label_columns = vec![],
        max_rows = None
    ))]
    #[allow(clippy::too_many_arguments)]
    fn new(
        url: String,
        pattern: String,
        batch_size: usize,
        count_hint: usize,
        start: String,
        end: String,
        count_per_series: Option<usize>,
        aggregation: Option<String>,
        bucket_size_ms: Option<i64>,
        include_key: bool,
        key_column_name: String,
        include_timestamp: bool,
        timestamp_column_name: String,
        value_column_name: String,
        include_row_index: bool,
        row_index_column_name: String,
        label_columns: Vec<String>,
        max_rows: Option<usize>,
    ) -> PyResult<Self> {
        // An aggregation without a bucket size used to be dropped silently,
        // handing back raw samples to a caller who asked for aggregated ones.
        // Reject it up front, before any connection is attempted.
        if aggregation.is_some() && bucket_size_ms.is_none() {
            return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(
                "aggregation requires bucket_size_ms to be set",
            ));
        }

        let ts_schema = TimeSeriesSchema::new()
            .with_key(include_key)
            .with_key_column_name(&key_column_name)
            .with_timestamp(include_timestamp)
            .with_timestamp_column_name(&timestamp_column_name)
            .with_value_column_name(&value_column_name)
            .with_row_index(include_row_index)
            .with_row_index_column_name(&row_index_column_name)
            .with_label_columns(label_columns);

        let mut config = types::hash::BatchConfig::new(pattern)
            .with_batch_size(batch_size)
            .with_count_hint(count_hint);

        if let Some(max) = max_rows {
            config = config.with_max_rows(max);
        }

        let mut inner = TimeSeriesBatchIterator::new(&url, ts_schema, config)
            .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;

        inner = inner.with_start(&start).with_end(&end);

        if let Some(count) = count_per_series {
            inner = inner.with_count_per_series(count);
        }

        // Aggregation is applied only when both parts are present. The
        // aggregation-without-bucket case was rejected above; a bucket size on
        // its own is ignored because no aggregation was requested.
        if let (Some(agg), Some(bucket)) = (aggregation, bucket_size_ms) {
            inner = inner.with_aggregation(&agg, bucket);
        }

        Ok(Self { inner })
    }

    /// Get the next batch as Arrow IPC bytes.
    ///
    /// Returns None when iteration is complete.
    /// Any failure (fetching or serialising) is raised as `RuntimeError`.
    fn next_batch_ipc(&mut self, py: Python<'_>) -> PyResult<Option<Py<PyAny>>> {
        let batch = self
            .inner
            .next_batch()
            .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;

        match batch {
            Some(record_batch) => {
                let mut buf = Vec::new();
                // The writer borrows `buf` mutably; the inner scope ends that
                // borrow before the bytes are copied out.
                {
                    let mut writer = arrow::ipc::writer::FileWriter::try_new(
                        &mut buf,
                        record_batch.schema().as_ref(),
                    )
                    .map_err(|e| {
                        PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(
                            "Failed to create IPC writer: {}",
                            e
                        ))
                    })?;

                    writer.write(&record_batch).map_err(|e| {
                        PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(
                            "Failed to write batch: {}",
                            e
                        ))
                    })?;

                    writer.finish().map_err(|e| {
                        PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(
                            "Failed to finish IPC: {}",
                            e
                        ))
                    })?;
                }

                Ok(Some(pyo3::types::PyBytes::new(py, &buf).into()))
            }
            None => Ok(None),
        }
    }

    /// Check if iteration is complete.
    fn is_done(&self) -> bool {
        self.inner.is_done()
    }

    /// Get the number of rows yielded so far.
    fn rows_yielded(&self) -> usize {
        self.inner.rows_yielded()
    }
}
1582
#[cfg(feature = "python")]
/// Python-facing handle that owns a `JsonBatchIterator`.
///
/// The Python IO plugin uses this class to walk Redis JSON data and receive
/// it as Arrow RecordBatches.
#[pyclass]
pub struct PyJsonBatchIterator {
    inner: JsonBatchIterator,
}
1592
1593#[cfg(feature = "python")]
1594#[pymethods]
1595impl PyJsonBatchIterator {
1596    /// Create a new PyJsonBatchIterator.
1597    ///
1598    /// # Arguments
1599    /// * `url` - Redis connection URL
1600    /// * `pattern` - Key pattern to match
1601    /// * `schema` - List of (field_name, type_name) tuples
1602    /// * `batch_size` - Keys per batch
1603    /// * `count_hint` - SCAN COUNT hint
1604    /// * `projection` - Optional list of columns to fetch
1605    /// * `include_key` - Whether to include the Redis key as a column
1606    /// * `key_column_name` - Name of the key column
1607    /// * `include_ttl` - Whether to include the TTL as a column
1608    /// * `ttl_column_name` - Name of the TTL column
1609    /// * `include_row_index` - Whether to include the row index as a column
1610    /// * `row_index_column_name` - Name of the row index column
1611    /// * `max_rows` - Optional maximum rows to return
1612    /// * `parallel` - Optional number of parallel workers for fetching
1613    #[new]
1614    #[pyo3(signature = (
1615        url,
1616        pattern,
1617        schema,
1618        batch_size = 1000,
1619        count_hint = 100,
1620        projection = None,
1621        include_key = true,
1622        key_column_name = "_key".to_string(),
1623        include_ttl = false,
1624        ttl_column_name = "_ttl".to_string(),
1625        include_row_index = false,
1626        row_index_column_name = "_index".to_string(),
1627        max_rows = None,
1628        parallel = None
1629    ))]
1630    #[allow(clippy::too_many_arguments)]
1631    fn new(
1632        url: String,
1633        pattern: String,
1634        schema: Vec<(String, String)>,
1635        batch_size: usize,
1636        count_hint: usize,
1637        projection: Option<Vec<String>>,
1638        include_key: bool,
1639        key_column_name: String,
1640        include_ttl: bool,
1641        ttl_column_name: String,
1642        include_row_index: bool,
1643        row_index_column_name: String,
1644        max_rows: Option<usize>,
1645        parallel: Option<usize>,
1646    ) -> PyResult<Self> {
1647        // Parse schema from Python type strings to Arrow DataTypes
1648        let field_types: Vec<(String, DataType)> = schema
1649            .into_iter()
1650            .map(|(name, type_str)| {
1651                let dtype = match type_str.to_lowercase().as_str() {
1652                    "utf8" | "str" | "string" => DataType::Utf8,
1653                    "int64" | "int" | "integer" => DataType::Int64,
1654                    "float64" | "float" | "double" => DataType::Float64,
1655                    "bool" | "boolean" => DataType::Boolean,
1656                    _ => {
1657                        return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(format!(
1658                            "Unknown type '{}' for field '{}'. Supported: utf8, int64, float64, bool",
1659                            type_str, name
1660                        )));
1661                    }
1662                };
1663                Ok((name, dtype))
1664            })
1665            .collect::<PyResult<Vec<_>>>()?;
1666
1667        let json_schema = JsonSchema::new(field_types)
1668            .with_key(include_key)
1669            .with_key_column_name(key_column_name)
1670            .with_ttl(include_ttl)
1671            .with_ttl_column_name(ttl_column_name)
1672            .with_row_index(include_row_index)
1673            .with_row_index_column_name(row_index_column_name);
1674
1675        let mut config = BatchConfig::new(pattern)
1676            .with_batch_size(batch_size)
1677            .with_count_hint(count_hint);
1678
1679        if let Some(max) = max_rows {
1680            config = config.with_max_rows(max);
1681        }
1682
1683        if let Some(workers) = parallel {
1684            config = config.with_parallel(ParallelStrategy::batches(workers));
1685        }
1686
1687        let inner = JsonBatchIterator::new(&url, json_schema, config, projection)
1688            .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;
1689
1690        Ok(Self { inner })
1691    }
1692
1693    /// Get the next batch as Arrow IPC bytes.
1694    ///
1695    /// Returns None when iteration is complete.
1696    /// Returns the RecordBatch serialized as Arrow IPC format.
1697    fn next_batch_ipc(&mut self, py: Python<'_>) -> PyResult<Option<Py<PyAny>>> {
1698        let batch = self
1699            .inner
1700            .next_batch()
1701            .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;
1702
1703        match batch {
1704            Some(record_batch) => {
1705                // Serialize to Arrow IPC format
1706                let mut buf = Vec::new();
1707                {
1708                    let mut writer = arrow::ipc::writer::FileWriter::try_new(
1709                        &mut buf,
1710                        record_batch.schema().as_ref(),
1711                    )
1712                    .map_err(|e| {
1713                        PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(
1714                            "Failed to create IPC writer: {}",
1715                            e
1716                        ))
1717                    })?;
1718
1719                    writer.write(&record_batch).map_err(|e| {
1720                        PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(
1721                            "Failed to write batch: {}",
1722                            e
1723                        ))
1724                    })?;
1725
1726                    writer.finish().map_err(|e| {
1727                        PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(
1728                            "Failed to finish IPC: {}",
1729                            e
1730                        ))
1731                    })?;
1732                }
1733
1734                Ok(Some(pyo3::types::PyBytes::new(py, &buf).into()))
1735            }
1736            None => Ok(None),
1737        }
1738    }
1739
1740    /// Check if iteration is complete.
1741    fn is_done(&self) -> bool {
1742        self.inner.is_done()
1743    }
1744
1745    /// Get the number of rows yielded so far.
1746    fn rows_yielded(&self) -> usize {
1747        self.inner.rows_yielded()
1748    }
1749}
1750
#[cfg(feature = "python")]
/// Python wrapper for StringBatchIterator.
///
/// This class is used by the Python IO plugin to iterate over Redis string data
/// and yield Arrow RecordBatches.
///
/// Only compiled when the `python` feature is enabled; `#[pyclass]` makes the
/// type available as a Python class.
#[pyclass]
pub struct PyStringBatchIterator {
    /// The wrapped Rust iterator that performs the actual batch iteration.
    inner: StringBatchIterator,
}
1760
#[cfg(feature = "python")]
#[pymethods]
impl PyStringBatchIterator {
    /// Build an iterator over Redis string values whose keys match `pattern`.
    ///
    /// # Arguments
    /// * `url` - Redis connection URL
    /// * `pattern` - Key pattern to match
    /// * `value_type` - Type string for the value column, matched
    ///   case-insensitively: `utf8`/`str`/`string`, `int64`/`int`/`integer`,
    ///   `float64`/`float`/`double`, `bool`/`boolean`, `date`, `datetime`
    /// * `batch_size` - Keys per batch
    /// * `count_hint` - SCAN COUNT hint
    /// * `include_key` - Whether to include the Redis key as a column
    /// * `key_column_name` - Name of the key column
    /// * `value_column_name` - Name of the value column
    /// * `include_ttl` - Whether to include the TTL as a column
    /// * `ttl_column_name` - Name of the TTL column
    /// * `max_rows` - Optional maximum rows to return
    /// * `parallel` - Optional number of parallel workers for fetching
    ///
    /// # Errors
    /// * `ValueError` - `value_type` is not one of the recognized type strings
    /// * `RuntimeError` - the underlying `StringBatchIterator` could not be created
    #[new]
    #[pyo3(signature = (
        url,
        pattern,
        value_type = "utf8".to_string(),
        batch_size = 1000,
        count_hint = 100,
        include_key = true,
        key_column_name = "_key".to_string(),
        value_column_name = "value".to_string(),
        include_ttl = false,
        ttl_column_name = "_ttl".to_string(),
        max_rows = None,
        parallel = None
    ))]
    #[allow(clippy::too_many_arguments)]
    fn new(
        url: String,
        pattern: String,
        value_type: String,
        batch_size: usize,
        count_hint: usize,
        include_key: bool,
        key_column_name: String,
        value_column_name: String,
        include_ttl: bool,
        ttl_column_name: String,
        max_rows: Option<usize>,
        parallel: Option<usize>,
    ) -> PyResult<Self> {
        use arrow::datatypes::TimeUnit;

        // Lower-case once so the type string is matched case-insensitively.
        let normalized = value_type.to_lowercase();
        let dtype = match normalized.as_str() {
            "utf8" | "str" | "string" => DataType::Utf8,
            "int64" | "int" | "integer" => DataType::Int64,
            "float64" | "float" | "double" => DataType::Float64,
            "bool" | "boolean" => DataType::Boolean,
            "date" => DataType::Date32,
            // Datetimes are microsecond timestamps without a timezone.
            "datetime" => DataType::Timestamp(TimeUnit::Microsecond, None),
            _ => {
                return Err(pyo3::exceptions::PyValueError::new_err(format!(
                    "Unknown value type '{}'. Supported: utf8, int64, float64, bool, date, datetime",
                    value_type
                )));
            }
        };

        let string_schema = StringSchema::new(dtype)
            .with_key(include_key)
            .with_key_column_name(key_column_name)
            .with_value_column_name(value_column_name)
            .with_ttl(include_ttl)
            .with_ttl_column_name(ttl_column_name);

        // Optional settings are layered onto the base config only when supplied.
        let base = BatchConfig::new(pattern)
            .with_batch_size(batch_size)
            .with_count_hint(count_hint);
        let limited = match max_rows {
            Some(limit) => base.with_max_rows(limit),
            None => base,
        };
        let config = match parallel {
            Some(workers) => limited.with_parallel(ParallelStrategy::batches(workers)),
            None => limited,
        };

        match StringBatchIterator::new(&url, string_schema, config) {
            Ok(inner) => Ok(Self { inner }),
            Err(err) => Err(pyo3::exceptions::PyRuntimeError::new_err(
                err.to_string(),
            )),
        }
    }

    /// Fetch the next batch and return it to Python as Arrow IPC bytes.
    ///
    /// One `RecordBatch` is pulled from the wrapped iterator and serialized
    /// into an in-memory buffer with Arrow's IPC `FileWriter`; the buffer is
    /// handed back as a Python `bytes` object. Returns `None` when the wrapped
    /// iterator has no further batch.
    ///
    /// # Errors
    /// Raises `RuntimeError` if fetching the batch fails, or if creating the
    /// writer, writing the batch, or finishing the IPC output fails.
    fn next_batch_ipc(&mut self, py: Python<'_>) -> PyResult<Option<Py<PyAny>>> {
        let fetched = self
            .inner
            .next_batch()
            .map_err(|err| pyo3::exceptions::PyRuntimeError::new_err(err.to_string()))?;

        // Iteration finished: nothing to serialize.
        let Some(record_batch) = fetched else {
            return Ok(None);
        };

        let mut ipc_bytes = Vec::new();
        let batch_schema = record_batch.schema();

        let mut ipc_writer =
            arrow::ipc::writer::FileWriter::try_new(&mut ipc_bytes, batch_schema.as_ref())
                .map_err(|err| {
                    pyo3::exceptions::PyRuntimeError::new_err(format!(
                        "Failed to create IPC writer: {}",
                        err
                    ))
                })?;

        ipc_writer.write(&record_batch).map_err(|err| {
            pyo3::exceptions::PyRuntimeError::new_err(format!("Failed to write batch: {}", err))
        })?;

        ipc_writer.finish().map_err(|err| {
            pyo3::exceptions::PyRuntimeError::new_err(format!("Failed to finish IPC: {}", err))
        })?;

        // Drop the writer to end its mutable borrow of the buffer before reading it.
        drop(ipc_writer);

        Ok(Some(pyo3::types::PyBytes::new(py, &ipc_bytes).into()))
    }

    /// Report whether the wrapped iterator considers iteration finished.
    fn is_done(&self) -> bool {
        let iterator = &self.inner;
        iterator.is_done()
    }

    /// Report how many rows the wrapped iterator has yielded so far.
    fn rows_yielded(&self) -> usize {
        let iterator = &self.inner;
        iterator.rows_yielded()
    }
}
1909
#[cfg(feature = "python")]
/// Collect up to `count` keys matching `pattern` (for testing connectivity).
///
/// Builds a Tokio runtime for this call, opens a multiplexed async connection
/// and issues `SCAN <cursor> MATCH <pattern> COUNT <count>` repeatedly.
/// Scanning stops once the server returns cursor 0 or at least `count` keys
/// have been gathered; the result is then truncated to at most `count` keys.
///
/// # Errors
/// * `RuntimeError` - the runtime could not be created or a `SCAN` command failed
/// * `ConnectionError` - the client could not be opened or the connection failed
#[pyfunction]
#[pyo3(signature = (connection_url, pattern, count = 10))]
fn scan_keys(connection_url: &str, pattern: &str, count: usize) -> PyResult<Vec<String>> {
    let runtime = tokio::runtime::Runtime::new()
        .map_err(|err| pyo3::exceptions::PyRuntimeError::new_err(err.to_string()))?;

    runtime.block_on(async {
        let redis_client = redis::Client::open(connection_url)
            .map_err(|err| pyo3::exceptions::PyConnectionError::new_err(err.to_string()))?;

        let mut connection = redis_client
            .get_multiplexed_async_connection()
            .await
            .map_err(|err| pyo3::exceptions::PyConnectionError::new_err(err.to_string()))?;

        let mut matched: Vec<String> = Vec::new();
        let mut cursor = 0u64;

        loop {
            let (next_cursor, page): (u64, Vec<String>) = redis::cmd("SCAN")
                .arg(cursor)
                .arg("MATCH")
                .arg(pattern)
                .arg("COUNT")
                .arg(count)
                .query_async(&mut connection)
                .await
                .map_err(|err| pyo3::exceptions::PyRuntimeError::new_err(err.to_string()))?;

            matched.extend(page);
            cursor = next_cursor;

            // Continue only while the server has more to scan and keys are still needed.
            let keep_scanning = cursor != 0 && matched.len() < count;
            if !keep_scanning {
                break;
            }
        }

        // A single SCAN page may overshoot the requested number of keys.
        matched.truncate(count);
        Ok(matched)
    })
}
1953
#[cfg(feature = "python")]
/// Infer a schema for Redis hashes by sampling matching keys.
///
/// Thin Python-facing wrapper around `infer_hash_schema`.
///
/// # Arguments
/// * `url` - Redis connection URL
/// * `pattern` - Key pattern to match
/// * `sample_size` - Maximum number of keys to sample (default: 100)
/// * `type_inference` - Whether to infer types (default: true)
///
/// # Returns
/// A tuple of (fields, sample_count) where fields is a list of (name, type) tuples.
///
/// # Errors
/// Raises `RuntimeError` carrying the inference error's message.
#[pyfunction]
#[pyo3(signature = (url, pattern, sample_size = 100, type_inference = true))]
fn py_infer_hash_schema(
    url: &str,
    pattern: &str,
    sample_size: usize,
    type_inference: bool,
) -> PyResult<(Vec<(String, String)>, usize)> {
    match infer_hash_schema(url, pattern, sample_size, type_inference) {
        Ok(inferred) => Ok((inferred.to_type_strings(), inferred.sample_count)),
        Err(err) => Err(pyo3::exceptions::PyRuntimeError::new_err(
            err.to_string(),
        )),
    }
}
1978
#[cfg(feature = "python")]
/// Infer a schema for RedisJSON documents by sampling matching keys.
///
/// Thin Python-facing wrapper around `infer_json_schema`.
///
/// # Arguments
/// * `url` - Redis connection URL
/// * `pattern` - Key pattern to match
/// * `sample_size` - Maximum number of keys to sample (default: 100)
///
/// # Returns
/// A tuple of (fields, sample_count) where fields is a list of (name, type) tuples.
///
/// # Errors
/// Raises `RuntimeError` carrying the inference error's message.
#[pyfunction]
#[pyo3(signature = (url, pattern, sample_size = 100))]
fn py_infer_json_schema(
    url: &str,
    pattern: &str,
    sample_size: usize,
) -> PyResult<(Vec<(String, String)>, usize)> {
    match infer_json_schema(url, pattern, sample_size) {
        Ok(inferred) => Ok((inferred.to_type_strings(), inferred.sample_count)),
        Err(err) => Err(pyo3::exceptions::PyRuntimeError::new_err(
            err.to_string(),
        )),
    }
}
2001
#[cfg(feature = "python")]
/// Infer schema from Redis hashes with optional schema overwrite.
///
/// This function infers a schema by sampling Redis hashes, then applies
/// user-specified type overrides. This is useful when you want to infer
/// most fields but override specific ones.
///
/// Override type strings are matched case-insensitively and accept the same
/// aliases as the batch iterator constructors: `utf8`/`str`/`string`,
/// `int64`/`int`/`integer`, `float64`/`float`/`double`, `bool`/`boolean`,
/// `date`, `datetime`. The overrides are validated before Redis is contacted,
/// so a misspelled type fails fast instead of silently becoming `utf8`.
///
/// # Arguments
/// * `url` - Redis connection URL
/// * `pattern` - Key pattern to match
/// * `schema_overwrite` - Optional list of (field_name, type_string) tuples to override
/// * `sample_size` - Maximum number of keys to sample (default: 100)
/// * `type_inference` - Whether to infer types (default: true)
///
/// # Returns
/// A tuple of (fields, sample_count) where fields is a list of (name, type) tuples.
///
/// # Errors
/// * `ValueError` - an override uses a type string that is not recognized
/// * `RuntimeError` - schema inference failed
///
/// # Example
/// ```python
/// # Infer schema but force 'age' to be int64 and 'created_at' to be datetime
/// schema, count = py_infer_hash_schema_with_overwrite(
///     "redis://localhost",
///     "user:*",
///     [("age", "int64"), ("created_at", "datetime")],
/// )
/// ```
#[pyfunction]
#[pyo3(signature = (url, pattern, schema_overwrite = None, sample_size = 100, type_inference = true))]
fn py_infer_hash_schema_with_overwrite(
    url: &str,
    pattern: &str,
    schema_overwrite: Option<Vec<(String, String)>>,
    sample_size: usize,
    type_inference: bool,
) -> PyResult<(Vec<(String, String)>, usize)> {
    use crate::schema::RedisType;

    // Validate and convert the overrides first: a bad type string is a caller
    // error and should surface without a Redis round trip.
    let overwrite_typed = schema_overwrite
        .map(|overwrite| {
            overwrite
                .into_iter()
                .map(|(name, type_str)| {
                    let redis_type = match type_str.to_lowercase().as_str() {
                        "utf8" | "str" | "string" => RedisType::Utf8,
                        "int64" | "int" | "integer" => RedisType::Int64,
                        "float64" | "float" | "double" => RedisType::Float64,
                        "bool" | "boolean" => RedisType::Boolean,
                        "date" => RedisType::Date,
                        "datetime" => RedisType::Datetime,
                        _ => {
                            return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(format!(
                                "Unknown type '{}' for field '{}'. Supported: utf8, int64, float64, bool, date, datetime",
                                type_str, name
                            )));
                        }
                    };
                    Ok((name, redis_type))
                })
                .collect::<PyResult<Vec<(String, RedisType)>>>()
        })
        .transpose()?;

    let schema = infer_hash_schema(url, pattern, sample_size, type_inference)
        .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;

    // Apply overwrite if provided
    let final_schema = match overwrite_typed {
        Some(typed) => schema.with_overwrite(&typed),
        None => schema,
    };

    Ok((final_schema.to_type_strings(), final_schema.sample_count))
}
2066
#[cfg(feature = "python")]
/// Infer schema from RedisJSON documents with optional schema overwrite.
///
/// Override type strings are matched case-insensitively and accept the same
/// aliases as the batch iterator constructors: `utf8`/`str`/`string`,
/// `int64`/`int`/`integer`, `float64`/`float`/`double`, `bool`/`boolean`,
/// `date`, `datetime`. The overrides are validated before Redis is contacted,
/// so a misspelled type fails fast instead of silently becoming `utf8`.
///
/// # Arguments
/// * `url` - Redis connection URL
/// * `pattern` - Key pattern to match
/// * `schema_overwrite` - Optional list of (field_name, type_string) tuples to override
/// * `sample_size` - Maximum number of keys to sample (default: 100)
///
/// # Returns
/// A tuple of (fields, sample_count) where fields is a list of (name, type) tuples.
///
/// # Errors
/// * `ValueError` - an override uses a type string that is not recognized
/// * `RuntimeError` - schema inference failed
#[pyfunction]
#[pyo3(signature = (url, pattern, schema_overwrite = None, sample_size = 100))]
fn py_infer_json_schema_with_overwrite(
    url: &str,
    pattern: &str,
    schema_overwrite: Option<Vec<(String, String)>>,
    sample_size: usize,
) -> PyResult<(Vec<(String, String)>, usize)> {
    use crate::schema::RedisType;

    // Validate and convert the overrides first: a bad type string is a caller
    // error and should surface without a Redis round trip.
    let overwrite_typed = schema_overwrite
        .map(|overwrite| {
            overwrite
                .into_iter()
                .map(|(name, type_str)| {
                    let redis_type = match type_str.to_lowercase().as_str() {
                        "utf8" | "str" | "string" => RedisType::Utf8,
                        "int64" | "int" | "integer" => RedisType::Int64,
                        "float64" | "float" | "double" => RedisType::Float64,
                        "bool" | "boolean" => RedisType::Boolean,
                        "date" => RedisType::Date,
                        "datetime" => RedisType::Datetime,
                        _ => {
                            return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(format!(
                                "Unknown type '{}' for field '{}'. Supported: utf8, int64, float64, bool, date, datetime",
                                type_str, name
                            )));
                        }
                    };
                    Ok((name, redis_type))
                })
                .collect::<PyResult<Vec<(String, RedisType)>>>()
        })
        .transpose()?;

    let schema = infer_json_schema(url, pattern, sample_size)
        .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;

    // Apply overwrite if provided
    let final_schema = match overwrite_typed {
        Some(typed) => schema.with_overwrite(&typed),
        None => schema,
    };

    Ok((final_schema.to_type_strings(), final_schema.sample_count))
}
2115
#[cfg(feature = "python")]
/// Infer a schema for Redis hashes and report per-field confidence details.
///
/// Wraps `infer_hash_schema_with_confidence` and converts its result into a
/// dict of Python objects, so callers can judge schema quality before
/// processing large datasets.
///
/// # Arguments
/// * `url` - Redis connection URL
/// * `pattern` - Key pattern to match
/// * `sample_size` - Maximum number of keys to sample (default: 100)
///
/// # Returns
/// A dict with:
/// - `fields`: List of (name, type) tuples
/// - `sample_count`: Number of keys sampled
/// - `field_info`: Dict mapping field names to confidence info dicts
/// - `average_confidence`: Value of the schema's `average_confidence()`
/// - `all_confident`: Result of the schema's `all_confident(0.9)` check
///
/// Each field_info dict contains:
/// - `type`: Inferred type name
/// - `confidence`: The field's confidence score
/// - `samples`: The field's sample count
/// - `valid`: The field's count of valid samples
/// - `nulls`: The field's count of null/missing values
/// - `null_ratio`: Value of the field's `null_ratio()`
/// - `type_candidates`: The field's type-candidate table
///
/// # Errors
/// Raises `RuntimeError` if inference fails; conversion errors from building
/// the Python objects are propagated as-is.
#[pyfunction]
#[pyo3(signature = (url, pattern, sample_size = 100))]
fn py_infer_hash_schema_with_confidence(
    py: Python<'_>,
    url: &str,
    pattern: &str,
    sample_size: usize,
) -> PyResult<HashMap<String, Py<PyAny>>> {
    /// Map a `RedisType` to the type string reported to Python.
    fn type_name(dtype: &crate::schema::RedisType) -> &'static str {
        match dtype {
            crate::schema::RedisType::Utf8 => "utf8",
            crate::schema::RedisType::Int64 => "int64",
            crate::schema::RedisType::Float64 => "float64",
            crate::schema::RedisType::Boolean => "bool",
            crate::schema::RedisType::Date => "date",
            crate::schema::RedisType::Datetime => "datetime",
        }
    }

    let schema = infer_hash_schema_with_confidence(url, pattern, sample_size)
        .map_err(|err| pyo3::exceptions::PyRuntimeError::new_err(err.to_string()))?;

    // (name, type string) pairs, in the order the schema lists its fields.
    let fields: Vec<(String, String)> = schema
        .fields
        .iter()
        .map(|(name, dtype)| (name.clone(), type_name(dtype).to_string()))
        .collect();
    let fields_obj: Py<PyAny> = fields.into_pyobject(py)?.into_any().unbind();
    let sample_count_obj: Py<PyAny> = schema.sample_count.into_pyobject(py)?.into_any().unbind();

    // One nested dict of confidence details per field.
    let mut field_info_py: HashMap<String, HashMap<String, Py<PyAny>>> = HashMap::new();
    for (name, info) in &schema.field_info {
        let details: HashMap<String, Py<PyAny>> = HashMap::from([
            (
                "type".to_string(),
                type_name(&info.inferred_type)
                    .into_pyobject(py)?
                    .into_any()
                    .unbind(),
            ),
            (
                "confidence".to_string(),
                info.confidence.into_pyobject(py)?.into_any().unbind(),
            ),
            (
                "samples".to_string(),
                info.samples.into_pyobject(py)?.into_any().unbind(),
            ),
            (
                "valid".to_string(),
                info.valid.into_pyobject(py)?.into_any().unbind(),
            ),
            (
                "nulls".to_string(),
                info.nulls.into_pyobject(py)?.into_any().unbind(),
            ),
            (
                "null_ratio".to_string(),
                info.null_ratio().into_pyobject(py)?.into_any().unbind(),
            ),
            (
                "type_candidates".to_string(),
                info.type_candidates
                    .clone()
                    .into_pyobject(py)?
                    .into_any()
                    .unbind(),
            ),
        ]);

        field_info_py.insert(name.clone(), details);
    }
    let field_info_obj: Py<PyAny> = field_info_py.into_pyobject(py)?.into_any().unbind();

    // Convenience summaries over the whole schema.
    let average_obj: Py<PyAny> = schema
        .average_confidence()
        .into_pyobject(py)?
        .into_any()
        .unbind();
    // `PyBool::new` hands back a borrowed reference, so take ownership before unbinding.
    let all_confident_bool = pyo3::types::PyBool::new(py, schema.all_confident(0.9));
    let all_confident_obj: Py<PyAny> = all_confident_bool.to_owned().into_any().unbind();

    Ok(HashMap::from([
        ("fields".to_string(), fields_obj),
        ("sample_count".to_string(), sample_count_obj),
        ("field_info".to_string(), field_info_obj),
        ("average_confidence".to_string(), average_obj),
        ("all_confident".to_string(), all_confident_obj),
    ]))
}
2253
#[cfg(feature = "python")]
/// Write rows to Redis as hashes.
///
/// Parses `if_exists` into a `WriteMode` and forwards everything to
/// `write_hashes`.
///
/// # Arguments
/// * `url` - Redis connection URL
/// * `keys` - List of Redis keys to write to
/// * `fields` - List of field names
/// * `values` - 2D list of values (rows x columns), same order as fields
/// * `ttl` - Optional TTL in seconds for each key
/// * `if_exists` - How to handle existing keys: "fail", "replace", or "append"
///
/// # Returns
/// A tuple of (keys_written, keys_failed, keys_skipped).
///
/// # Errors
/// * `ValueError` - `if_exists` could not be parsed into a `WriteMode`
/// * `RuntimeError` - the write itself failed
#[pyfunction]
#[pyo3(signature = (url, keys, fields, values, ttl = None, if_exists = "replace".to_string()))]
fn py_write_hashes(
    url: &str,
    keys: Vec<String>,
    fields: Vec<String>,
    values: Vec<Vec<Option<String>>>,
    ttl: Option<i64>,
    if_exists: String,
) -> PyResult<(usize, usize, usize)> {
    let mode = if_exists
        .parse::<WriteMode>()
        .map_err(|err| pyo3::exceptions::PyValueError::new_err(err.to_string()))?;

    let outcome = write_hashes(url, keys, fields, values, ttl, mode)
        .map_err(|err| pyo3::exceptions::PyRuntimeError::new_err(err.to_string()))?;

    let summary = (
        outcome.keys_written,
        outcome.keys_failed,
        outcome.keys_skipped,
    );
    Ok(summary)
}
2286
#[cfg(feature = "python")]
/// Write rows to Redis as hashes and report the outcome per key.
///
/// Same inputs as `py_write_hashes`, but the result names which keys
/// succeeded and which failed, enabling retry logic and better error
/// handling in production workflows.
///
/// # Arguments
/// * `url` - Redis connection URL
/// * `keys` - List of Redis keys to write to
/// * `fields` - List of field names
/// * `values` - 2D list of values (rows x columns), same order as fields
/// * `ttl` - Optional TTL in seconds for each key
/// * `if_exists` - How to handle existing keys: "fail", "replace", or "append"
///
/// # Returns
/// A dict with keys:
/// - `keys_written`: Number of keys successfully written
/// - `keys_failed`: Number of keys that failed
/// - `keys_skipped`: Number of keys skipped (when mode is "fail" and key exists)
/// - `succeeded_keys`: List of keys that were successfully written
/// - `failed_keys`: List of keys that failed to write (taken from the error entries)
/// - `errors`: Dict mapping failed keys to their error messages
///
/// # Errors
/// * `ValueError` - `if_exists` could not be parsed into a `WriteMode`
/// * `RuntimeError` - the write itself failed
#[pyfunction]
#[pyo3(signature = (url, keys, fields, values, ttl = None, if_exists = "replace".to_string()))]
fn py_write_hashes_detailed(
    url: &str,
    keys: Vec<String>,
    fields: Vec<String>,
    values: Vec<Vec<Option<String>>>,
    ttl: Option<i64>,
    if_exists: String,
) -> PyResult<std::collections::HashMap<String, Py<PyAny>>> {
    use pyo3::IntoPyObjectExt;

    let mode = if_exists
        .parse::<WriteMode>()
        .map_err(|err| pyo3::exceptions::PyValueError::new_err(err.to_string()))?;

    let outcome = write_hashes_detailed(url, keys, fields, values, ttl, mode)
        .map_err(|err| pyo3::exceptions::PyRuntimeError::new_err(err.to_string()))?;

    // Convert the outcome into Python objects.
    Python::attach(|py| {
        // The error entries feed both the failed-key list and the key -> message map.
        let failed_keys: Vec<String> = outcome
            .errors
            .iter()
            .map(|entry| entry.key.clone())
            .collect();
        let errors: std::collections::HashMap<String, String> = outcome
            .errors
            .into_iter()
            .map(|entry| (entry.key, entry.error))
            .collect();

        let response = std::collections::HashMap::from([
            (
                "keys_written".to_string(),
                outcome.keys_written.into_py_any(py)?,
            ),
            (
                "keys_failed".to_string(),
                outcome.keys_failed.into_py_any(py)?,
            ),
            (
                "keys_skipped".to_string(),
                outcome.keys_skipped.into_py_any(py)?,
            ),
            (
                "succeeded_keys".to_string(),
                outcome.succeeded_keys.into_py_any(py)?,
            ),
            ("failed_keys".to_string(), failed_keys.into_py_any(py)?),
            ("errors".to_string(), errors.into_py_any(py)?),
        ]);

        Ok(response)
    })
}
2364
#[cfg(feature = "python")]
/// Python wrapper for ListBatchIterator.
///
/// This class is used by the Python IO plugin to iterate over Redis list data
/// and yield Arrow RecordBatches.
///
/// Only compiled when the `python` feature is enabled; `#[pyclass]` makes the
/// type available as a Python class.
#[pyclass]
pub struct PyListBatchIterator {
    /// The wrapped Rust iterator that performs the actual batch iteration.
    inner: ListBatchIterator,
}
2374
#[cfg(feature = "python")]
#[pymethods]
impl PyListBatchIterator {
    /// Build an iterator over Redis lists whose keys match `pattern`.
    ///
    /// # Arguments
    /// * `url` - Redis connection URL
    /// * `pattern` - Key pattern to match
    /// * `batch_size` - Keys per batch
    /// * `count_hint` - SCAN COUNT hint
    /// * `include_key` - Whether to include the Redis key as a column
    /// * `key_column_name` - Name of the key column
    /// * `element_column_name` - Name of the element column
    /// * `include_position` - Whether to include position index
    /// * `position_column_name` - Name of the position column
    /// * `include_row_index` - Whether to include the row index as a column
    /// * `row_index_column_name` - Name of the row index column
    /// * `max_rows` - Optional maximum rows to return
    ///
    /// # Errors
    /// Raises `RuntimeError` if the underlying `ListBatchIterator` could not
    /// be created.
    #[new]
    #[pyo3(signature = (
        url,
        pattern,
        batch_size = 1000,
        count_hint = 100,
        include_key = true,
        key_column_name = "_key".to_string(),
        element_column_name = "element".to_string(),
        include_position = false,
        position_column_name = "position".to_string(),
        include_row_index = false,
        row_index_column_name = "_index".to_string(),
        max_rows = None
    ))]
    #[allow(clippy::too_many_arguments)]
    fn new(
        url: String,
        pattern: String,
        batch_size: usize,
        count_hint: usize,
        include_key: bool,
        key_column_name: String,
        element_column_name: String,
        include_position: bool,
        position_column_name: String,
        include_row_index: bool,
        row_index_column_name: String,
        max_rows: Option<usize>,
    ) -> PyResult<Self> {
        let list_schema = ListSchema::new()
            .with_key(include_key)
            .with_key_column_name(&key_column_name)
            .with_element_column_name(&element_column_name)
            .with_position(include_position)
            .with_position_column_name(&position_column_name)
            .with_row_index(include_row_index)
            .with_row_index_column_name(&row_index_column_name);

        // The row limit is applied only when the caller supplied one.
        let base = types::hash::BatchConfig::new(pattern)
            .with_batch_size(batch_size)
            .with_count_hint(count_hint);
        let config = match max_rows {
            Some(limit) => base.with_max_rows(limit),
            None => base,
        };

        match ListBatchIterator::new(&url, list_schema, config) {
            Ok(inner) => Ok(Self { inner }),
            Err(err) => Err(pyo3::exceptions::PyRuntimeError::new_err(
                err.to_string(),
            )),
        }
    }

    /// Fetch the next batch and return it to Python as Arrow IPC bytes.
    ///
    /// One `RecordBatch` is pulled from the wrapped iterator and serialized
    /// into an in-memory buffer with Arrow's IPC `FileWriter`; the buffer is
    /// handed back as a Python `bytes` object. Returns `None` when the wrapped
    /// iterator has no further batch.
    ///
    /// # Errors
    /// Raises `RuntimeError` if fetching the batch fails, or if creating the
    /// writer, writing the batch, or finishing the IPC output fails.
    fn next_batch_ipc(&mut self, py: Python<'_>) -> PyResult<Option<Py<PyAny>>> {
        let fetched = self
            .inner
            .next_batch()
            .map_err(|err| pyo3::exceptions::PyRuntimeError::new_err(err.to_string()))?;

        // Iteration finished: nothing to serialize.
        let Some(record_batch) = fetched else {
            return Ok(None);
        };

        let mut ipc_bytes = Vec::new();
        let batch_schema = record_batch.schema();

        let mut ipc_writer =
            arrow::ipc::writer::FileWriter::try_new(&mut ipc_bytes, batch_schema.as_ref())
                .map_err(|err| {
                    pyo3::exceptions::PyRuntimeError::new_err(format!(
                        "Failed to create IPC writer: {}",
                        err
                    ))
                })?;

        ipc_writer.write(&record_batch).map_err(|err| {
            pyo3::exceptions::PyRuntimeError::new_err(format!("Failed to write batch: {}", err))
        })?;

        ipc_writer.finish().map_err(|err| {
            pyo3::exceptions::PyRuntimeError::new_err(format!("Failed to finish IPC: {}", err))
        })?;

        // Drop the writer to end its mutable borrow of the buffer before reading it.
        drop(ipc_writer);

        Ok(Some(pyo3::types::PyBytes::new(py, &ipc_bytes).into()))
    }

    /// Report whether the wrapped iterator considers iteration finished.
    fn is_done(&self) -> bool {
        let iterator = &self.inner;
        iterator.is_done()
    }

    /// Report how many rows the wrapped iterator has yielded so far.
    fn rows_yielded(&self) -> usize {
        let iterator = &self.inner;
        iterator.rows_yielded()
    }
}
2499
#[cfg(feature = "python")]
/// Write lists of elements to Redis.
///
/// Parses `if_exists` into a `WriteMode` and forwards everything to
/// `write_lists`.
///
/// # Arguments
/// * `url` - Redis connection URL
/// * `keys` - List of Redis keys to write to
/// * `elements` - 2D list of elements for each list
/// * `ttl` - Optional TTL in seconds for each key
/// * `if_exists` - How to handle existing keys: "fail", "replace", or "append"
///
/// # Returns
/// A tuple of (keys_written, keys_failed, keys_skipped).
///
/// # Errors
/// * `ValueError` - `if_exists` could not be parsed into a `WriteMode`
/// * `RuntimeError` - the write itself failed
#[pyfunction]
#[pyo3(signature = (url, keys, elements, ttl = None, if_exists = "replace".to_string()))]
fn py_write_lists(
    url: &str,
    keys: Vec<String>,
    elements: Vec<Vec<String>>,
    ttl: Option<i64>,
    if_exists: String,
) -> PyResult<(usize, usize, usize)> {
    let mode = if_exists
        .parse::<WriteMode>()
        .map_err(|err| pyo3::exceptions::PyValueError::new_err(err.to_string()))?;

    let outcome = write_lists(url, keys, elements, ttl, mode)
        .map_err(|err| pyo3::exceptions::PyRuntimeError::new_err(err.to_string()))?;

    let summary = (
        outcome.keys_written,
        outcome.keys_failed,
        outcome.keys_skipped,
    );
    Ok(summary)
}
2530
#[cfg(feature = "python")]
/// Write JSON documents to Redis from Python.
///
/// Parses `if_exists` into a `WriteMode`, forwards all arguments to
/// `write_json`, and unpacks the result counters into a tuple.
///
/// # Arguments
/// * `url` - Redis connection URL
/// * `keys` - List of Redis keys to write to
/// * `json_strings` - List of JSON strings to write
/// * `ttl` - Optional TTL in seconds for each key (defaults to `None`)
/// * `if_exists` - How to handle existing keys: "fail", "replace", or "append"
///   (defaults to "replace")
///
/// # Returns
/// A tuple of (keys_written, keys_failed, keys_skipped).
///
/// # Errors
/// Raises `ValueError` when `if_exists` cannot be parsed into a write mode and
/// `RuntimeError` when `write_json` reports an error.
#[pyfunction]
#[pyo3(signature = (url, keys, json_strings, ttl = None, if_exists = "replace".to_string()))]
fn py_write_json(
    url: &str,
    keys: Vec<String>,
    json_strings: Vec<String>,
    ttl: Option<i64>,
    if_exists: String,
) -> PyResult<(usize, usize, usize)> {
    // Validate the mode first so a bad `if_exists` never reaches the writer.
    let mode = match if_exists.parse::<WriteMode>() {
        Ok(parsed) => parsed,
        Err(err) => return Err(pyo3::exceptions::PyValueError::new_err(err.to_string())),
    };

    write_json(url, keys, json_strings, ttl, mode)
        .map(|outcome| {
            (
                outcome.keys_written,
                outcome.keys_failed,
                outcome.keys_skipped,
            )
        })
        .map_err(|err| pyo3::exceptions::PyRuntimeError::new_err(err.to_string()))
}
2561
#[cfg(feature = "python")]
/// Write string values to Redis from Python.
///
/// Parses `if_exists` into a `WriteMode`, forwards all arguments to
/// `write_strings`, and unpacks the result counters into a tuple.
///
/// # Arguments
/// * `url` - Redis connection URL
/// * `keys` - List of Redis keys to write to
/// * `values` - List of string values to write (None for null)
/// * `ttl` - Optional TTL in seconds for each key (defaults to `None`)
/// * `if_exists` - How to handle existing keys: "fail", "replace", or "append"
///   (defaults to "replace")
///
/// # Returns
/// A tuple of (keys_written, keys_failed, keys_skipped).
///
/// # Errors
/// Raises `ValueError` when `if_exists` cannot be parsed into a write mode and
/// `RuntimeError` when `write_strings` reports an error.
#[pyfunction]
#[pyo3(signature = (url, keys, values, ttl = None, if_exists = "replace".to_string()))]
fn py_write_strings(
    url: &str,
    keys: Vec<String>,
    values: Vec<Option<String>>,
    ttl: Option<i64>,
    if_exists: String,
) -> PyResult<(usize, usize, usize)> {
    // Validate the mode first so a bad `if_exists` never reaches the writer.
    let mode = match if_exists.parse::<WriteMode>() {
        Ok(parsed) => parsed,
        Err(err) => return Err(pyo3::exceptions::PyValueError::new_err(err.to_string())),
    };

    write_strings(url, keys, values, ttl, mode)
        .map(|outcome| {
            (
                outcome.keys_written,
                outcome.keys_failed,
                outcome.keys_skipped,
            )
        })
        .map_err(|err| pyo3::exceptions::PyRuntimeError::new_err(err.to_string()))
}
2592
#[cfg(feature = "python")]
/// Python-facing handle around `SetBatchIterator`.
///
/// The Python IO plugin drives this class to iterate over Redis set data and
/// receive Arrow RecordBatches.
#[pyclass]
pub struct PySetBatchIterator {
    inner: SetBatchIterator,
}

#[cfg(feature = "python")]
#[pymethods]
impl PySetBatchIterator {
    /// Construct a PySetBatchIterator.
    ///
    /// Builds a `SetSchema` and a `BatchConfig` from the arguments and hands
    /// both to `SetBatchIterator::new`; a failure there is raised as
    /// `RuntimeError`.
    ///
    /// # Arguments
    /// * `url` - Redis connection URL
    /// * `pattern` - Key pattern to match
    /// * `batch_size` - Keys per batch (default 1000)
    /// * `count_hint` - SCAN COUNT hint (default 100)
    /// * `include_key` - Whether to include the Redis key as a column (default true)
    /// * `key_column_name` - Name of the key column (default "_key")
    /// * `member_column_name` - Name of the member column (default "member")
    /// * `include_row_index` - Whether to include the row index as a column (default false)
    /// * `row_index_column_name` - Name of the row index column (default "_index")
    /// * `max_rows` - Optional maximum rows to return (default `None`)
    #[new]
    #[pyo3(signature = (
        url,
        pattern,
        batch_size = 1000,
        count_hint = 100,
        include_key = true,
        key_column_name = "_key".to_string(),
        member_column_name = "member".to_string(),
        include_row_index = false,
        row_index_column_name = "_index".to_string(),
        max_rows = None
    ))]
    #[allow(clippy::too_many_arguments)]
    fn new(
        url: String,
        pattern: String,
        batch_size: usize,
        count_hint: usize,
        include_key: bool,
        key_column_name: String,
        member_column_name: String,
        include_row_index: bool,
        row_index_column_name: String,
        max_rows: Option<usize>,
    ) -> PyResult<Self> {
        // Output column layout.
        let key_columns = SetSchema::new()
            .with_key(include_key)
            .with_key_column_name(&key_column_name);
        let set_schema = key_columns
            .with_member_column_name(&member_column_name)
            .with_row_index(include_row_index)
            .with_row_index_column_name(&row_index_column_name);

        // Scan settings; the row cap is applied only when the caller gave one.
        let base_config = types::hash::BatchConfig::new(pattern)
            .with_batch_size(batch_size)
            .with_count_hint(count_hint);
        let config = match max_rows {
            Some(limit) => base_config.with_max_rows(limit),
            None => base_config,
        };

        SetBatchIterator::new(&url, set_schema, config)
            .map(|inner| Self { inner })
            .map_err(|err| pyo3::exceptions::PyRuntimeError::new_err(err.to_string()))
    }

    /// Fetch the next batch and return it serialized as Arrow IPC bytes.
    ///
    /// Returns None when iteration is complete. Any error from the wrapped
    /// iterator or from the IPC writer is raised as `RuntimeError`.
    fn next_batch_ipc(&mut self, py: Python<'_>) -> PyResult<Option<Py<PyAny>>> {
        let fetched = self
            .inner
            .next_batch()
            .map_err(|err| pyo3::exceptions::PyRuntimeError::new_err(err.to_string()))?;

        let record_batch = match fetched {
            None => return Ok(None),
            Some(record_batch) => record_batch,
        };

        let mut ipc_bytes = Vec::new();
        let mut writer = arrow::ipc::writer::FileWriter::try_new(
            &mut ipc_bytes,
            record_batch.schema().as_ref(),
        )
        .map_err(|err| {
            let message = format!("Failed to create IPC writer: {}", err);
            pyo3::exceptions::PyRuntimeError::new_err(message)
        })?;

        writer.write(&record_batch).map_err(|err| {
            let message = format!("Failed to write batch: {}", err);
            pyo3::exceptions::PyRuntimeError::new_err(message)
        })?;

        writer.finish().map_err(|err| {
            let message = format!("Failed to finish IPC: {}", err);
            pyo3::exceptions::PyRuntimeError::new_err(message)
        })?;

        // Release the writer's mutable borrow before reading the buffer.
        drop(writer);

        let payload: Py<PyAny> = pyo3::types::PyBytes::new(py, &ipc_bytes).into();
        Ok(Some(payload))
    }

    /// Report whether iteration is complete.
    fn is_done(&self) -> bool {
        let inner = &self.inner;
        inner.is_done()
    }

    /// Return how many rows have been yielded so far.
    fn rows_yielded(&self) -> usize {
        let inner = &self.inner;
        inner.rows_yielded()
    }
}
2721
#[cfg(feature = "python")]
/// Write set members to Redis from Python.
///
/// Parses `if_exists` into a `WriteMode`, forwards all arguments to
/// `write_sets`, and unpacks the result counters into a tuple.
///
/// # Arguments
/// * `url` - Redis connection URL
/// * `keys` - List of Redis keys to write to
/// * `members` - 2D list holding the members for each set
/// * `ttl` - Optional TTL in seconds for each key (defaults to `None`)
/// * `if_exists` - How to handle existing keys: "fail", "replace", or "append"
///   (defaults to "replace")
///
/// # Returns
/// A tuple of (keys_written, keys_failed, keys_skipped).
///
/// # Errors
/// Raises `ValueError` when `if_exists` cannot be parsed into a write mode and
/// `RuntimeError` when `write_sets` reports an error.
#[pyfunction]
#[pyo3(signature = (url, keys, members, ttl = None, if_exists = "replace".to_string()))]
fn py_write_sets(
    url: &str,
    keys: Vec<String>,
    members: Vec<Vec<String>>,
    ttl: Option<i64>,
    if_exists: String,
) -> PyResult<(usize, usize, usize)> {
    // Validate the mode first so a bad `if_exists` never reaches the writer.
    let mode = match if_exists.parse::<WriteMode>() {
        Ok(parsed) => parsed,
        Err(err) => return Err(pyo3::exceptions::PyValueError::new_err(err.to_string())),
    };

    write_sets(url, keys, members, ttl, mode)
        .map(|outcome| {
            (
                outcome.keys_written,
                outcome.keys_failed,
                outcome.keys_skipped,
            )
        })
        .map_err(|err| pyo3::exceptions::PyRuntimeError::new_err(err.to_string()))
}
2752
#[cfg(feature = "python")]
/// Python-facing handle around `ZSetBatchIterator`.
///
/// The Python IO plugin drives this class to iterate over Redis sorted set
/// data and receive Arrow RecordBatches.
#[pyclass]
pub struct PyZSetBatchIterator {
    inner: ZSetBatchIterator,
}

#[cfg(feature = "python")]
#[pymethods]
impl PyZSetBatchIterator {
    /// Construct a PyZSetBatchIterator.
    ///
    /// Builds a `ZSetSchema` and a `BatchConfig` from the arguments and hands
    /// both to `ZSetBatchIterator::new`; a failure there is raised as
    /// `RuntimeError`.
    ///
    /// # Arguments
    /// * `url` - Redis connection URL
    /// * `pattern` - Key pattern to match
    /// * `batch_size` - Keys per batch (default 1000)
    /// * `count_hint` - SCAN COUNT hint (default 100)
    /// * `include_key` - Whether to include the Redis key as a column (default true)
    /// * `key_column_name` - Name of the key column (default "_key")
    /// * `member_column_name` - Name of the member column (default "member")
    /// * `score_column_name` - Name of the score column (default "score")
    /// * `include_rank` - Whether to include rank index (default false)
    /// * `rank_column_name` - Name of the rank column (default "rank")
    /// * `include_row_index` - Whether to include the row index as a column (default false)
    /// * `row_index_column_name` - Name of the row index column (default "_index")
    /// * `max_rows` - Optional maximum rows to return (default `None`)
    #[new]
    #[pyo3(signature = (
        url,
        pattern,
        batch_size = 1000,
        count_hint = 100,
        include_key = true,
        key_column_name = "_key".to_string(),
        member_column_name = "member".to_string(),
        score_column_name = "score".to_string(),
        include_rank = false,
        rank_column_name = "rank".to_string(),
        include_row_index = false,
        row_index_column_name = "_index".to_string(),
        max_rows = None
    ))]
    #[allow(clippy::too_many_arguments)]
    fn new(
        url: String,
        pattern: String,
        batch_size: usize,
        count_hint: usize,
        include_key: bool,
        key_column_name: String,
        member_column_name: String,
        score_column_name: String,
        include_rank: bool,
        rank_column_name: String,
        include_row_index: bool,
        row_index_column_name: String,
        max_rows: Option<usize>,
    ) -> PyResult<Self> {
        // Output column layout: key, member/score, rank, then row index.
        let key_columns = ZSetSchema::new()
            .with_key(include_key)
            .with_key_column_name(&key_column_name);
        let value_columns = key_columns
            .with_member_column_name(&member_column_name)
            .with_score_column_name(&score_column_name);
        let zset_schema = value_columns
            .with_rank(include_rank)
            .with_rank_column_name(&rank_column_name)
            .with_row_index(include_row_index)
            .with_row_index_column_name(&row_index_column_name);

        // Scan settings; the row cap is applied only when the caller gave one.
        let base_config = types::hash::BatchConfig::new(pattern)
            .with_batch_size(batch_size)
            .with_count_hint(count_hint);
        let config = match max_rows {
            Some(limit) => base_config.with_max_rows(limit),
            None => base_config,
        };

        ZSetBatchIterator::new(&url, zset_schema, config)
            .map(|inner| Self { inner })
            .map_err(|err| pyo3::exceptions::PyRuntimeError::new_err(err.to_string()))
    }

    /// Fetch the next batch and return it serialized as Arrow IPC bytes.
    ///
    /// Returns None when iteration is complete. Any error from the wrapped
    /// iterator or from the IPC writer is raised as `RuntimeError`.
    fn next_batch_ipc(&mut self, py: Python<'_>) -> PyResult<Option<Py<PyAny>>> {
        let fetched = self
            .inner
            .next_batch()
            .map_err(|err| pyo3::exceptions::PyRuntimeError::new_err(err.to_string()))?;

        let record_batch = match fetched {
            None => return Ok(None),
            Some(record_batch) => record_batch,
        };

        let mut ipc_bytes = Vec::new();
        let mut writer = arrow::ipc::writer::FileWriter::try_new(
            &mut ipc_bytes,
            record_batch.schema().as_ref(),
        )
        .map_err(|err| {
            let message = format!("Failed to create IPC writer: {}", err);
            pyo3::exceptions::PyRuntimeError::new_err(message)
        })?;

        writer.write(&record_batch).map_err(|err| {
            let message = format!("Failed to write batch: {}", err);
            pyo3::exceptions::PyRuntimeError::new_err(message)
        })?;

        writer.finish().map_err(|err| {
            let message = format!("Failed to finish IPC: {}", err);
            pyo3::exceptions::PyRuntimeError::new_err(message)
        })?;

        // Release the writer's mutable borrow before reading the buffer.
        drop(writer);

        let payload: Py<PyAny> = pyo3::types::PyBytes::new(py, &ipc_bytes).into();
        Ok(Some(payload))
    }

    /// Report whether iteration is complete.
    fn is_done(&self) -> bool {
        let inner = &self.inner;
        inner.is_done()
    }

    /// Return how many rows have been yielded so far.
    fn rows_yielded(&self) -> usize {
        let inner = &self.inner;
        inner.rows_yielded()
    }
}
2893
#[cfg(feature = "python")]
/// Write sorted set members to Redis from Python.
///
/// Parses `if_exists` into a `WriteMode`, forwards all arguments to
/// `write_zsets`, and unpacks the result counters into a tuple.
///
/// # Arguments
/// * `url` - Redis connection URL
/// * `keys` - List of Redis keys to write to
/// * `members_scores` - 2D list of (member, score) tuples for each sorted set
/// * `ttl` - Optional TTL in seconds for each key (defaults to `None`)
/// * `if_exists` - How to handle existing keys: "fail", "replace", or "append"
///   (defaults to "replace")
///
/// # Returns
/// A tuple of (keys_written, keys_failed, keys_skipped).
///
/// # Errors
/// Raises `ValueError` when `if_exists` cannot be parsed into a write mode and
/// `RuntimeError` when `write_zsets` reports an error.
#[pyfunction]
#[pyo3(signature = (url, keys, members_scores, ttl = None, if_exists = "replace".to_string()))]
fn py_write_zsets(
    url: &str,
    keys: Vec<String>,
    members_scores: Vec<Vec<(String, f64)>>,
    ttl: Option<i64>,
    if_exists: String,
) -> PyResult<(usize, usize, usize)> {
    // Validate the mode first so a bad `if_exists` never reaches the writer.
    let mode = match if_exists.parse::<WriteMode>() {
        Ok(parsed) => parsed,
        Err(err) => return Err(pyo3::exceptions::PyValueError::new_err(err.to_string())),
    };

    write_zsets(url, keys, members_scores, ttl, mode)
        .map(|outcome| {
            (
                outcome.keys_written,
                outcome.keys_failed,
                outcome.keys_skipped,
            )
        })
        .map_err(|err| pyo3::exceptions::PyRuntimeError::new_err(err.to_string()))
}
2924
#[cfg(feature = "python")]
/// Python-facing handle around `StreamBatchIterator`.
///
/// The Python IO plugin drives this class to iterate over Redis Stream data
/// and receive Arrow RecordBatches.
#[pyclass]
pub struct PyStreamBatchIterator {
    inner: StreamBatchIterator,
}

#[cfg(feature = "python")]
#[pymethods]
impl PyStreamBatchIterator {
    /// Construct a PyStreamBatchIterator.
    ///
    /// Builds a `StreamSchema` and a `BatchConfig` from the arguments, creates
    /// a `StreamBatchIterator` from them (a failure is raised as
    /// `RuntimeError`), then applies the ID range and the optional per-stream
    /// entry limit.
    ///
    /// # Arguments
    /// * `url` - Redis connection URL
    /// * `pattern` - Key pattern to match
    /// * `fields` - List of field names to extract from entries (default empty)
    /// * `batch_size` - Keys per batch (default 1000)
    /// * `count_hint` - SCAN COUNT hint (default 100)
    /// * `start_id` - Start ID for XRANGE (default: "-" for oldest)
    /// * `end_id` - End ID for XRANGE (default: "+" for newest)
    /// * `count_per_stream` - Max entries per stream (optional)
    /// * `include_key` - Whether to include the Redis key as a column (default true)
    /// * `key_column_name` - Name of the key column (default "_key")
    /// * `include_id` - Whether to include the entry ID as a column (default true)
    /// * `id_column_name` - Name of the entry ID column (default "_id")
    /// * `include_timestamp` - Whether to include the timestamp as a column (default true)
    /// * `timestamp_column_name` - Name of the timestamp column (default "_ts")
    /// * `include_sequence` - Whether to include the sequence as a column (default false)
    /// * `sequence_column_name` - Name of the sequence column (default "_seq")
    /// * `include_row_index` - Whether to include the row index as a column (default false)
    /// * `row_index_column_name` - Name of the row index column (default "_index")
    /// * `max_rows` - Optional maximum rows to return (default `None`)
    #[new]
    #[pyo3(signature = (
        url,
        pattern,
        fields = vec![],
        batch_size = 1000,
        count_hint = 100,
        start_id = "-".to_string(),
        end_id = "+".to_string(),
        count_per_stream = None,
        include_key = true,
        key_column_name = "_key".to_string(),
        include_id = true,
        id_column_name = "_id".to_string(),
        include_timestamp = true,
        timestamp_column_name = "_ts".to_string(),
        include_sequence = false,
        sequence_column_name = "_seq".to_string(),
        include_row_index = false,
        row_index_column_name = "_index".to_string(),
        max_rows = None
    ))]
    #[allow(clippy::too_many_arguments)]
    fn new(
        url: String,
        pattern: String,
        fields: Vec<String>,
        batch_size: usize,
        count_hint: usize,
        start_id: String,
        end_id: String,
        count_per_stream: Option<usize>,
        include_key: bool,
        key_column_name: String,
        include_id: bool,
        id_column_name: String,
        include_timestamp: bool,
        timestamp_column_name: String,
        include_sequence: bool,
        sequence_column_name: String,
        include_row_index: bool,
        row_index_column_name: String,
        max_rows: Option<usize>,
    ) -> PyResult<Self> {
        // Metadata columns first (key, id, timestamp, sequence, row index),
        // then the user-selected entry fields.
        let key_columns = StreamSchema::new()
            .with_key(include_key)
            .with_key_column_name(&key_column_name);
        let entry_columns = key_columns
            .with_id(include_id)
            .with_id_column_name(&id_column_name)
            .with_timestamp(include_timestamp)
            .with_timestamp_column_name(&timestamp_column_name)
            .with_sequence(include_sequence)
            .with_sequence_column_name(&sequence_column_name);
        let stream_schema = entry_columns
            .with_row_index(include_row_index)
            .with_row_index_column_name(&row_index_column_name)
            .set_fields(fields);

        // Scan settings; the row cap is applied only when the caller gave one.
        let base_config = types::hash::BatchConfig::new(pattern)
            .with_batch_size(batch_size)
            .with_count_hint(count_hint);
        let config = match max_rows {
            Some(limit) => base_config.with_max_rows(limit),
            None => base_config,
        };

        let ranged = StreamBatchIterator::new(&url, stream_schema, config)
            .map_err(|err| pyo3::exceptions::PyRuntimeError::new_err(err.to_string()))?
            .with_start_id(&start_id)
            .with_end_id(&end_id);

        let inner = match count_per_stream {
            Some(limit) => ranged.with_count_per_stream(limit),
            None => ranged,
        };

        Ok(Self { inner })
    }

    /// Fetch the next batch and return it serialized as Arrow IPC bytes.
    ///
    /// Returns None when iteration is complete. Any error from the wrapped
    /// iterator or from the IPC writer is raised as `RuntimeError`.
    fn next_batch_ipc(&mut self, py: Python<'_>) -> PyResult<Option<Py<PyAny>>> {
        let fetched = self
            .inner
            .next_batch()
            .map_err(|err| pyo3::exceptions::PyRuntimeError::new_err(err.to_string()))?;

        let record_batch = match fetched {
            None => return Ok(None),
            Some(record_batch) => record_batch,
        };

        let mut ipc_bytes = Vec::new();
        let mut writer = arrow::ipc::writer::FileWriter::try_new(
            &mut ipc_bytes,
            record_batch.schema().as_ref(),
        )
        .map_err(|err| {
            let message = format!("Failed to create IPC writer: {}", err);
            pyo3::exceptions::PyRuntimeError::new_err(message)
        })?;

        writer.write(&record_batch).map_err(|err| {
            let message = format!("Failed to write batch: {}", err);
            pyo3::exceptions::PyRuntimeError::new_err(message)
        })?;

        writer.finish().map_err(|err| {
            let message = format!("Failed to finish IPC: {}", err);
            pyo3::exceptions::PyRuntimeError::new_err(message)
        })?;

        // Release the writer's mutable borrow before reading the buffer.
        drop(writer);

        let payload: Py<PyAny> = pyo3::types::PyBytes::new(py, &ipc_bytes).into();
        Ok(Some(payload))
    }

    /// Report whether iteration is complete.
    fn is_done(&self) -> bool {
        let inner = &self.inner;
        inner.is_done()
    }

    /// Return how many rows have been yielded so far.
    fn rows_yielded(&self) -> usize {
        let inner = &self.inner;
        inner.rows_yielded()
    }
}