1#[cfg(feature = "python")]
112use arrow::datatypes::DataType;
113#[cfg(feature = "python")]
114use pyo3::prelude::*;
115#[cfg(feature = "python")]
116use std::collections::HashMap;
117
118pub mod cache;
119#[cfg(feature = "cluster")]
120pub mod cluster;
121mod connection;
122mod error;
123mod infer;
124pub mod options;
125pub mod parallel;
126pub mod pubsub;
127#[cfg(feature = "search")]
128pub mod query_builder;
129mod scanner;
130mod schema;
131#[cfg(feature = "search")]
132pub mod search;
133mod types;
134mod write;
135
136#[cfg(feature = "cluster")]
137pub use cluster::{ClusterKeyScanner, DirectClusterKeyScanner};
138pub use connection::{ConnectionConfig, RedisConn, RedisConnection};
139pub use error::{Error, Result};
140pub use infer::{
141 FieldInferenceInfo, InferredSchema, InferredSchemaWithConfidence, infer_hash_schema,
142 infer_hash_schema_with_confidence, infer_json_schema,
143};
144pub use options::{
145 HashScanOptions, JsonScanOptions, KeyColumn, ParallelStrategy, RowIndex, RowIndexColumn,
146 ScanOptions, StreamScanOptions, StringScanOptions, TimeSeriesScanOptions, TtlColumn,
147 get_default_batch_size, get_default_count_hint, get_default_timeout_ms,
148};
149pub use parallel::{FetchResult, KeyBatch, ParallelConfig, ParallelFetch};
150#[cfg(feature = "search")]
151pub use query_builder::{Predicate, PredicateBuilder, Value};
152pub use schema::{HashSchema, RedisType};
153pub use types::hash::{BatchConfig, HashBatchIterator, HashFetcher};
154#[cfg(feature = "cluster")]
155pub use types::hash::{ClusterHashBatchIterator, ClusterHashFetcher};
156#[cfg(feature = "search")]
157pub use types::hash::{HashSearchIterator, SearchBatchConfig};
158#[cfg(feature = "cluster")]
159pub use types::json::ClusterJsonBatchIterator;
160pub use types::json::{JsonBatchIterator, JsonSchema};
161#[cfg(feature = "cluster")]
162pub use types::list::ClusterListBatchIterator;
163pub use types::list::{ListBatchIterator, ListSchema};
164#[cfg(feature = "cluster")]
165pub use types::set::ClusterSetBatchIterator;
166pub use types::set::{SetBatchIterator, SetSchema};
167#[cfg(feature = "cluster")]
168pub use types::stream::ClusterStreamBatchIterator;
169pub use types::stream::{StreamBatchIterator, StreamSchema};
170#[cfg(feature = "cluster")]
171pub use types::string::ClusterStringBatchIterator;
172pub use types::string::{StringBatchIterator, StringSchema};
173#[cfg(feature = "cluster")]
174pub use types::timeseries::ClusterTimeSeriesBatchIterator;
175pub use types::timeseries::{TimeSeriesBatchIterator, TimeSeriesSchema};
176#[cfg(feature = "cluster")]
177pub use types::zset::ClusterZSetBatchIterator;
178pub use types::zset::{ZSetBatchIterator, ZSetSchema};
179pub use write::{
180 KeyError, WriteMode, WriteResult, WriteResultDetailed, write_hashes, write_hashes_detailed,
181 write_json, write_lists, write_sets, write_strings, write_zsets,
182};
183
184pub fn batch_to_ipc(batch: &arrow::array::RecordBatch) -> Result<Vec<u8>> {
195 let mut buf = Vec::new();
196 {
197 let mut writer = arrow::ipc::writer::FileWriter::try_new(&mut buf, batch.schema().as_ref())
198 .map_err(|e| Error::Runtime(format!("Failed to create IPC writer: {}", e)))?;
199
200 writer
201 .write(batch)
202 .map_err(|e| Error::Runtime(format!("Failed to write batch: {}", e)))?;
203
204 writer
205 .finish()
206 .map_err(|e| Error::Runtime(format!("Failed to finish IPC: {}", e)))?;
207 }
208 Ok(buf)
209}
210
#[cfg(feature = "python")]
/// Python extension module entry point (exposed to Python as `_internal`).
///
/// Registers every scanner/iterator class and helper function with the
/// module. Items gated on the `search` or `cluster` cargo features are only
/// registered when those features are compiled in.
#[pymodule(name = "_internal")]
fn polars_redis_internal(m: &Bound<'_, PyModule>) -> PyResult<()> {
    // Core batch-iterator classes (always available with the python feature).
    m.add_class::<RedisScanner>()?;
    m.add_class::<PyHashBatchIterator>()?;
    m.add_class::<PyJsonBatchIterator>()?;
    m.add_class::<PyStringBatchIterator>()?;
    m.add_class::<PySetBatchIterator>()?;
    m.add_class::<PyListBatchIterator>()?;
    m.add_class::<PyZSetBatchIterator>()?;
    m.add_class::<PyStreamBatchIterator>()?;
    m.add_class::<PyTimeSeriesBatchIterator>()?;
    // RediSearch-backed entry points.
    #[cfg(feature = "search")]
    m.add_class::<PyHashSearchIterator>()?;
    #[cfg(feature = "search")]
    m.add_function(wrap_pyfunction!(py_aggregate, m)?)?;
    // Key scanning and schema inference helpers.
    m.add_function(wrap_pyfunction!(scan_keys, m)?)?;
    m.add_function(wrap_pyfunction!(py_infer_hash_schema, m)?)?;
    m.add_function(wrap_pyfunction!(py_infer_json_schema, m)?)?;
    m.add_function(wrap_pyfunction!(py_infer_hash_schema_with_overwrite, m)?)?;
    m.add_function(wrap_pyfunction!(py_infer_hash_schema_with_confidence, m)?)?;
    m.add_function(wrap_pyfunction!(py_infer_json_schema_with_overwrite, m)?)?;
    // Write-path helpers.
    m.add_function(wrap_pyfunction!(py_write_hashes, m)?)?;
    m.add_function(wrap_pyfunction!(py_write_hashes_detailed, m)?)?;
    m.add_function(wrap_pyfunction!(py_write_json, m)?)?;
    m.add_function(wrap_pyfunction!(py_write_strings, m)?)?;
    m.add_function(wrap_pyfunction!(py_write_sets, m)?)?;
    m.add_function(wrap_pyfunction!(py_write_lists, m)?)?;
    m.add_function(wrap_pyfunction!(py_write_zsets, m)?)?;
    // Simple byte-cache helpers.
    m.add_function(wrap_pyfunction!(py_cache_set, m)?)?;
    m.add_function(wrap_pyfunction!(py_cache_get, m)?)?;
    m.add_function(wrap_pyfunction!(py_cache_delete, m)?)?;
    m.add_function(wrap_pyfunction!(py_cache_exists, m)?)?;
    m.add_function(wrap_pyfunction!(py_cache_ttl, m)?)?;
    // Cluster-aware iterator classes.
    #[cfg(feature = "cluster")]
    m.add_class::<PyClusterHashBatchIterator>()?;
    #[cfg(feature = "cluster")]
    m.add_class::<PyClusterJsonBatchIterator>()?;
    #[cfg(feature = "cluster")]
    m.add_class::<PyClusterStringBatchIterator>()?;
    Ok(())
}
258
#[cfg(feature = "python")]
/// Plain configuration holder exposed to Python: connection URL, key
/// pattern, and scan sizing hints. Carries no live connection state.
#[pyclass]
pub struct RedisScanner {
    // Redis connection URL (e.g. "redis://host:port").
    connection_url: String,
    // Glob-style key pattern used when scanning.
    pattern: String,
    // Number of rows per emitted batch.
    batch_size: usize,
    // COUNT hint passed to the Redis SCAN command.
    count_hint: usize,
}
268
#[cfg(feature = "python")]
#[pymethods]
impl RedisScanner {
    /// Create a scanner configuration; `batch_size` and `count_hint`
    /// default to 1000 and 100 respectively.
    #[new]
    #[pyo3(signature = (connection_url, pattern, batch_size = 1000, count_hint = 100))]
    fn new(connection_url: String, pattern: String, batch_size: usize, count_hint: usize) -> Self {
        Self {
            connection_url,
            pattern,
            batch_size,
            count_hint,
        }
    }

    /// Redis connection URL this scanner was configured with.
    #[getter]
    fn connection_url(&self) -> &str {
        &self.connection_url
    }

    /// Glob-style key pattern to scan.
    #[getter]
    fn pattern(&self) -> &str {
        &self.pattern
    }

    /// Rows per emitted batch.
    #[getter]
    fn batch_size(&self) -> usize {
        self.batch_size
    }

    /// COUNT hint for the Redis SCAN command.
    #[getter]
    fn count_hint(&self) -> usize {
        self.count_hint
    }
}
304
#[cfg(feature = "python")]
/// Python wrapper around the Rust [`HashBatchIterator`], yielding Arrow
/// IPC-encoded batches of Redis hash data to Python.
#[pyclass]
pub struct PyHashBatchIterator {
    // The underlying Rust iterator doing the actual scanning/fetching.
    inner: HashBatchIterator,
}
314
#[cfg(feature = "python")]
#[pymethods]
impl PyHashBatchIterator {
    /// Build a hash batch iterator for keys matching `pattern`.
    ///
    /// `schema` is a list of `(field_name, type_string)` pairs; supported
    /// type strings are utf8/str/string, int64/int/integer,
    /// float64/float/double, bool/boolean (case-insensitive). Raises
    /// `ValueError` for an unknown type string and `RuntimeError` when the
    /// underlying iterator cannot be constructed.
    #[new]
    #[pyo3(signature = (
        url,
        pattern,
        schema,
        batch_size = 1000,
        count_hint = 100,
        projection = None,
        include_key = true,
        key_column_name = "_key".to_string(),
        include_ttl = false,
        ttl_column_name = "_ttl".to_string(),
        include_row_index = false,
        row_index_column_name = "_index".to_string(),
        max_rows = None,
        parallel = None
    ))]
    #[allow(clippy::too_many_arguments)]
    fn new(
        url: String,
        pattern: String,
        schema: Vec<(String, String)>,
        batch_size: usize,
        count_hint: usize,
        projection: Option<Vec<String>>,
        include_key: bool,
        key_column_name: String,
        include_ttl: bool,
        ttl_column_name: String,
        include_row_index: bool,
        row_index_column_name: String,
        max_rows: Option<usize>,
        parallel: Option<usize>,
    ) -> PyResult<Self> {
        // Translate Python-facing type strings into RedisType values,
        // failing fast on the first unknown type.
        let field_types: Vec<(String, RedisType)> = schema
            .into_iter()
            .map(|(name, type_str)| {
                let redis_type = match type_str.to_lowercase().as_str() {
                    "utf8" | "str" | "string" => RedisType::Utf8,
                    "int64" | "int" | "integer" => RedisType::Int64,
                    "float64" | "float" | "double" => RedisType::Float64,
                    "bool" | "boolean" => RedisType::Boolean,
                    _ => {
                        return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(format!(
                            "Unknown type '{}' for field '{}'. Supported: utf8, int64, float64, bool",
                            type_str, name
                        )));
                    }
                };
                Ok((name, redis_type))
            })
            .collect::<PyResult<Vec<_>>>()?;

        let hash_schema = HashSchema::new(field_types)
            .with_key(include_key)
            .with_key_column_name(key_column_name)
            .with_ttl(include_ttl)
            .with_ttl_column_name(ttl_column_name)
            .with_row_index(include_row_index)
            .with_row_index_column_name(row_index_column_name);

        let mut config = BatchConfig::new(pattern)
            .with_batch_size(batch_size)
            .with_count_hint(count_hint);

        if let Some(max) = max_rows {
            config = config.with_max_rows(max);
        }

        if let Some(workers) = parallel {
            config = config.with_parallel(ParallelStrategy::batches(workers));
        }

        let inner = HashBatchIterator::new(&url, hash_schema, config, projection)
            .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;

        Ok(Self { inner })
    }

    /// Fetch the next batch and return it as Arrow IPC bytes, or `None`
    /// when the scan is exhausted.
    fn next_batch_ipc(&mut self, py: Python<'_>) -> PyResult<Option<Py<PyAny>>> {
        let batch = self
            .inner
            .next_batch()
            .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;

        match batch {
            Some(record_batch) => {
                // Reuse the shared `batch_to_ipc` helper instead of
                // duplicating the FileWriter plumbing here; serialization
                // errors surface as RuntimeError via the crate Error display.
                let buf = batch_to_ipc(&record_batch)
                    .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;
                Ok(Some(pyo3::types::PyBytes::new(py, &buf).into()))
            }
            None => Ok(None),
        }
    }

    /// True once the underlying scan has been fully consumed.
    fn is_done(&self) -> bool {
        self.inner.is_done()
    }

    /// Total number of rows emitted so far.
    fn rows_yielded(&self) -> usize {
        self.inner.rows_yielded()
    }
}
472
#[cfg(feature = "python")]
/// Store raw bytes under `key`, optionally with a TTL in seconds.
/// Returns the number of bytes written.
#[pyfunction]
#[pyo3(signature = (url, key, data, ttl = None))]
fn py_cache_set(url: &str, key: &str, data: &[u8], ttl: Option<i64>) -> PyResult<usize> {
    let runtime = tokio::runtime::Runtime::new()
        .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;

    runtime.block_on(async {
        let client = redis::Client::open(url)
            .map_err(|e| PyErr::new::<pyo3::exceptions::PyConnectionError, _>(e.to_string()))?;

        let mut redis_conn = client
            .get_multiplexed_async_connection()
            .await
            .map_err(|e| PyErr::new::<pyo3::exceptions::PyConnectionError, _>(e.to_string()))?;

        let byte_len = data.len();

        // SETEX when an expiry was requested, plain SET otherwise.
        match ttl {
            Some(seconds) => {
                redis::cmd("SETEX")
                    .arg(key)
                    .arg(seconds)
                    .arg(data)
                    .query_async::<()>(&mut redis_conn)
                    .await
                    .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;
            }
            None => {
                redis::cmd("SET")
                    .arg(key)
                    .arg(data)
                    .query_async::<()>(&mut redis_conn)
                    .await
                    .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;
            }
        }

        Ok(byte_len)
    })
}
525
#[cfg(feature = "python")]
/// Fetch the raw bytes stored at `key`, or `None` if the key is absent.
#[pyfunction]
fn py_cache_get(py: Python<'_>, url: &str, key: &str) -> PyResult<Option<Py<PyAny>>> {
    let runtime = tokio::runtime::Runtime::new()
        .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;

    runtime.block_on(async {
        let client = redis::Client::open(url)
            .map_err(|e| PyErr::new::<pyo3::exceptions::PyConnectionError, _>(e.to_string()))?;

        let mut redis_conn = client
            .get_multiplexed_async_connection()
            .await
            .map_err(|e| PyErr::new::<pyo3::exceptions::PyConnectionError, _>(e.to_string()))?;

        let payload: Option<Vec<u8>> = redis::cmd("GET")
            .arg(key)
            .query_async(&mut redis_conn)
            .await
            .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;

        // A missing key maps straight to Python None.
        Ok(payload.map(|data| pyo3::types::PyBytes::new(py, &data).into()))
    })
}
561
#[cfg(feature = "python")]
/// Delete `key`; returns true if a key was actually removed.
#[pyfunction]
fn py_cache_delete(url: &str, key: &str) -> PyResult<bool> {
    let runtime = tokio::runtime::Runtime::new()
        .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;

    runtime.block_on(async {
        let client = redis::Client::open(url)
            .map_err(|e| PyErr::new::<pyo3::exceptions::PyConnectionError, _>(e.to_string()))?;

        let mut redis_conn = client
            .get_multiplexed_async_connection()
            .await
            .map_err(|e| PyErr::new::<pyo3::exceptions::PyConnectionError, _>(e.to_string()))?;

        // DEL reports how many keys were removed.
        let removed = redis::cmd("DEL")
            .arg(key)
            .query_async::<i64>(&mut redis_conn)
            .await
            .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;

        Ok(removed > 0)
    })
}
594
#[cfg(feature = "python")]
/// Check whether `key` currently exists.
#[pyfunction]
fn py_cache_exists(url: &str, key: &str) -> PyResult<bool> {
    let runtime = tokio::runtime::Runtime::new()
        .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;

    runtime.block_on(async {
        let client = redis::Client::open(url)
            .map_err(|e| PyErr::new::<pyo3::exceptions::PyConnectionError, _>(e.to_string()))?;

        let mut redis_conn = client
            .get_multiplexed_async_connection()
            .await
            .map_err(|e| PyErr::new::<pyo3::exceptions::PyConnectionError, _>(e.to_string()))?;

        // EXISTS returns the number of keys found (0 or 1 for a single key).
        let found = redis::cmd("EXISTS")
            .arg(key)
            .query_async::<i64>(&mut redis_conn)
            .await
            .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;

        Ok(found > 0)
    })
}
627
#[cfg(feature = "python")]
/// Return the remaining TTL of `key` in seconds, or `None` when the key
/// has no expiry or does not exist.
#[pyfunction]
fn py_cache_ttl(url: &str, key: &str) -> PyResult<Option<i64>> {
    let runtime = tokio::runtime::Runtime::new()
        .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;

    runtime.block_on(async {
        let client = redis::Client::open(url)
            .map_err(|e| PyErr::new::<pyo3::exceptions::PyConnectionError, _>(e.to_string()))?;

        let mut redis_conn = client
            .get_multiplexed_async_connection()
            .await
            .map_err(|e| PyErr::new::<pyo3::exceptions::PyConnectionError, _>(e.to_string()))?;

        let remaining = redis::cmd("TTL")
            .arg(key)
            .query_async::<i64>(&mut redis_conn)
            .await
            .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;

        // Per Redis, TTL yields -2 (missing key) or -1 (no expiry);
        // both negative sentinels collapse to None here.
        Ok((remaining >= 0).then_some(remaining))
    })
}
661
#[cfg(all(feature = "python", feature = "cluster"))]
/// Python wrapper around the cluster-aware hash iterator; scans hash keys
/// across all nodes of a Redis Cluster and yields Arrow IPC batches.
#[pyclass]
pub struct PyClusterHashBatchIterator {
    // Cluster-aware Rust iterator doing the multi-node scan.
    inner: ClusterHashBatchIterator,
}
675
#[cfg(all(feature = "python", feature = "cluster"))]
#[pymethods]
impl PyClusterHashBatchIterator {
    /// Build a cluster hash batch iterator over `nodes` for keys matching
    /// `pattern`.
    ///
    /// `schema` is a list of `(field_name, type_string)` pairs; supported
    /// type strings are utf8/str/string, int64/int/integer,
    /// float64/float/double, bool/boolean (case-insensitive). Raises
    /// `ValueError` for an unknown type string and `RuntimeError` when the
    /// underlying iterator cannot be constructed.
    #[new]
    #[pyo3(signature = (
        nodes,
        pattern,
        schema,
        batch_size = 1000,
        count_hint = 100,
        projection = None,
        include_key = true,
        key_column_name = "_key".to_string(),
        include_ttl = false,
        ttl_column_name = "_ttl".to_string(),
        include_row_index = false,
        row_index_column_name = "_index".to_string(),
        max_rows = None,
        parallel = None
    ))]
    #[allow(clippy::too_many_arguments)]
    fn new(
        nodes: Vec<String>,
        pattern: String,
        schema: Vec<(String, String)>,
        batch_size: usize,
        count_hint: usize,
        projection: Option<Vec<String>>,
        include_key: bool,
        key_column_name: String,
        include_ttl: bool,
        ttl_column_name: String,
        include_row_index: bool,
        row_index_column_name: String,
        max_rows: Option<usize>,
        parallel: Option<usize>,
    ) -> PyResult<Self> {
        // Translate Python-facing type strings into RedisType values,
        // failing fast on the first unknown type.
        let field_types: Vec<(String, RedisType)> = schema
            .into_iter()
            .map(|(name, type_str)| {
                let redis_type = match type_str.to_lowercase().as_str() {
                    "utf8" | "str" | "string" => RedisType::Utf8,
                    "int64" | "int" | "integer" => RedisType::Int64,
                    "float64" | "float" | "double" => RedisType::Float64,
                    "bool" | "boolean" => RedisType::Boolean,
                    _ => {
                        return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(format!(
                            "Unknown type '{}' for field '{}'. Supported: utf8, int64, float64, bool",
                            type_str, name
                        )));
                    }
                };
                Ok((name, redis_type))
            })
            .collect::<PyResult<Vec<_>>>()?;

        let hash_schema = HashSchema::new(field_types)
            .with_key(include_key)
            .with_key_column_name(key_column_name)
            .with_ttl(include_ttl)
            .with_ttl_column_name(ttl_column_name)
            .with_row_index(include_row_index)
            .with_row_index_column_name(row_index_column_name);

        let mut config = BatchConfig::new(pattern)
            .with_batch_size(batch_size)
            .with_count_hint(count_hint);

        if let Some(max) = max_rows {
            config = config.with_max_rows(max);
        }

        if let Some(workers) = parallel {
            config = config.with_parallel(ParallelStrategy::batches(workers));
        }

        let inner = ClusterHashBatchIterator::new(&nodes, hash_schema, config, projection)
            .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;

        Ok(Self { inner })
    }

    /// Fetch the next batch and return it as Arrow IPC bytes, or `None`
    /// when the scan is exhausted.
    fn next_batch_ipc(&mut self, py: Python<'_>) -> PyResult<Option<Py<PyAny>>> {
        let batch = self
            .inner
            .next_batch()
            .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;

        match batch {
            Some(record_batch) => {
                // Reuse the shared `batch_to_ipc` helper instead of
                // duplicating the FileWriter plumbing here; serialization
                // errors surface as RuntimeError via the crate Error display.
                let buf = batch_to_ipc(&record_batch)
                    .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;
                Ok(Some(pyo3::types::PyBytes::new(py, &buf).into()))
            }
            None => Ok(None),
        }
    }

    /// True once the scan on every node has been fully consumed.
    fn is_done(&self) -> bool {
        self.inner.is_done()
    }

    /// Total number of rows emitted so far.
    fn rows_yielded(&self) -> usize {
        self.inner.rows_yielded()
    }

    /// Number of cluster nodes this iterator scans.
    fn node_count(&self) -> usize {
        self.inner.node_count()
    }
}
830
#[cfg(all(feature = "python", feature = "cluster"))]
/// Python wrapper around the cluster-aware JSON iterator; scans JSON keys
/// across all nodes of a Redis Cluster and yields Arrow IPC batches.
#[pyclass]
pub struct PyClusterJsonBatchIterator {
    // Cluster-aware Rust iterator doing the multi-node scan.
    inner: ClusterJsonBatchIterator,
}
837
#[cfg(all(feature = "python", feature = "cluster"))]
#[pymethods]
impl PyClusterJsonBatchIterator {
    /// Build a cluster JSON batch iterator over `nodes` for keys matching
    /// `pattern`.
    ///
    /// `schema` is a list of `(field_name, type_string)` pairs; supported
    /// type strings are utf8/str/string, int64/int/integer,
    /// float64/float/double, bool/boolean (case-insensitive). Raises
    /// `ValueError` for an unknown type string and `RuntimeError` when the
    /// underlying iterator cannot be constructed.
    #[new]
    #[pyo3(signature = (
        nodes,
        pattern,
        schema,
        batch_size = 1000,
        count_hint = 100,
        projection = None,
        include_key = true,
        key_column_name = "_key".to_string(),
        include_ttl = false,
        ttl_column_name = "_ttl".to_string(),
        include_row_index = false,
        row_index_column_name = "_index".to_string(),
        max_rows = None,
        parallel = None
    ))]
    #[allow(clippy::too_many_arguments)]
    fn new(
        nodes: Vec<String>,
        pattern: String,
        schema: Vec<(String, String)>,
        batch_size: usize,
        count_hint: usize,
        projection: Option<Vec<String>>,
        include_key: bool,
        key_column_name: String,
        include_ttl: bool,
        ttl_column_name: String,
        include_row_index: bool,
        row_index_column_name: String,
        max_rows: Option<usize>,
        parallel: Option<usize>,
    ) -> PyResult<Self> {
        // JSON schemas use Arrow DataType directly (unlike the hash
        // iterators, which go through RedisType).
        let field_types: Vec<(String, DataType)> = schema
            .into_iter()
            .map(|(name, type_str)| {
                let dtype = match type_str.to_lowercase().as_str() {
                    "utf8" | "str" | "string" => DataType::Utf8,
                    "int64" | "int" | "integer" => DataType::Int64,
                    "float64" | "float" | "double" => DataType::Float64,
                    "bool" | "boolean" => DataType::Boolean,
                    _ => {
                        return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(format!(
                            "Unknown type '{}' for field '{}'. Supported: utf8, int64, float64, bool",
                            type_str, name
                        )));
                    }
                };
                Ok((name, dtype))
            })
            .collect::<PyResult<Vec<_>>>()?;

        let json_schema = JsonSchema::new(field_types)
            .with_key(include_key)
            .with_key_column_name(key_column_name)
            .with_ttl(include_ttl)
            .with_ttl_column_name(ttl_column_name)
            .with_row_index(include_row_index)
            .with_row_index_column_name(row_index_column_name);

        let mut config = BatchConfig::new(pattern)
            .with_batch_size(batch_size)
            .with_count_hint(count_hint);

        if let Some(max) = max_rows {
            config = config.with_max_rows(max);
        }

        if let Some(workers) = parallel {
            config = config.with_parallel(ParallelStrategy::batches(workers));
        }

        let inner = ClusterJsonBatchIterator::new(&nodes, json_schema, config, projection)
            .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;

        Ok(Self { inner })
    }

    /// Fetch the next batch and return it as Arrow IPC bytes, or `None`
    /// when the scan is exhausted.
    fn next_batch_ipc(&mut self, py: Python<'_>) -> PyResult<Option<Py<PyAny>>> {
        let batch = self
            .inner
            .next_batch()
            .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;

        match batch {
            Some(record_batch) => {
                // Reuse the shared `batch_to_ipc` helper instead of
                // duplicating the FileWriter plumbing here; serialization
                // errors surface as RuntimeError via the crate Error display.
                let buf = batch_to_ipc(&record_batch)
                    .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;
                Ok(Some(pyo3::types::PyBytes::new(py, &buf).into()))
            }
            None => Ok(None),
        }
    }

    /// True once the scan on every node has been fully consumed.
    fn is_done(&self) -> bool {
        self.inner.is_done()
    }

    /// Total number of rows emitted so far.
    fn rows_yielded(&self) -> usize {
        self.inner.rows_yielded()
    }

    /// Number of cluster nodes this iterator scans.
    fn node_count(&self) -> usize {
        self.inner.node_count()
    }
}
974
#[cfg(all(feature = "python", feature = "cluster"))]
/// Python wrapper around the cluster-aware string iterator; scans plain
/// string keys across all nodes of a Redis Cluster and yields Arrow IPC
/// batches.
#[pyclass]
pub struct PyClusterStringBatchIterator {
    // Cluster-aware Rust iterator doing the multi-node scan.
    inner: ClusterStringBatchIterator,
}
981
#[cfg(all(feature = "python", feature = "cluster"))]
#[pymethods]
impl PyClusterStringBatchIterator {
    /// Build a cluster string batch iterator over `nodes` for keys
    /// matching `pattern`.
    ///
    /// `value_type` selects the Arrow type for the value column; supported
    /// strings are utf8/str/string, int64/int/integer, float64/float/double,
    /// bool/boolean, date, datetime (case-insensitive). Raises `ValueError`
    /// for an unknown value type and `RuntimeError` when the underlying
    /// iterator cannot be constructed.
    #[new]
    #[pyo3(signature = (
        nodes,
        pattern,
        value_type = "utf8".to_string(),
        batch_size = 1000,
        count_hint = 100,
        include_key = true,
        key_column_name = "_key".to_string(),
        value_column_name = "value".to_string(),
        include_ttl = false,
        ttl_column_name = "_ttl".to_string(),
        max_rows = None,
        parallel = None
    ))]
    #[allow(clippy::too_many_arguments)]
    fn new(
        nodes: Vec<String>,
        pattern: String,
        value_type: String,
        batch_size: usize,
        count_hint: usize,
        include_key: bool,
        key_column_name: String,
        value_column_name: String,
        include_ttl: bool,
        ttl_column_name: String,
        max_rows: Option<usize>,
        parallel: Option<usize>,
    ) -> PyResult<Self> {
        use arrow::datatypes::TimeUnit;

        // Map the Python-facing type string onto an Arrow DataType;
        // "datetime" is microsecond-precision with no timezone.
        let dtype = match value_type.to_lowercase().as_str() {
            "utf8" | "str" | "string" => DataType::Utf8,
            "int64" | "int" | "integer" => DataType::Int64,
            "float64" | "float" | "double" => DataType::Float64,
            "bool" | "boolean" => DataType::Boolean,
            "date" => DataType::Date32,
            "datetime" => DataType::Timestamp(TimeUnit::Microsecond, None),
            _ => {
                return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(format!(
                    "Unknown value type '{}'. Supported: utf8, int64, float64, bool, date, datetime",
                    value_type
                )));
            }
        };

        let string_schema = StringSchema::new(dtype)
            .with_key(include_key)
            .with_key_column_name(key_column_name)
            .with_value_column_name(value_column_name)
            .with_ttl(include_ttl)
            .with_ttl_column_name(ttl_column_name);

        let mut config = BatchConfig::new(pattern)
            .with_batch_size(batch_size)
            .with_count_hint(count_hint);

        if let Some(max) = max_rows {
            config = config.with_max_rows(max);
        }

        if let Some(workers) = parallel {
            config = config.with_parallel(ParallelStrategy::batches(workers));
        }

        let inner = ClusterStringBatchIterator::new(&nodes, string_schema, config)
            .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;

        Ok(Self { inner })
    }

    /// Fetch the next batch and return it as Arrow IPC bytes, or `None`
    /// when the scan is exhausted.
    fn next_batch_ipc(&mut self, py: Python<'_>) -> PyResult<Option<Py<PyAny>>> {
        let batch = self
            .inner
            .next_batch()
            .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;

        match batch {
            Some(record_batch) => {
                // Reuse the shared `batch_to_ipc` helper instead of
                // duplicating the FileWriter plumbing here; serialization
                // errors surface as RuntimeError via the crate Error display.
                let buf = batch_to_ipc(&record_batch)
                    .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;
                Ok(Some(pyo3::types::PyBytes::new(py, &buf).into()))
            }
            None => Ok(None),
        }
    }

    /// True once the scan on every node has been fully consumed.
    fn is_done(&self) -> bool {
        self.inner.is_done()
    }

    /// Total number of rows emitted so far.
    fn rows_yielded(&self) -> usize {
        self.inner.rows_yielded()
    }

    /// Number of cluster nodes this iterator scans.
    fn node_count(&self) -> usize {
        self.inner.node_count()
    }
}
1111
#[cfg(all(feature = "python", feature = "search"))]
/// Python wrapper around the RediSearch-backed hash iterator; pages
/// through FT.SEARCH results and yields Arrow IPC batches.
#[pyclass]
pub struct PyHashSearchIterator {
    // Rust iterator paging through search results.
    inner: HashSearchIterator,
}
1121
#[cfg(all(feature = "python", feature = "search"))]
#[pymethods]
impl PyHashSearchIterator {
    /// Build a search-backed hash iterator over `index` using `query`.
    ///
    /// `schema` is a list of `(field_name, type_string)` pairs; supported
    /// type strings are utf8/str/string, int64/int/integer,
    /// float64/float/double, bool/boolean (case-insensitive). `sort_by` /
    /// `sort_ascending` control server-side result ordering. Raises
    /// `ValueError` for an unknown type string and `RuntimeError` when the
    /// underlying iterator cannot be constructed.
    #[new]
    #[pyo3(signature = (
        url,
        index,
        query,
        schema,
        batch_size = 1000,
        projection = None,
        include_key = true,
        key_column_name = "_key".to_string(),
        include_ttl = false,
        ttl_column_name = "_ttl".to_string(),
        include_row_index = false,
        row_index_column_name = "_index".to_string(),
        max_rows = None,
        sort_by = None,
        sort_ascending = true
    ))]
    #[allow(clippy::too_many_arguments)]
    fn new(
        url: String,
        index: String,
        query: String,
        schema: Vec<(String, String)>,
        batch_size: usize,
        projection: Option<Vec<String>>,
        include_key: bool,
        key_column_name: String,
        include_ttl: bool,
        ttl_column_name: String,
        include_row_index: bool,
        row_index_column_name: String,
        max_rows: Option<usize>,
        sort_by: Option<String>,
        sort_ascending: bool,
    ) -> PyResult<Self> {
        // Translate Python-facing type strings into RedisType values,
        // failing fast on the first unknown type.
        let field_types: Vec<(String, RedisType)> = schema
            .into_iter()
            .map(|(name, type_str)| {
                let redis_type = match type_str.to_lowercase().as_str() {
                    "utf8" | "str" | "string" => RedisType::Utf8,
                    "int64" | "int" | "integer" => RedisType::Int64,
                    "float64" | "float" | "double" => RedisType::Float64,
                    "bool" | "boolean" => RedisType::Boolean,
                    _ => {
                        return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(format!(
                            "Unknown type '{}' for field '{}'. Supported: utf8, int64, float64, bool",
                            type_str, name
                        )));
                    }
                };
                Ok((name, redis_type))
            })
            .collect::<PyResult<Vec<_>>>()?;

        let hash_schema = HashSchema::new(field_types)
            .with_key(include_key)
            .with_key_column_name(key_column_name)
            .with_ttl(include_ttl)
            .with_ttl_column_name(ttl_column_name)
            .with_row_index(include_row_index)
            .with_row_index_column_name(row_index_column_name);

        let mut config = SearchBatchConfig::new(index, query).with_batch_size(batch_size);

        if let Some(max) = max_rows {
            config = config.with_max_rows(max);
        }

        if let Some(field) = sort_by {
            config = config.with_sort_by(field, sort_ascending);
        }

        let inner = HashSearchIterator::new(&url, hash_schema, config, projection)
            .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;

        Ok(Self { inner })
    }

    /// Fetch the next batch and return it as Arrow IPC bytes, or `None`
    /// when the result set is exhausted.
    fn next_batch_ipc(&mut self, py: Python<'_>) -> PyResult<Option<Py<PyAny>>> {
        let batch = self
            .inner
            .next_batch()
            .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;

        match batch {
            Some(record_batch) => {
                // Reuse the shared `batch_to_ipc` helper instead of
                // duplicating the FileWriter plumbing here; serialization
                // errors surface as RuntimeError via the crate Error display.
                let buf = batch_to_ipc(&record_batch)
                    .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;
                Ok(Some(pyo3::types::PyBytes::new(py, &buf).into()))
            }
            None => Ok(None),
        }
    }

    /// True once all search results have been consumed.
    fn is_done(&self) -> bool {
        self.inner.is_done()
    }

    /// Total number of rows emitted so far.
    fn rows_yielded(&self) -> usize {
        self.inner.rows_yielded()
    }

    /// Total result count reported by the server, if known yet.
    fn total_results(&self) -> Option<usize> {
        self.inner.total_results()
    }
}
1285
#[cfg(all(feature = "python", feature = "search"))]
/// Run a RediSearch FT.AGGREGATE pipeline and return the result rows as
/// a list of field-name → string-value dicts.
///
/// `reduce` entries are `(function, args, alias)` triples; `apply` entries
/// are `(expression, alias)` pairs; `sort_by` entries are
/// `(field, ascending)` pairs. Raises `ConnectionError` on connection
/// failures and `RuntimeError` on aggregation failures.
#[pyfunction]
#[pyo3(signature = (
    url,
    index,
    query,
    group_by = vec![],
    reduce = vec![],
    apply = None,
    filter = None,
    sort_by = None,
    limit = None,
    offset = 0,
    load = None
))]
#[allow(clippy::too_many_arguments)]
fn py_aggregate(
    url: &str,
    index: &str,
    query: &str,
    group_by: Vec<String>,
    reduce: Vec<(String, Vec<String>, String)>,
    apply: Option<Vec<(String, String)>>,
    filter: Option<String>,
    sort_by: Option<Vec<(String, bool)>>,
    limit: Option<usize>,
    offset: usize,
    load: Option<Vec<String>>,
) -> PyResult<Vec<std::collections::HashMap<String, String>>> {
    use crate::connection::RedisConnection;
    use crate::search::{AggregateConfig, ApplyExpr, ReduceOp, SortBy, aggregate};

    // A fresh single-use runtime drives the async aggregation to completion.
    let rt = tokio::runtime::Runtime::new()
        .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;

    rt.block_on(async {
        let connection = RedisConnection::new(url)
            .map_err(|e| PyErr::new::<pyo3::exceptions::PyConnectionError, _>(e.to_string()))?;

        let mut conn = connection
            .get_connection_manager()
            .await
            .map_err(|e| PyErr::new::<pyo3::exceptions::PyConnectionError, _>(e.to_string()))?;

        // Convert the plain-tuple Python inputs into the search module's
        // typed pipeline-stage values.
        let reduce_ops: Vec<ReduceOp> = reduce
            .into_iter()
            .map(|(func, args, alias)| ReduceOp::new(func, args, alias))
            .collect();

        let apply_exprs: Vec<ApplyExpr> = apply
            .unwrap_or_default()
            .into_iter()
            .map(|(expr, alias)| ApplyExpr::new(expr, alias))
            .collect();

        let sort_specs: Vec<SortBy> = sort_by
            .unwrap_or_default()
            .into_iter()
            .map(|(field, ascending)| {
                if ascending {
                    SortBy::asc(field)
                } else {
                    SortBy::desc(field)
                }
            })
            .collect();

        let mut config = AggregateConfig::new(index, query)
            .with_group_by(group_by)
            .with_reduce(reduce_ops)
            .with_apply(apply_exprs)
            .with_sort_by(sort_specs)
            .with_offset(offset);

        // Optional stages are only attached when the caller supplied them.
        if let Some(f) = filter {
            config = config.with_filter(f);
        }

        if let Some(l) = limit {
            config = config.with_limit(l);
        }

        if let Some(fields) = load {
            config = config.with_load(fields);
        }

        let result = aggregate(&mut conn, &config)
            .await
            .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;

        Ok(result.rows)
    })
}
1416
#[cfg(feature = "python")]
#[pyclass]
/// Python-exposed wrapper around `TimeSeriesBatchIterator`.
///
/// Batches are handed to Python as Arrow IPC bytes via `next_batch_ipc`.
pub struct PyTimeSeriesBatchIterator {
    // Underlying Rust iterator that performs the Redis scan + TS.RANGE fetches.
    inner: TimeSeriesBatchIterator,
}
1426
#[cfg(feature = "python")]
#[pymethods]
impl PyTimeSeriesBatchIterator {
    /// Build a RedisTimeSeries batch iterator over keys matching `pattern`.
    ///
    /// `start`/`end` bound the range query (`"-"`/`"+"` mean the full range).
    /// `aggregation` and `bucket_size_ms` enable server-side aggregation and
    /// are only applied when BOTH are given; a lone value is silently ignored.
    #[new]
    #[pyo3(signature = (
        url,
        pattern,
        batch_size = 1000,
        count_hint = 100,
        start = "-".to_string(),
        end = "+".to_string(),
        count_per_series = None,
        aggregation = None,
        bucket_size_ms = None,
        include_key = true,
        key_column_name = "_key".to_string(),
        include_timestamp = true,
        timestamp_column_name = "_ts".to_string(),
        value_column_name = "value".to_string(),
        include_row_index = false,
        row_index_column_name = "_index".to_string(),
        label_columns = vec![],
        max_rows = None
    ))]
    #[allow(clippy::too_many_arguments)]
    fn new(
        url: String,
        pattern: String,
        batch_size: usize,
        count_hint: usize,
        start: String,
        end: String,
        count_per_series: Option<usize>,
        aggregation: Option<String>,
        bucket_size_ms: Option<i64>,
        include_key: bool,
        key_column_name: String,
        include_timestamp: bool,
        timestamp_column_name: String,
        value_column_name: String,
        include_row_index: bool,
        row_index_column_name: String,
        label_columns: Vec<String>,
        max_rows: Option<usize>,
    ) -> PyResult<Self> {
        let ts_schema = TimeSeriesSchema::new()
            .with_key(include_key)
            .with_key_column_name(&key_column_name)
            .with_timestamp(include_timestamp)
            // FIX: was `×tamp_column_name` — an HTML-entity-mangled rendering
            // of `&timestamp_column_name` (`×` is not a valid identifier
            // start), which did not compile.
            .with_timestamp_column_name(&timestamp_column_name)
            .with_value_column_name(&value_column_name)
            .with_row_index(include_row_index)
            .with_row_index_column_name(&row_index_column_name)
            .with_label_columns(label_columns);

        let mut config = types::hash::BatchConfig::new(pattern)
            .with_batch_size(batch_size)
            .with_count_hint(count_hint);

        if let Some(max) = max_rows {
            config = config.with_max_rows(max);
        }

        let mut inner = TimeSeriesBatchIterator::new(&url, ts_schema, config)
            .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;

        inner = inner.with_start(&start).with_end(&end);

        if let Some(count) = count_per_series {
            inner = inner.with_count_per_series(count);
        }

        // Aggregation requires both the function name and a bucket width.
        if let (Some(agg), Some(bucket)) = (aggregation, bucket_size_ms) {
            inner = inner.with_aggregation(&agg, bucket);
        }

        Ok(Self { inner })
    }

    /// Fetch the next Arrow `RecordBatch` and return it serialized as
    /// Arrow IPC file bytes, or `None` when the iterator is exhausted.
    fn next_batch_ipc(&mut self, py: Python<'_>) -> PyResult<Option<Py<PyAny>>> {
        let batch = self
            .inner
            .next_batch()
            .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;

        match batch {
            Some(record_batch) => {
                let mut buf = Vec::new();
                // Scope the writer so its mutable borrow of `buf` ends before
                // the buffer is handed to Python.
                {
                    let mut writer = arrow::ipc::writer::FileWriter::try_new(
                        &mut buf,
                        record_batch.schema().as_ref(),
                    )
                    .map_err(|e| {
                        PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(
                            "Failed to create IPC writer: {}",
                            e
                        ))
                    })?;

                    writer.write(&record_batch).map_err(|e| {
                        PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(
                            "Failed to write batch: {}",
                            e
                        ))
                    })?;

                    writer.finish().map_err(|e| {
                        PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(
                            "Failed to finish IPC: {}",
                            e
                        ))
                    })?;
                }

                Ok(Some(pyo3::types::PyBytes::new(py, &buf).into()))
            }
            None => Ok(None),
        }
    }

    /// True once the underlying scan has been fully consumed.
    fn is_done(&self) -> bool {
        self.inner.is_done()
    }

    /// Total number of rows yielded so far across all batches.
    fn rows_yielded(&self) -> usize {
        self.inner.rows_yielded()
    }
}
1582
#[cfg(feature = "python")]
#[pyclass]
/// Python-exposed wrapper around `JsonBatchIterator`.
///
/// Batches are handed to Python as Arrow IPC bytes via `next_batch_ipc`.
pub struct PyJsonBatchIterator {
    // Underlying Rust iterator that scans JSON keys and builds record batches.
    inner: JsonBatchIterator,
}
1592
#[cfg(feature = "python")]
#[pymethods]
impl PyJsonBatchIterator {
    /// Build a JSON batch iterator over keys matching `pattern`.
    ///
    /// `schema` is a list of `(field_name, type_string)` pairs; accepted type
    /// strings (case-insensitive) are utf8/str/string, int64/int/integer,
    /// float64/float/double and bool/boolean. Unknown types raise ValueError.
    /// `parallel` enables batch-parallel fetching with that many workers.
    #[new]
    #[pyo3(signature = (
        url,
        pattern,
        schema,
        batch_size = 1000,
        count_hint = 100,
        projection = None,
        include_key = true,
        key_column_name = "_key".to_string(),
        include_ttl = false,
        ttl_column_name = "_ttl".to_string(),
        include_row_index = false,
        row_index_column_name = "_index".to_string(),
        max_rows = None,
        parallel = None
    ))]
    #[allow(clippy::too_many_arguments)]
    fn new(
        url: String,
        pattern: String,
        schema: Vec<(String, String)>,
        batch_size: usize,
        count_hint: usize,
        projection: Option<Vec<String>>,
        include_key: bool,
        key_column_name: String,
        include_ttl: bool,
        ttl_column_name: String,
        include_row_index: bool,
        row_index_column_name: String,
        max_rows: Option<usize>,
        parallel: Option<usize>,
    ) -> PyResult<Self> {
        // Translate Python type strings into Arrow data types, failing fast
        // (collect into PyResult short-circuits) on the first unknown type.
        let field_types: Vec<(String, DataType)> = schema
            .into_iter()
            .map(|(name, type_str)| {
                let dtype = match type_str.to_lowercase().as_str() {
                    "utf8" | "str" | "string" => DataType::Utf8,
                    "int64" | "int" | "integer" => DataType::Int64,
                    "float64" | "float" | "double" => DataType::Float64,
                    "bool" | "boolean" => DataType::Boolean,
                    _ => {
                        return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(format!(
                            "Unknown type '{}' for field '{}'. Supported: utf8, int64, float64, bool",
                            type_str, name
                        )));
                    }
                };
                Ok((name, dtype))
            })
            .collect::<PyResult<Vec<_>>>()?;

        let json_schema = JsonSchema::new(field_types)
            .with_key(include_key)
            .with_key_column_name(key_column_name)
            .with_ttl(include_ttl)
            .with_ttl_column_name(ttl_column_name)
            .with_row_index(include_row_index)
            .with_row_index_column_name(row_index_column_name);

        let mut config = BatchConfig::new(pattern)
            .with_batch_size(batch_size)
            .with_count_hint(count_hint);

        if let Some(max) = max_rows {
            config = config.with_max_rows(max);
        }

        if let Some(workers) = parallel {
            config = config.with_parallel(ParallelStrategy::batches(workers));
        }

        let inner = JsonBatchIterator::new(&url, json_schema, config, projection)
            .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;

        Ok(Self { inner })
    }

    /// Fetch the next Arrow `RecordBatch` and return it serialized as
    /// Arrow IPC file bytes, or `None` when the iterator is exhausted.
    fn next_batch_ipc(&mut self, py: Python<'_>) -> PyResult<Option<Py<PyAny>>> {
        let batch = self
            .inner
            .next_batch()
            .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;

        match batch {
            Some(record_batch) => {
                let mut buf = Vec::new();
                // Scope the writer so its mutable borrow of `buf` ends before
                // the buffer is handed to Python.
                {
                    let mut writer = arrow::ipc::writer::FileWriter::try_new(
                        &mut buf,
                        record_batch.schema().as_ref(),
                    )
                    .map_err(|e| {
                        PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(
                            "Failed to create IPC writer: {}",
                            e
                        ))
                    })?;

                    writer.write(&record_batch).map_err(|e| {
                        PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(
                            "Failed to write batch: {}",
                            e
                        ))
                    })?;

                    writer.finish().map_err(|e| {
                        PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(
                            "Failed to finish IPC: {}",
                            e
                        ))
                    })?;
                }

                Ok(Some(pyo3::types::PyBytes::new(py, &buf).into()))
            }
            None => Ok(None),
        }
    }

    /// True once the underlying scan has been fully consumed.
    fn is_done(&self) -> bool {
        self.inner.is_done()
    }

    /// Total number of rows yielded so far across all batches.
    fn rows_yielded(&self) -> usize {
        self.inner.rows_yielded()
    }
}
1750
#[cfg(feature = "python")]
#[pyclass]
/// Python-exposed wrapper around `StringBatchIterator`.
///
/// Batches are handed to Python as Arrow IPC bytes via `next_batch_ipc`.
pub struct PyStringBatchIterator {
    // Underlying Rust iterator that scans string keys and builds record batches.
    inner: StringBatchIterator,
}
1760
#[cfg(feature = "python")]
#[pymethods]
impl PyStringBatchIterator {
    /// Build a string batch iterator over keys matching `pattern`.
    ///
    /// `value_type` selects the Arrow type all values are parsed into;
    /// accepted names (case-insensitive) are utf8/str/string,
    /// int64/int/integer, float64/float/double, bool/boolean, date and
    /// datetime. Unknown names raise ValueError.
    #[new]
    #[pyo3(signature = (
        url,
        pattern,
        value_type = "utf8".to_string(),
        batch_size = 1000,
        count_hint = 100,
        include_key = true,
        key_column_name = "_key".to_string(),
        value_column_name = "value".to_string(),
        include_ttl = false,
        ttl_column_name = "_ttl".to_string(),
        max_rows = None,
        parallel = None
    ))]
    #[allow(clippy::too_many_arguments)]
    fn new(
        url: String,
        pattern: String,
        value_type: String,
        batch_size: usize,
        count_hint: usize,
        include_key: bool,
        key_column_name: String,
        value_column_name: String,
        include_ttl: bool,
        ttl_column_name: String,
        max_rows: Option<usize>,
        parallel: Option<usize>,
    ) -> PyResult<Self> {
        use arrow::datatypes::TimeUnit;

        // Map the Python-facing type string onto an Arrow data type.
        let dtype = match value_type.to_lowercase().as_str() {
            "utf8" | "str" | "string" => DataType::Utf8,
            "int64" | "int" | "integer" => DataType::Int64,
            "float64" | "float" | "double" => DataType::Float64,
            "bool" | "boolean" => DataType::Boolean,
            "date" => DataType::Date32,
            "datetime" => DataType::Timestamp(TimeUnit::Microsecond, None),
            _ => {
                return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(format!(
                    "Unknown value type '{}'. Supported: utf8, int64, float64, bool, date, datetime",
                    value_type
                )));
            }
        };

        let string_schema = StringSchema::new(dtype)
            .with_key(include_key)
            .with_key_column_name(key_column_name)
            .with_value_column_name(value_column_name)
            .with_ttl(include_ttl)
            .with_ttl_column_name(ttl_column_name);

        let mut config = BatchConfig::new(pattern)
            .with_batch_size(batch_size)
            .with_count_hint(count_hint);

        if let Some(max) = max_rows {
            config = config.with_max_rows(max);
        }

        if let Some(workers) = parallel {
            config = config.with_parallel(ParallelStrategy::batches(workers));
        }

        let inner = StringBatchIterator::new(&url, string_schema, config)
            .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;

        Ok(Self { inner })
    }

    /// Fetch the next Arrow `RecordBatch` and return it serialized as
    /// Arrow IPC file bytes, or `None` when the iterator is exhausted.
    fn next_batch_ipc(&mut self, py: Python<'_>) -> PyResult<Option<Py<PyAny>>> {
        let batch = self
            .inner
            .next_batch()
            .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;

        match batch {
            Some(record_batch) => {
                let mut buf = Vec::new();
                // Scope the writer so its mutable borrow of `buf` ends before
                // the buffer is handed to Python.
                {
                    let mut writer = arrow::ipc::writer::FileWriter::try_new(
                        &mut buf,
                        record_batch.schema().as_ref(),
                    )
                    .map_err(|e| {
                        PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(
                            "Failed to create IPC writer: {}",
                            e
                        ))
                    })?;

                    writer.write(&record_batch).map_err(|e| {
                        PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(
                            "Failed to write batch: {}",
                            e
                        ))
                    })?;

                    writer.finish().map_err(|e| {
                        PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(
                            "Failed to finish IPC: {}",
                            e
                        ))
                    })?;
                }

                Ok(Some(pyo3::types::PyBytes::new(py, &buf).into()))
            }
            None => Ok(None),
        }
    }

    /// True once the underlying scan has been fully consumed.
    fn is_done(&self) -> bool {
        self.inner.is_done()
    }

    /// Total number of rows yielded so far across all batches.
    fn rows_yielded(&self) -> usize {
        self.inner.rows_yielded()
    }
}
1909
#[cfg(feature = "python")]
#[pyfunction]
#[pyo3(signature = (connection_url, pattern, count = 10))]
/// Return up to `count` keys matching `pattern` using cursor-based SCAN.
///
/// NOTE(review): `count` is used both as the per-iteration SCAN COUNT hint
/// and as the maximum number of keys returned (`truncate` below) — confirm
/// this dual use is intended.
fn scan_keys(connection_url: &str, pattern: &str, count: usize) -> PyResult<Vec<String>> {
    // A fresh Tokio runtime is created per call to drive the async client.
    let rt = tokio::runtime::Runtime::new()
        .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;

    rt.block_on(async {
        let client = redis::Client::open(connection_url)
            .map_err(|e| PyErr::new::<pyo3::exceptions::PyConnectionError, _>(e.to_string()))?;

        let mut conn = client
            .get_multiplexed_async_connection()
            .await
            .map_err(|e| PyErr::new::<pyo3::exceptions::PyConnectionError, _>(e.to_string()))?;

        let mut keys: Vec<String> = Vec::new();
        let mut cursor = 0u64;

        loop {
            let (new_cursor, batch): (u64, Vec<String>) = redis::cmd("SCAN")
                .arg(cursor)
                .arg("MATCH")
                .arg(pattern)
                .arg("COUNT")
                .arg(count)
                .query_async(&mut conn)
                .await
                .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;

            keys.extend(batch);
            cursor = new_cursor;

            // Stop when the server-side scan completes (cursor 0) or enough
            // keys have been collected.
            if cursor == 0 || keys.len() >= count {
                break;
            }
        }

        // Drop any overshoot from the final SCAN page.
        keys.truncate(count);
        Ok(keys)
    })
}
1953
#[cfg(feature = "python")]
#[pyfunction]
#[pyo3(signature = (url, pattern, sample_size = 100, type_inference = true))]
/// Sample up to `sample_size` hash keys matching `pattern` and infer a
/// schema; returns `(field_name, type_string)` pairs plus the sample count.
fn py_infer_hash_schema(
    url: &str,
    pattern: &str,
    sample_size: usize,
    type_inference: bool,
) -> PyResult<(Vec<(String, String)>, usize)> {
    match infer_hash_schema(url, pattern, sample_size, type_inference) {
        Ok(inferred) => Ok((inferred.to_type_strings(), inferred.sample_count)),
        Err(err) => Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
            err.to_string(),
        )),
    }
}
1978
#[cfg(feature = "python")]
#[pyfunction]
#[pyo3(signature = (url, pattern, sample_size = 100))]
/// Sample up to `sample_size` JSON keys matching `pattern` and infer a
/// schema; returns `(field_name, type_string)` pairs plus the sample count.
fn py_infer_json_schema(
    url: &str,
    pattern: &str,
    sample_size: usize,
) -> PyResult<(Vec<(String, String)>, usize)> {
    match infer_json_schema(url, pattern, sample_size) {
        Ok(inferred) => Ok((inferred.to_type_strings(), inferred.sample_count)),
        Err(err) => Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
            err.to_string(),
        )),
    }
}
2001
#[cfg(feature = "python")]
#[pyfunction]
#[pyo3(signature = (url, pattern, schema_overwrite = None, sample_size = 100, type_inference = true))]
/// Infer a hash schema, then apply user-supplied `(field, type)` overrides.
///
/// Returns `(field_name, type_string)` pairs plus the sample count.
fn py_infer_hash_schema_with_overwrite(
    url: &str,
    pattern: &str,
    schema_overwrite: Option<Vec<(String, String)>>,
    sample_size: usize,
    type_inference: bool,
) -> PyResult<(Vec<(String, String)>, usize)> {
    use crate::schema::RedisType;

    let schema = infer_hash_schema(url, pattern, sample_size, type_inference)
        .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;

    let final_schema = if let Some(overwrite) = schema_overwrite {
        let overwrite_typed: Vec<(String, RedisType)> = overwrite
            .into_iter()
            .map(|(name, type_str)| {
                // Accept the same aliases, case-insensitively, as the other
                // type-string parsers in this module (previously "str", "int"
                // and "double" silently fell through to Utf8).
                let redis_type = match type_str.to_lowercase().as_str() {
                    "utf8" | "str" | "string" => RedisType::Utf8,
                    "int64" | "int" | "integer" => RedisType::Int64,
                    "float64" | "float" | "double" => RedisType::Float64,
                    "bool" | "boolean" => RedisType::Boolean,
                    "date" => RedisType::Date,
                    "datetime" => RedisType::Datetime,
                    // Historical behavior: unknown names fall back to Utf8
                    // rather than raising, so existing callers keep working.
                    _ => RedisType::Utf8,
                };
                (name, redis_type)
            })
            .collect();
        schema.with_overwrite(&overwrite_typed)
    } else {
        schema
    };

    Ok((final_schema.to_type_strings(), final_schema.sample_count))
}
2066
#[cfg(feature = "python")]
#[pyfunction]
#[pyo3(signature = (url, pattern, schema_overwrite = None, sample_size = 100))]
/// Infer a JSON schema, then apply user-supplied `(field, type)` overrides.
///
/// Returns `(field_name, type_string)` pairs plus the sample count.
fn py_infer_json_schema_with_overwrite(
    url: &str,
    pattern: &str,
    schema_overwrite: Option<Vec<(String, String)>>,
    sample_size: usize,
) -> PyResult<(Vec<(String, String)>, usize)> {
    use crate::schema::RedisType;

    let schema = infer_json_schema(url, pattern, sample_size)
        .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;

    let final_schema = if let Some(overwrite) = schema_overwrite {
        let overwrite_typed: Vec<(String, RedisType)> = overwrite
            .into_iter()
            .map(|(name, type_str)| {
                // Accept the same aliases, case-insensitively, as the other
                // type-string parsers in this module (previously "str", "int"
                // and "double" silently fell through to Utf8).
                let redis_type = match type_str.to_lowercase().as_str() {
                    "utf8" | "str" | "string" => RedisType::Utf8,
                    "int64" | "int" | "integer" => RedisType::Int64,
                    "float64" | "float" | "double" => RedisType::Float64,
                    "bool" | "boolean" => RedisType::Boolean,
                    "date" => RedisType::Date,
                    "datetime" => RedisType::Datetime,
                    // Historical behavior: unknown names fall back to Utf8
                    // rather than raising, so existing callers keep working.
                    _ => RedisType::Utf8,
                };
                (name, redis_type)
            })
            .collect();
        schema.with_overwrite(&overwrite_typed)
    } else {
        schema
    };

    Ok((final_schema.to_type_strings(), final_schema.sample_count))
}
2115
#[cfg(feature = "python")]
/// Map an inferred `RedisType` to its Python-facing type-string name.
/// Extracted because this match was duplicated verbatim twice below.
fn redis_type_name(dtype: &crate::schema::RedisType) -> &'static str {
    match dtype {
        crate::schema::RedisType::Utf8 => "utf8",
        crate::schema::RedisType::Int64 => "int64",
        crate::schema::RedisType::Float64 => "float64",
        crate::schema::RedisType::Boolean => "bool",
        crate::schema::RedisType::Date => "date",
        crate::schema::RedisType::Datetime => "datetime",
    }
}

#[cfg(feature = "python")]
#[pyfunction]
#[pyo3(signature = (url, pattern, sample_size = 100))]
/// Infer a hash schema and report per-field confidence diagnostics.
///
/// Returns a dict with keys: "fields", "sample_count", "field_info"
/// (per-field type/confidence/sample statistics), "average_confidence",
/// and "all_confident" (true when every field is >= 0.9 confident).
fn py_infer_hash_schema_with_confidence(
    py: Python<'_>,
    url: &str,
    pattern: &str,
    sample_size: usize,
) -> PyResult<HashMap<String, Py<PyAny>>> {
    let schema = infer_hash_schema_with_confidence(url, pattern, sample_size)
        .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;

    let mut result: HashMap<String, Py<PyAny>> = HashMap::new();

    // Field list as (name, type-string) pairs.
    let fields: Vec<(String, String)> = schema
        .fields
        .iter()
        .map(|(name, dtype)| (name.clone(), redis_type_name(dtype).to_string()))
        .collect();

    result.insert(
        "fields".to_string(),
        fields.into_pyobject(py)?.into_any().unbind(),
    );
    result.insert(
        "sample_count".to_string(),
        schema.sample_count.into_pyobject(py)?.into_any().unbind(),
    );

    // Per-field diagnostics, converted to nested Python dicts.
    let mut field_info_py: HashMap<String, HashMap<String, Py<PyAny>>> = HashMap::new();
    for (name, info) in &schema.field_info {
        let mut info_dict: HashMap<String, Py<PyAny>> = HashMap::new();
        let type_str = redis_type_name(&info.inferred_type);
        info_dict.insert(
            "type".to_string(),
            type_str.into_pyobject(py)?.into_any().unbind(),
        );
        info_dict.insert(
            "confidence".to_string(),
            info.confidence.into_pyobject(py)?.into_any().unbind(),
        );
        info_dict.insert(
            "samples".to_string(),
            info.samples.into_pyobject(py)?.into_any().unbind(),
        );
        info_dict.insert(
            "valid".to_string(),
            info.valid.into_pyobject(py)?.into_any().unbind(),
        );
        info_dict.insert(
            "nulls".to_string(),
            info.nulls.into_pyobject(py)?.into_any().unbind(),
        );
        info_dict.insert(
            "null_ratio".to_string(),
            info.null_ratio().into_pyobject(py)?.into_any().unbind(),
        );
        info_dict.insert(
            "type_candidates".to_string(),
            info.type_candidates
                .clone()
                .into_pyobject(py)?
                .into_any()
                .unbind(),
        );

        field_info_py.insert(name.clone(), info_dict);
    }
    result.insert(
        "field_info".to_string(),
        field_info_py.into_pyobject(py)?.into_any().unbind(),
    );

    result.insert(
        "average_confidence".to_string(),
        schema
            .average_confidence()
            .into_pyobject(py)?
            .into_any()
            .unbind(),
    );
    // Bools need the PyBool path; into_pyobject for bool yields a borrowed ref.
    let all_confident_bool = pyo3::types::PyBool::new(py, schema.all_confident(0.9));
    result.insert(
        "all_confident".to_string(),
        all_confident_bool.to_owned().into_any().unbind(),
    );

    Ok(result)
}
2253
#[cfg(feature = "python")]
#[pyfunction]
#[pyo3(signature = (url, keys, fields, values, ttl = None, if_exists = "replace".to_string()))]
/// Write one hash per key; returns `(written, failed, skipped)` counts.
fn py_write_hashes(
    url: &str,
    keys: Vec<String>,
    fields: Vec<String>,
    values: Vec<Vec<Option<String>>>,
    ttl: Option<i64>,
    if_exists: String,
) -> PyResult<(usize, usize, usize)> {
    // Parse the conflict policy ("replace", etc.) before touching Redis.
    let mode = if_exists
        .parse::<WriteMode>()
        .map_err(|e: crate::Error| PyErr::new::<pyo3::exceptions::PyValueError, _>(e.to_string()))?;

    match write_hashes(url, keys, fields, values, ttl, mode) {
        Ok(summary) => Ok((
            summary.keys_written,
            summary.keys_failed,
            summary.keys_skipped,
        )),
        Err(err) => Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
            err.to_string(),
        )),
    }
}
2286
#[cfg(feature = "python")]
#[pyfunction]
#[pyo3(signature = (url, keys, fields, values, ttl = None, if_exists = "replace".to_string()))]
/// Write one hash per key and return a detailed result dict with keys:
/// "keys_written", "keys_failed", "keys_skipped", "succeeded_keys",
/// "failed_keys" and "errors" (failed key -> error message).
fn py_write_hashes_detailed(
    url: &str,
    keys: Vec<String>,
    fields: Vec<String>,
    values: Vec<Vec<Option<String>>>,
    ttl: Option<i64>,
    if_exists: String,
) -> PyResult<std::collections::HashMap<String, Py<PyAny>>> {
    use pyo3::IntoPyObjectExt;

    // Parse the conflict policy before touching Redis.
    let mode: WriteMode = if_exists.parse().map_err(|e: crate::Error| {
        PyErr::new::<pyo3::exceptions::PyValueError, _>(e.to_string())
    })?;

    let result = write_hashes_detailed(url, keys, fields, values, ttl, mode)
        .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;

    // Convert the Rust result struct into a Python-friendly dict.
    Python::attach(|py| {
        let mut dict = std::collections::HashMap::new();

        dict.insert(
            "keys_written".to_string(),
            result.keys_written.into_py_any(py)?,
        );
        dict.insert(
            "keys_failed".to_string(),
            result.keys_failed.into_py_any(py)?,
        );
        dict.insert(
            "keys_skipped".to_string(),
            result.keys_skipped.into_py_any(py)?,
        );
        dict.insert(
            "succeeded_keys".to_string(),
            result.succeeded_keys.into_py_any(py)?,
        );

        let failed_keys: Vec<String> = result.errors.iter().map(|e| e.key.clone()).collect();
        dict.insert("failed_keys".to_string(), failed_keys.into_py_any(py)?);

        let errors: std::collections::HashMap<String, String> = result
            .errors
            .into_iter()
            .map(|e| (e.key, e.error))
            .collect();
        dict.insert("errors".to_string(), errors.into_py_any(py)?);

        Ok(dict)
    })
}
2364
#[cfg(feature = "python")]
#[pyclass]
/// Python-exposed wrapper around `ListBatchIterator`.
///
/// Batches are handed to Python as Arrow IPC bytes via `next_batch_ipc`.
pub struct PyListBatchIterator {
    // Underlying Rust iterator that scans list keys and builds record batches.
    inner: ListBatchIterator,
}
2374
#[cfg(feature = "python")]
#[pymethods]
impl PyListBatchIterator {
    /// Build a list batch iterator over keys matching `pattern`; each list
    /// element becomes one row, optionally with its position in the list.
    #[new]
    #[pyo3(signature = (
        url,
        pattern,
        batch_size = 1000,
        count_hint = 100,
        include_key = true,
        key_column_name = "_key".to_string(),
        element_column_name = "element".to_string(),
        include_position = false,
        position_column_name = "position".to_string(),
        include_row_index = false,
        row_index_column_name = "_index".to_string(),
        max_rows = None
    ))]
    #[allow(clippy::too_many_arguments)]
    fn new(
        url: String,
        pattern: String,
        batch_size: usize,
        count_hint: usize,
        include_key: bool,
        key_column_name: String,
        element_column_name: String,
        include_position: bool,
        position_column_name: String,
        include_row_index: bool,
        row_index_column_name: String,
        max_rows: Option<usize>,
    ) -> PyResult<Self> {
        let list_schema = ListSchema::new()
            .with_key(include_key)
            .with_key_column_name(&key_column_name)
            .with_element_column_name(&element_column_name)
            .with_position(include_position)
            .with_position_column_name(&position_column_name)
            .with_row_index(include_row_index)
            .with_row_index_column_name(&row_index_column_name);

        let mut config = types::hash::BatchConfig::new(pattern)
            .with_batch_size(batch_size)
            .with_count_hint(count_hint);

        if let Some(max) = max_rows {
            config = config.with_max_rows(max);
        }

        let inner = ListBatchIterator::new(&url, list_schema, config)
            .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;

        Ok(Self { inner })
    }

    /// Fetch the next Arrow `RecordBatch` and return it serialized as
    /// Arrow IPC file bytes, or `None` when the iterator is exhausted.
    fn next_batch_ipc(&mut self, py: Python<'_>) -> PyResult<Option<Py<PyAny>>> {
        let batch = self
            .inner
            .next_batch()
            .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;

        match batch {
            Some(record_batch) => {
                let mut buf = Vec::new();
                // Scope the writer so its mutable borrow of `buf` ends before
                // the buffer is handed to Python.
                {
                    let mut writer = arrow::ipc::writer::FileWriter::try_new(
                        &mut buf,
                        record_batch.schema().as_ref(),
                    )
                    .map_err(|e| {
                        PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(
                            "Failed to create IPC writer: {}",
                            e
                        ))
                    })?;

                    writer.write(&record_batch).map_err(|e| {
                        PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(
                            "Failed to write batch: {}",
                            e
                        ))
                    })?;

                    writer.finish().map_err(|e| {
                        PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(
                            "Failed to finish IPC: {}",
                            e
                        ))
                    })?;
                }

                Ok(Some(pyo3::types::PyBytes::new(py, &buf).into()))
            }
            None => Ok(None),
        }
    }

    /// True once the underlying scan has been fully consumed.
    fn is_done(&self) -> bool {
        self.inner.is_done()
    }

    /// Total number of rows yielded so far across all batches.
    fn rows_yielded(&self) -> usize {
        self.inner.rows_yielded()
    }
}
2499
#[cfg(feature = "python")]
#[pyfunction]
#[pyo3(signature = (url, keys, elements, ttl = None, if_exists = "replace".to_string()))]
/// Write one list per key; returns `(written, failed, skipped)` counts.
fn py_write_lists(
    url: &str,
    keys: Vec<String>,
    elements: Vec<Vec<String>>,
    ttl: Option<i64>,
    if_exists: String,
) -> PyResult<(usize, usize, usize)> {
    // Parse the conflict policy ("replace", etc.) before touching Redis.
    let mode = if_exists
        .parse::<WriteMode>()
        .map_err(|e: crate::Error| PyErr::new::<pyo3::exceptions::PyValueError, _>(e.to_string()))?;

    match write_lists(url, keys, elements, ttl, mode) {
        Ok(summary) => Ok((
            summary.keys_written,
            summary.keys_failed,
            summary.keys_skipped,
        )),
        Err(err) => Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
            err.to_string(),
        )),
    }
}
2530
#[cfg(feature = "python")]
#[pyfunction]
#[pyo3(signature = (url, keys, json_strings, ttl = None, if_exists = "replace".to_string()))]
/// Write one JSON document per key; returns `(written, failed, skipped)`.
fn py_write_json(
    url: &str,
    keys: Vec<String>,
    json_strings: Vec<String>,
    ttl: Option<i64>,
    if_exists: String,
) -> PyResult<(usize, usize, usize)> {
    // Parse the conflict policy ("replace", etc.) before touching Redis.
    let mode = if_exists
        .parse::<WriteMode>()
        .map_err(|e: crate::Error| PyErr::new::<pyo3::exceptions::PyValueError, _>(e.to_string()))?;

    match write_json(url, keys, json_strings, ttl, mode) {
        Ok(summary) => Ok((
            summary.keys_written,
            summary.keys_failed,
            summary.keys_skipped,
        )),
        Err(err) => Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
            err.to_string(),
        )),
    }
}
2561
#[cfg(feature = "python")]
#[pyfunction]
#[pyo3(signature = (url, keys, values, ttl = None, if_exists = "replace".to_string()))]
/// Write one string value per key; returns `(written, failed, skipped)`.
fn py_write_strings(
    url: &str,
    keys: Vec<String>,
    values: Vec<Option<String>>,
    ttl: Option<i64>,
    if_exists: String,
) -> PyResult<(usize, usize, usize)> {
    // Parse the conflict policy ("replace", etc.) before touching Redis.
    let mode = if_exists
        .parse::<WriteMode>()
        .map_err(|e: crate::Error| PyErr::new::<pyo3::exceptions::PyValueError, _>(e.to_string()))?;

    match write_strings(url, keys, values, ttl, mode) {
        Ok(summary) => Ok((
            summary.keys_written,
            summary.keys_failed,
            summary.keys_skipped,
        )),
        Err(err) => Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
            err.to_string(),
        )),
    }
}
2592
#[cfg(feature = "python")]
#[pyclass]
/// Python-exposed wrapper around `SetBatchIterator`.
///
/// Batches are handed to Python as Arrow IPC bytes via `next_batch_ipc`.
pub struct PySetBatchIterator {
    // Underlying Rust iterator that scans set keys and builds record batches.
    inner: SetBatchIterator,
}
2602
#[cfg(feature = "python")]
#[pymethods]
impl PySetBatchIterator {
    /// Build a set batch iterator over keys matching `pattern`; each set
    /// member becomes one row.
    #[new]
    #[pyo3(signature = (
        url,
        pattern,
        batch_size = 1000,
        count_hint = 100,
        include_key = true,
        key_column_name = "_key".to_string(),
        member_column_name = "member".to_string(),
        include_row_index = false,
        row_index_column_name = "_index".to_string(),
        max_rows = None
    ))]
    #[allow(clippy::too_many_arguments)]
    fn new(
        url: String,
        pattern: String,
        batch_size: usize,
        count_hint: usize,
        include_key: bool,
        key_column_name: String,
        member_column_name: String,
        include_row_index: bool,
        row_index_column_name: String,
        max_rows: Option<usize>,
    ) -> PyResult<Self> {
        let set_schema = SetSchema::new()
            .with_key(include_key)
            .with_key_column_name(&key_column_name)
            .with_member_column_name(&member_column_name)
            .with_row_index(include_row_index)
            .with_row_index_column_name(&row_index_column_name);

        let mut config = types::hash::BatchConfig::new(pattern)
            .with_batch_size(batch_size)
            .with_count_hint(count_hint);

        if let Some(max) = max_rows {
            config = config.with_max_rows(max);
        }

        let inner = SetBatchIterator::new(&url, set_schema, config)
            .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;

        Ok(Self { inner })
    }

    /// Fetch the next Arrow `RecordBatch` and return it serialized as
    /// Arrow IPC file bytes, or `None` when the iterator is exhausted.
    fn next_batch_ipc(&mut self, py: Python<'_>) -> PyResult<Option<Py<PyAny>>> {
        let batch = self
            .inner
            .next_batch()
            .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;

        match batch {
            Some(record_batch) => {
                let mut buf = Vec::new();
                // Scope the writer so its mutable borrow of `buf` ends before
                // the buffer is handed to Python.
                {
                    let mut writer = arrow::ipc::writer::FileWriter::try_new(
                        &mut buf,
                        record_batch.schema().as_ref(),
                    )
                    .map_err(|e| {
                        PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(
                            "Failed to create IPC writer: {}",
                            e
                        ))
                    })?;

                    writer.write(&record_batch).map_err(|e| {
                        PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(
                            "Failed to write batch: {}",
                            e
                        ))
                    })?;

                    writer.finish().map_err(|e| {
                        PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(
                            "Failed to finish IPC: {}",
                            e
                        ))
                    })?;
                }

                Ok(Some(pyo3::types::PyBytes::new(py, &buf).into()))
            }
            None => Ok(None),
        }
    }

    /// True once the underlying scan has been fully consumed.
    fn is_done(&self) -> bool {
        self.inner.is_done()
    }

    /// Total number of rows yielded so far across all batches.
    fn rows_yielded(&self) -> usize {
        self.inner.rows_yielded()
    }
}
2721
#[cfg(feature = "python")]
#[pyfunction]
#[pyo3(signature = (url, keys, members, ttl = None, if_exists = "replace".to_string()))]
/// Write one set per key; returns `(written, failed, skipped)` counts.
fn py_write_sets(
    url: &str,
    keys: Vec<String>,
    members: Vec<Vec<String>>,
    ttl: Option<i64>,
    if_exists: String,
) -> PyResult<(usize, usize, usize)> {
    // Parse the conflict policy ("replace", etc.) before touching Redis.
    let mode = if_exists
        .parse::<WriteMode>()
        .map_err(|e: crate::Error| PyErr::new::<pyo3::exceptions::PyValueError, _>(e.to_string()))?;

    match write_sets(url, keys, members, ttl, mode) {
        Ok(summary) => Ok((
            summary.keys_written,
            summary.keys_failed,
            summary.keys_skipped,
        )),
        Err(err) => Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
            err.to_string(),
        )),
    }
}
2752
#[cfg(feature = "python")]
#[pyclass]
/// Python-exposed wrapper around `ZSetBatchIterator`.
///
/// Batches are handed to Python as Arrow IPC bytes via `next_batch_ipc`.
pub struct PyZSetBatchIterator {
    // Underlying Rust iterator that scans sorted-set keys and builds batches.
    inner: ZSetBatchIterator,
}
2762
#[cfg(feature = "python")]
#[pymethods]
impl PyZSetBatchIterator {
    /// Build a sorted-set batch iterator over keys matching `pattern`; each
    /// member becomes one row with its score, optionally with its rank.
    #[new]
    #[pyo3(signature = (
        url,
        pattern,
        batch_size = 1000,
        count_hint = 100,
        include_key = true,
        key_column_name = "_key".to_string(),
        member_column_name = "member".to_string(),
        score_column_name = "score".to_string(),
        include_rank = false,
        rank_column_name = "rank".to_string(),
        include_row_index = false,
        row_index_column_name = "_index".to_string(),
        max_rows = None
    ))]
    #[allow(clippy::too_many_arguments)]
    fn new(
        url: String,
        pattern: String,
        batch_size: usize,
        count_hint: usize,
        include_key: bool,
        key_column_name: String,
        member_column_name: String,
        score_column_name: String,
        include_rank: bool,
        rank_column_name: String,
        include_row_index: bool,
        row_index_column_name: String,
        max_rows: Option<usize>,
    ) -> PyResult<Self> {
        let zset_schema = ZSetSchema::new()
            .with_key(include_key)
            .with_key_column_name(&key_column_name)
            .with_member_column_name(&member_column_name)
            .with_score_column_name(&score_column_name)
            .with_rank(include_rank)
            .with_rank_column_name(&rank_column_name)
            .with_row_index(include_row_index)
            .with_row_index_column_name(&row_index_column_name);

        let mut config = types::hash::BatchConfig::new(pattern)
            .with_batch_size(batch_size)
            .with_count_hint(count_hint);

        if let Some(max) = max_rows {
            config = config.with_max_rows(max);
        }

        let inner = ZSetBatchIterator::new(&url, zset_schema, config)
            .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;

        Ok(Self { inner })
    }

    /// Fetch the next Arrow `RecordBatch` and return it serialized as
    /// Arrow IPC file bytes, or `None` when the iterator is exhausted.
    fn next_batch_ipc(&mut self, py: Python<'_>) -> PyResult<Option<Py<PyAny>>> {
        let batch = self
            .inner
            .next_batch()
            .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;

        match batch {
            Some(record_batch) => {
                let mut buf = Vec::new();
                // Scope the writer so its mutable borrow of `buf` ends before
                // the buffer is handed to Python.
                {
                    let mut writer = arrow::ipc::writer::FileWriter::try_new(
                        &mut buf,
                        record_batch.schema().as_ref(),
                    )
                    .map_err(|e| {
                        PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(
                            "Failed to create IPC writer: {}",
                            e
                        ))
                    })?;

                    writer.write(&record_batch).map_err(|e| {
                        PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(
                            "Failed to write batch: {}",
                            e
                        ))
                    })?;

                    writer.finish().map_err(|e| {
                        PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(
                            "Failed to finish IPC: {}",
                            e
                        ))
                    })?;
                }

                Ok(Some(pyo3::types::PyBytes::new(py, &buf).into()))
            }
            None => Ok(None),
        }
    }

    /// True once the underlying scan has been fully consumed.
    fn is_done(&self) -> bool {
        self.inner.is_done()
    }

    /// Total number of rows yielded so far across all batches.
    fn rows_yielded(&self) -> usize {
        self.inner.rows_yielded()
    }
}
2893
#[cfg(feature = "python")]
#[pyfunction]
#[pyo3(signature = (url, keys, members_scores, ttl = None, if_exists = "replace".to_string()))]
fn py_write_zsets(
    url: &str,
    keys: Vec<String>,
    members_scores: Vec<Vec<(String, f64)>>,
    ttl: Option<i64>,
    if_exists: String,
) -> PyResult<(usize, usize, usize)> {
    // Translate the textual conflict policy into a WriteMode; an
    // unrecognized value surfaces to Python as a ValueError.
    let write_mode = if_exists.parse::<WriteMode>().map_err(|err: crate::Error| {
        PyErr::new::<pyo3::exceptions::PyValueError, _>(err.to_string())
    })?;

    // Perform the bulk sorted-set write; any underlying failure becomes a
    // RuntimeError on the Python side.
    let summary = write_zsets(url, keys, members_scores, ttl, write_mode)
        .map_err(|err| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(err.to_string()))?;

    // Report (written, failed, skipped) counts back to the caller.
    Ok((summary.keys_written, summary.keys_failed, summary.keys_skipped))
}
2924
#[cfg(feature = "python")]
#[pyclass]
/// Python-facing wrapper around [`StreamBatchIterator`]: scans Redis
/// streams and hands batches to Python as Arrow IPC bytes.
pub struct PyStreamBatchIterator {
    // Underlying Rust iterator that does the actual scan/fetch work.
    inner: StreamBatchIterator,
}
2934
#[cfg(feature = "python")]
#[pymethods]
impl PyStreamBatchIterator {
    /// Build an iterator over Redis streams whose keys match `pattern`.
    ///
    /// `fields` is forwarded to `StreamSchema::set_fields` to select which
    /// entry fields become columns. `start_id`/`end_id` bound the per-stream
    /// entry range (defaults "-"/"+" — presumably the full stream, per Redis
    /// XRANGE conventions; confirm against `StreamBatchIterator`), and
    /// `count_per_stream` caps entries read per stream key. The boolean
    /// flags toggle metadata columns (key, id, timestamp, sequence, row
    /// index) with the matching `*_column_name` parameters; `max_rows`
    /// limits total rows yielded. Errors from the underlying iterator
    /// constructor surface as `RuntimeError`.
    #[new]
    #[pyo3(signature = (
        url,
        pattern,
        fields = vec![],
        batch_size = 1000,
        count_hint = 100,
        start_id = "-".to_string(),
        end_id = "+".to_string(),
        count_per_stream = None,
        include_key = true,
        key_column_name = "_key".to_string(),
        include_id = true,
        id_column_name = "_id".to_string(),
        include_timestamp = true,
        timestamp_column_name = "_ts".to_string(),
        include_sequence = false,
        sequence_column_name = "_seq".to_string(),
        include_row_index = false,
        row_index_column_name = "_index".to_string(),
        max_rows = None
    ))]
    #[allow(clippy::too_many_arguments)]
    fn new(
        url: String,
        pattern: String,
        fields: Vec<String>,
        batch_size: usize,
        count_hint: usize,
        start_id: String,
        end_id: String,
        count_per_stream: Option<usize>,
        include_key: bool,
        key_column_name: String,
        include_id: bool,
        id_column_name: String,
        include_timestamp: bool,
        timestamp_column_name: String,
        include_sequence: bool,
        sequence_column_name: String,
        include_row_index: bool,
        row_index_column_name: String,
        max_rows: Option<usize>,
    ) -> PyResult<Self> {
        // Describe the output columns for the stream scan.
        let stream_schema = StreamSchema::new()
            .with_key(include_key)
            .with_key_column_name(&key_column_name)
            .with_id(include_id)
            .with_id_column_name(&id_column_name)
            .with_timestamp(include_timestamp)
            // NOTE(fix): this argument was garbled as `×tamp_column_name`
            // (a mangled `&times;` entity), which is not valid Rust.
            .with_timestamp_column_name(&timestamp_column_name)
            .with_sequence(include_sequence)
            .with_sequence_column_name(&sequence_column_name)
            .with_row_index(include_row_index)
            .with_row_index_column_name(&row_index_column_name)
            .set_fields(fields);

        // Batching parameters; the optional row cap is applied only when set.
        let mut config = types::hash::BatchConfig::new(pattern)
            .with_batch_size(batch_size)
            .with_count_hint(count_hint);

        if let Some(max) = max_rows {
            config = config.with_max_rows(max);
        }

        let mut inner = StreamBatchIterator::new(&url, stream_schema, config)
            .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;

        // Apply the entry-id window and optional per-stream entry cap.
        inner = inner.with_start_id(&start_id).with_end_id(&end_id);

        if let Some(count) = count_per_stream {
            inner = inner.with_count_per_stream(count);
        }

        Ok(Self { inner })
    }

    /// Fetch the next batch and return it serialized as Arrow IPC file
    /// bytes, or `None` once the scan is exhausted.
    fn next_batch_ipc(&mut self, py: Python<'_>) -> PyResult<Option<Py<PyAny>>> {
        let batch = self
            .inner
            .next_batch()
            .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;

        match batch {
            Some(record_batch) => {
                let mut buf = Vec::new();
                {
                    // Scope the writer so its borrow of `buf` ends before we
                    // hand the buffer to Python.
                    let mut writer = arrow::ipc::writer::FileWriter::try_new(
                        &mut buf,
                        record_batch.schema().as_ref(),
                    )
                    .map_err(|e| {
                        PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(
                            "Failed to create IPC writer: {}",
                            e
                        ))
                    })?;

                    writer.write(&record_batch).map_err(|e| {
                        PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(
                            "Failed to write batch: {}",
                            e
                        ))
                    })?;

                    writer.finish().map_err(|e| {
                        PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(
                            "Failed to finish IPC: {}",
                            e
                        ))
                    })?;
                }

                Ok(Some(pyo3::types::PyBytes::new(py, &buf).into()))
            }
            None => Ok(None),
        }
    }

    /// True once the underlying scan has been fully consumed.
    fn is_done(&self) -> bool {
        self.inner.is_done()
    }

    /// Total number of rows produced so far.
    fn rows_yielded(&self) -> usize {
        self.inner.rows_yielded()
    }
}