polars_redis/
options.rs

1//! Configuration options for Redis operations.
2//!
3//! This module provides unified configuration structs following the polars-io pattern:
4//! - Builder pattern with `with_*` methods
5//! - Sensible defaults via `Default` trait
6//! - Environment variable overrides for common settings
7//!
8//! # Example
9//!
10//! ```ignore
11//! use polars_redis::options::{ScanOptions, KeyColumn, TtlColumn};
12//!
13//! let scan = ScanOptions::new("user:*")
14//!     .with_batch_size(500)
15//!     .with_n_rows(10000);
16//!
17//! let key = KeyColumn::enabled().with_name("redis_key");
18//! let ttl = TtlColumn::enabled();
19//! ```
20
21use std::sync::LazyLock;
22
23// ============================================================================
24// Parallel processing configuration
25// ============================================================================
26
/// Strategy for parallel processing of Redis operations.
///
/// Controls how batch fetching is parallelized to improve throughput
/// on large datasets.
///
/// # Example
///
/// ```ignore
/// use polars_redis::options::{ScanOptions, ParallelStrategy};
///
/// // Use 4 parallel workers for batch fetching
/// let opts = ScanOptions::new("user:*")
///     .with_parallel(ParallelStrategy::Batches(4));
///
/// // Let the library choose based on dataset size
/// let opts = ScanOptions::new("user:*")
///     .with_parallel(ParallelStrategy::Auto);
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum ParallelStrategy {
    /// Sequential processing (the default).
    ///
    /// Keys are scanned and fetched one batch at a time.
    /// Best for small datasets or when ordering matters.
    #[default]
    None,

    /// Parallel batch fetching with N workers.
    ///
    /// A single SCAN operation feeds keys to N parallel fetch workers.
    /// Each worker fetches data for a subset of keys concurrently.
    ///
    /// Recommended values: 2-8 workers depending on Redis server capacity.
    ///
    /// The variant can be built directly with any count, including 0;
    /// [`ParallelStrategy::batches`] clamps the count to at least 1 and
    /// [`ParallelStrategy::worker_count`] never reports fewer than 1.
    Batches(usize),

    /// Automatically select strategy based on hints.
    ///
    /// - Uses `None` for small datasets (< 1000 keys)
    /// - Uses `Batches(4)` for larger datasets
    Auto,
}

impl ParallelStrategy {
    /// Worker count used whenever `Auto` opts for parallel fetching.
    const AUTO_WORKERS: usize = 4;

    /// Estimated key count below which `Auto` resolves to sequential processing.
    const AUTO_SEQUENTIAL_BELOW: usize = 1000;

    /// Create a parallel strategy with the given number of workers.
    ///
    /// A request for 0 workers is raised to 1.
    pub fn batches(n: usize) -> Self {
        ParallelStrategy::Batches(n.max(1))
    }

    /// Check if this strategy enables parallel processing.
    ///
    /// Returns `true` for every variant except `None`.
    pub fn is_parallel(&self) -> bool {
        !matches!(self, ParallelStrategy::None)
    }

    /// Get the number of workers for this strategy.
    ///
    /// Returns 1 for `None`, the specified count for `Batches` (never less
    /// than 1), and a default of 4 for `Auto`.
    pub fn worker_count(&self) -> usize {
        match self {
            ParallelStrategy::None => 1,
            // `Batches(0)` can be built directly, bypassing `batches()`;
            // clamp here so callers never receive a zero worker count.
            ParallelStrategy::Batches(n) => (*n).max(1),
            ParallelStrategy::Auto => Self::AUTO_WORKERS,
        }
    }

    /// Resolve `Auto` strategy based on estimated key count.
    ///
    /// `Auto` becomes `None` when the estimate is known and below 1000 keys,
    /// and `Batches(4)` otherwise (including when no estimate is available).
    /// Any other strategy is returned unchanged.
    pub fn resolve(&self, estimated_keys: Option<usize>) -> ParallelStrategy {
        match *self {
            ParallelStrategy::Auto => match estimated_keys {
                Some(n) if n < Self::AUTO_SEQUENTIAL_BELOW => ParallelStrategy::None,
                _ => ParallelStrategy::Batches(Self::AUTO_WORKERS),
            },
            other => other,
        }
    }
}
103
/// Row index configuration (column name plus starting value), matching the
/// polars-io pattern.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct RowIndex {
    /// Name of the row index column.
    pub name: String,
    /// Value the index starts counting from.
    pub offset: u64,
}

impl Default for RowIndex {
    /// A row index named `"_index"` that starts at 0.
    fn default() -> Self {
        Self::new("_index")
    }
}

impl RowIndex {
    /// Build a row index with the given column name and an offset of 0.
    pub fn new(name: impl Into<String>) -> Self {
        let name = name.into();
        Self { name, offset: 0 }
    }

    /// Return this row index with its starting offset replaced by `offset`.
    pub fn with_offset(self, offset: u64) -> Self {
        Self { offset, ..self }
    }
}
137
138/// Common scan options shared across all Redis data types.
139#[derive(Debug, Clone)]
140pub struct ScanOptions {
141    /// Key pattern to match (e.g., "user:*").
142    pub pattern: String,
143    /// Number of keys to process per batch.
144    pub batch_size: usize,
145    /// SCAN COUNT hint for Redis.
146    pub count_hint: usize,
147    /// Maximum total rows to return (None for unlimited).
148    pub n_rows: Option<usize>,
149    /// Parallel processing strategy.
150    pub parallel: ParallelStrategy,
151}
152
153impl Default for ScanOptions {
154    fn default() -> Self {
155        Self {
156            pattern: "*".to_string(),
157            batch_size: get_default_batch_size(),
158            count_hint: get_default_count_hint(),
159            n_rows: None,
160            parallel: ParallelStrategy::None,
161        }
162    }
163}
164
165impl ScanOptions {
166    /// Create new ScanOptions with the given pattern.
167    pub fn new(pattern: impl Into<String>) -> Self {
168        Self {
169            pattern: pattern.into(),
170            ..Default::default()
171        }
172    }
173
174    /// Set the batch size.
175    pub fn with_batch_size(mut self, size: usize) -> Self {
176        self.batch_size = size;
177        self
178    }
179
180    /// Set the COUNT hint for SCAN.
181    pub fn with_count_hint(mut self, count: usize) -> Self {
182        self.count_hint = count;
183        self
184    }
185
186    /// Set the maximum number of rows to return.
187    pub fn with_n_rows(mut self, n: usize) -> Self {
188        self.n_rows = Some(n);
189        self
190    }
191
192    /// Set the parallel processing strategy.
193    pub fn with_parallel(mut self, strategy: ParallelStrategy) -> Self {
194        self.parallel = strategy;
195        self
196    }
197}
198
/// Configuration of the column that carries the Redis key.
#[derive(Debug, Clone)]
pub struct KeyColumn {
    /// Whether the Redis key is emitted as a column.
    pub enabled: bool,
    /// Name of the key column.
    pub name: String,
}

impl Default for KeyColumn {
    /// The key column is enabled by default and named `"_key"`.
    fn default() -> Self {
        Self::enabled()
    }
}

impl KeyColumn {
    /// Enabled key column using the default name `"_key"`.
    pub fn enabled() -> Self {
        Self {
            enabled: true,
            name: String::from("_key"),
        }
    }

    /// Disabled key column; the name stays at the default `"_key"`.
    pub fn disabled() -> Self {
        Self {
            enabled: false,
            ..Self::enabled()
        }
    }

    /// Return this configuration with the column renamed to `name`.
    pub fn with_name(self, name: impl Into<String>) -> Self {
        Self {
            name: name.into(),
            ..self
        }
    }
}
237
/// Configuration of the column that carries the key's TTL.
#[derive(Debug, Clone)]
pub struct TtlColumn {
    /// Whether the TTL is emitted as a column.
    pub enabled: bool,
    /// Name of the TTL column.
    pub name: String,
}

impl Default for TtlColumn {
    /// The TTL column is disabled by default and named `"_ttl"`.
    fn default() -> Self {
        Self::disabled()
    }
}

impl TtlColumn {
    /// Enabled TTL column using the default name `"_ttl"`.
    pub fn enabled() -> Self {
        Self {
            enabled: true,
            ..Self::disabled()
        }
    }

    /// Disabled TTL column; the name stays at the default `"_ttl"`.
    pub fn disabled() -> Self {
        Self {
            enabled: false,
            name: String::from("_ttl"),
        }
    }

    /// Return this configuration with the column renamed to `name`.
    pub fn with_name(self, name: impl Into<String>) -> Self {
        Self {
            name: name.into(),
            ..self
        }
    }
}
276
277/// Row index column configuration.
278#[derive(Debug, Clone)]
279pub struct RowIndexColumn {
280    /// Whether to include the row index as a column.
281    pub enabled: bool,
282    /// Column name for the row index.
283    pub name: String,
284    /// Starting offset for the index.
285    pub offset: u64,
286}
287
288impl Default for RowIndexColumn {
289    fn default() -> Self {
290        Self {
291            enabled: false,
292            name: "_index".to_string(),
293            offset: 0,
294        }
295    }
296}
297
298impl RowIndexColumn {
299    /// Create enabled row index column with default name.
300    pub fn enabled() -> Self {
301        Self {
302            enabled: true,
303            name: "_index".to_string(),
304            offset: 0,
305        }
306    }
307
308    /// Create disabled row index column.
309    pub fn disabled() -> Self {
310        Self::default()
311    }
312
313    /// Set the column name.
314    pub fn with_name(mut self, name: impl Into<String>) -> Self {
315        self.name = name.into();
316        self
317    }
318
319    /// Set the starting offset.
320    pub fn with_offset(mut self, offset: u64) -> Self {
321        self.offset = offset;
322        self
323    }
324
325    /// Convert to RowIndex if enabled.
326    pub fn to_row_index(&self) -> Option<RowIndex> {
327        if self.enabled {
328            Some(RowIndex {
329                name: self.name.clone(),
330                offset: self.offset,
331            })
332        } else {
333            None
334        }
335    }
336}
337
338// ============================================================================
339// Type-specific scan options (following polars-io pattern)
340// ============================================================================
341
342/// Options for scanning Redis hashes.
343///
344/// # Example
345///
346/// ```ignore
347/// use polars_redis::options::HashScanOptions;
348///
349/// let opts = HashScanOptions::new("user:*")
350///     .with_batch_size(500)
351///     .with_ttl(true)
352///     .with_projection(vec!["name", "email"]);
353/// ```
354#[derive(Debug, Clone)]
355pub struct HashScanOptions {
356    /// Base scan options.
357    pub scan: ScanOptions,
358    /// Whether to include the key column.
359    pub include_key: bool,
360    /// Custom name for the key column.
361    pub key_column_name: Option<String>,
362    /// Whether to include TTL.
363    pub include_ttl: bool,
364    /// Custom name for the TTL column.
365    pub ttl_column_name: Option<String>,
366    /// Row index configuration.
367    pub row_index: Option<RowIndex>,
368    /// Fields to fetch (None = all via HGETALL).
369    pub projection: Option<Vec<String>>,
370}
371
372impl Default for HashScanOptions {
373    fn default() -> Self {
374        Self {
375            scan: ScanOptions::default(),
376            include_key: true,
377            key_column_name: None,
378            include_ttl: false,
379            ttl_column_name: None,
380            row_index: None,
381            projection: None,
382        }
383    }
384}
385
386impl HashScanOptions {
387    /// Create new HashScanOptions with the given pattern.
388    pub fn new(pattern: impl Into<String>) -> Self {
389        Self {
390            scan: ScanOptions::new(pattern),
391            ..Default::default()
392        }
393    }
394
395    /// Set the batch size.
396    pub fn with_batch_size(mut self, size: usize) -> Self {
397        self.scan.batch_size = size;
398        self
399    }
400
401    /// Set the COUNT hint for SCAN.
402    pub fn with_count_hint(mut self, count: usize) -> Self {
403        self.scan.count_hint = count;
404        self
405    }
406
407    /// Set the maximum number of rows to return.
408    pub fn with_n_rows(mut self, n: usize) -> Self {
409        self.scan.n_rows = Some(n);
410        self
411    }
412
413    /// Enable or disable the key column.
414    pub fn with_key(mut self, include: bool) -> Self {
415        self.include_key = include;
416        self
417    }
418
419    /// Set a custom name for the key column.
420    pub fn with_key_column_name(mut self, name: impl Into<String>) -> Self {
421        self.key_column_name = Some(name.into());
422        self
423    }
424
425    /// Enable or disable the TTL column.
426    pub fn with_ttl(mut self, include: bool) -> Self {
427        self.include_ttl = include;
428        self
429    }
430
431    /// Set a custom name for the TTL column.
432    pub fn with_ttl_column_name(mut self, name: impl Into<String>) -> Self {
433        self.ttl_column_name = Some(name.into());
434        self
435    }
436
437    /// Set the row index configuration.
438    pub fn with_row_index(mut self, name: impl Into<String>, offset: u64) -> Self {
439        self.row_index = Some(RowIndex {
440            name: name.into(),
441            offset,
442        });
443        self
444    }
445
446    /// Set the fields to fetch (projection).
447    pub fn with_projection(mut self, fields: Vec<impl Into<String>>) -> Self {
448        self.projection = Some(fields.into_iter().map(Into::into).collect());
449        self
450    }
451}
452
453/// Options for scanning Redis JSON documents.
454///
455/// # Example
456///
457/// ```ignore
458/// use polars_redis::options::JsonScanOptions;
459///
460/// let opts = JsonScanOptions::new("doc:*")
461///     .with_batch_size(500)
462///     .with_path("$.user");
463/// ```
464#[derive(Debug, Clone)]
465pub struct JsonScanOptions {
466    /// Base scan options.
467    pub scan: ScanOptions,
468    /// Whether to include the key column.
469    pub include_key: bool,
470    /// Custom name for the key column.
471    pub key_column_name: Option<String>,
472    /// Whether to include TTL.
473    pub include_ttl: bool,
474    /// Custom name for the TTL column.
475    pub ttl_column_name: Option<String>,
476    /// Row index configuration.
477    pub row_index: Option<RowIndex>,
478    /// JSON path to extract (None = root "$").
479    pub path: Option<String>,
480    /// Fields to fetch from the JSON document.
481    pub projection: Option<Vec<String>>,
482}
483
484impl Default for JsonScanOptions {
485    fn default() -> Self {
486        Self {
487            scan: ScanOptions::default(),
488            include_key: true,
489            key_column_name: None,
490            include_ttl: false,
491            ttl_column_name: None,
492            row_index: None,
493            path: None,
494            projection: None,
495        }
496    }
497}
498
499impl JsonScanOptions {
500    /// Create new JsonScanOptions with the given pattern.
501    pub fn new(pattern: impl Into<String>) -> Self {
502        Self {
503            scan: ScanOptions::new(pattern),
504            ..Default::default()
505        }
506    }
507
508    /// Set the batch size.
509    pub fn with_batch_size(mut self, size: usize) -> Self {
510        self.scan.batch_size = size;
511        self
512    }
513
514    /// Set the COUNT hint for SCAN.
515    pub fn with_count_hint(mut self, count: usize) -> Self {
516        self.scan.count_hint = count;
517        self
518    }
519
520    /// Set the maximum number of rows to return.
521    pub fn with_n_rows(mut self, n: usize) -> Self {
522        self.scan.n_rows = Some(n);
523        self
524    }
525
526    /// Enable or disable the key column.
527    pub fn with_key(mut self, include: bool) -> Self {
528        self.include_key = include;
529        self
530    }
531
532    /// Set a custom name for the key column.
533    pub fn with_key_column_name(mut self, name: impl Into<String>) -> Self {
534        self.key_column_name = Some(name.into());
535        self
536    }
537
538    /// Enable or disable the TTL column.
539    pub fn with_ttl(mut self, include: bool) -> Self {
540        self.include_ttl = include;
541        self
542    }
543
544    /// Set a custom name for the TTL column.
545    pub fn with_ttl_column_name(mut self, name: impl Into<String>) -> Self {
546        self.ttl_column_name = Some(name.into());
547        self
548    }
549
550    /// Set the row index configuration.
551    pub fn with_row_index(mut self, name: impl Into<String>, offset: u64) -> Self {
552        self.row_index = Some(RowIndex {
553            name: name.into(),
554            offset,
555        });
556        self
557    }
558
559    /// Set the JSON path to extract.
560    pub fn with_path(mut self, path: impl Into<String>) -> Self {
561        self.path = Some(path.into());
562        self
563    }
564
565    /// Set the fields to fetch (projection).
566    pub fn with_projection(mut self, fields: Vec<impl Into<String>>) -> Self {
567        self.projection = Some(fields.into_iter().map(Into::into).collect());
568        self
569    }
570}
571
572/// Options for scanning Redis strings.
573///
574/// # Example
575///
576/// ```ignore
577/// use polars_redis::options::StringScanOptions;
578///
579/// let opts = StringScanOptions::new("counter:*")
580///     .with_batch_size(1000)
581///     .with_value_column_name("count");
582/// ```
583#[derive(Debug, Clone)]
584pub struct StringScanOptions {
585    /// Base scan options.
586    pub scan: ScanOptions,
587    /// Whether to include the key column.
588    pub include_key: bool,
589    /// Custom name for the key column.
590    pub key_column_name: Option<String>,
591    /// Custom name for the value column.
592    pub value_column_name: Option<String>,
593    /// Row index configuration.
594    pub row_index: Option<RowIndex>,
595}
596
597impl Default for StringScanOptions {
598    fn default() -> Self {
599        Self {
600            scan: ScanOptions::default(),
601            include_key: true,
602            key_column_name: None,
603            value_column_name: None,
604            row_index: None,
605        }
606    }
607}
608
609impl StringScanOptions {
610    /// Create new StringScanOptions with the given pattern.
611    pub fn new(pattern: impl Into<String>) -> Self {
612        Self {
613            scan: ScanOptions::new(pattern),
614            ..Default::default()
615        }
616    }
617
618    /// Set the batch size.
619    pub fn with_batch_size(mut self, size: usize) -> Self {
620        self.scan.batch_size = size;
621        self
622    }
623
624    /// Set the COUNT hint for SCAN.
625    pub fn with_count_hint(mut self, count: usize) -> Self {
626        self.scan.count_hint = count;
627        self
628    }
629
630    /// Set the maximum number of rows to return.
631    pub fn with_n_rows(mut self, n: usize) -> Self {
632        self.scan.n_rows = Some(n);
633        self
634    }
635
636    /// Enable or disable the key column.
637    pub fn with_key(mut self, include: bool) -> Self {
638        self.include_key = include;
639        self
640    }
641
642    /// Set a custom name for the key column.
643    pub fn with_key_column_name(mut self, name: impl Into<String>) -> Self {
644        self.key_column_name = Some(name.into());
645        self
646    }
647
648    /// Set a custom name for the value column.
649    pub fn with_value_column_name(mut self, name: impl Into<String>) -> Self {
650        self.value_column_name = Some(name.into());
651        self
652    }
653
654    /// Set the row index configuration.
655    pub fn with_row_index(mut self, name: impl Into<String>, offset: u64) -> Self {
656        self.row_index = Some(RowIndex {
657            name: name.into(),
658            offset,
659        });
660        self
661    }
662}
663
664/// Options for scanning Redis streams.
665///
666/// # Example
667///
668/// ```ignore
669/// use polars_redis::options::StreamScanOptions;
670///
671/// let opts = StreamScanOptions::new("events:*")
672///     .with_start_id("-")
673///     .with_end_id("+")
674///     .with_count_per_stream(1000);
675/// ```
676#[derive(Debug, Clone)]
677pub struct StreamScanOptions {
678    /// Base scan options.
679    pub scan: ScanOptions,
680    /// Whether to include the key column.
681    pub include_key: bool,
682    /// Custom name for the key column.
683    pub key_column_name: Option<String>,
684    /// Whether to include the entry ID.
685    pub include_id: bool,
686    /// Whether to include timestamp.
687    pub include_timestamp: bool,
688    /// Whether to include sequence number.
689    pub include_sequence: bool,
690    /// Row index configuration.
691    pub row_index: Option<RowIndex>,
692    /// Start ID for XRANGE (default: "-").
693    pub start_id: String,
694    /// End ID for XRANGE (default: "+").
695    pub end_id: String,
696    /// Maximum entries per stream.
697    pub count_per_stream: Option<usize>,
698    /// Fields to extract from stream entries.
699    pub fields: Option<Vec<String>>,
700}
701
702impl Default for StreamScanOptions {
703    fn default() -> Self {
704        Self {
705            scan: ScanOptions::default(),
706            include_key: true,
707            key_column_name: None,
708            include_id: true,
709            include_timestamp: true,
710            include_sequence: false,
711            row_index: None,
712            start_id: "-".to_string(),
713            end_id: "+".to_string(),
714            count_per_stream: None,
715            fields: None,
716        }
717    }
718}
719
720impl StreamScanOptions {
721    /// Create new StreamScanOptions with the given pattern.
722    pub fn new(pattern: impl Into<String>) -> Self {
723        Self {
724            scan: ScanOptions::new(pattern),
725            ..Default::default()
726        }
727    }
728
729    /// Set the batch size.
730    pub fn with_batch_size(mut self, size: usize) -> Self {
731        self.scan.batch_size = size;
732        self
733    }
734
735    /// Set the COUNT hint for SCAN.
736    pub fn with_count_hint(mut self, count: usize) -> Self {
737        self.scan.count_hint = count;
738        self
739    }
740
741    /// Set the maximum number of rows to return.
742    pub fn with_n_rows(mut self, n: usize) -> Self {
743        self.scan.n_rows = Some(n);
744        self
745    }
746
747    /// Enable or disable the key column.
748    pub fn with_key(mut self, include: bool) -> Self {
749        self.include_key = include;
750        self
751    }
752
753    /// Enable or disable the entry ID column.
754    pub fn with_id(mut self, include: bool) -> Self {
755        self.include_id = include;
756        self
757    }
758
759    /// Enable or disable the timestamp column.
760    pub fn with_timestamp(mut self, include: bool) -> Self {
761        self.include_timestamp = include;
762        self
763    }
764
765    /// Enable or disable the sequence column.
766    pub fn with_sequence(mut self, include: bool) -> Self {
767        self.include_sequence = include;
768        self
769    }
770
771    /// Set the row index configuration.
772    pub fn with_row_index(mut self, name: impl Into<String>, offset: u64) -> Self {
773        self.row_index = Some(RowIndex {
774            name: name.into(),
775            offset,
776        });
777        self
778    }
779
780    /// Set the start ID for XRANGE.
781    pub fn with_start_id(mut self, id: impl Into<String>) -> Self {
782        self.start_id = id.into();
783        self
784    }
785
786    /// Set the end ID for XRANGE.
787    pub fn with_end_id(mut self, id: impl Into<String>) -> Self {
788        self.end_id = id.into();
789        self
790    }
791
792    /// Set the maximum entries to fetch per stream.
793    pub fn with_count_per_stream(mut self, count: usize) -> Self {
794        self.count_per_stream = Some(count);
795        self
796    }
797
798    /// Set the fields to extract from stream entries.
799    pub fn with_fields(mut self, fields: Vec<impl Into<String>>) -> Self {
800        self.fields = Some(fields.into_iter().map(Into::into).collect());
801        self
802    }
803}
804
805/// Options for scanning Redis time series.
806///
807/// # Example
808///
809/// ```ignore
810/// use polars_redis::options::TimeSeriesScanOptions;
811///
812/// let opts = TimeSeriesScanOptions::new("sensor:*")
813///     .with_start("1000")
814///     .with_end("2000")
815///     .with_aggregation("avg", 60000);
816/// ```
817#[derive(Debug, Clone)]
818pub struct TimeSeriesScanOptions {
819    /// Base scan options.
820    pub scan: ScanOptions,
821    /// Whether to include the key column.
822    pub include_key: bool,
823    /// Custom name for the key column.
824    pub key_column_name: Option<String>,
825    /// Custom name for the timestamp column.
826    pub timestamp_column_name: Option<String>,
827    /// Custom name for the value column.
828    pub value_column_name: Option<String>,
829    /// Row index configuration.
830    pub row_index: Option<RowIndex>,
831    /// Start timestamp for TS.RANGE (default: "-").
832    pub start: String,
833    /// End timestamp for TS.RANGE (default: "+").
834    pub end: String,
835    /// Maximum samples per time series.
836    pub count_per_series: Option<usize>,
837    /// Aggregation type (avg, sum, min, max, etc.).
838    pub aggregation: Option<String>,
839    /// Bucket size in milliseconds for aggregation.
840    pub bucket_size_ms: Option<i64>,
841}
842
843impl Default for TimeSeriesScanOptions {
844    fn default() -> Self {
845        Self {
846            scan: ScanOptions::default(),
847            include_key: true,
848            key_column_name: None,
849            timestamp_column_name: None,
850            value_column_name: None,
851            row_index: None,
852            start: "-".to_string(),
853            end: "+".to_string(),
854            count_per_series: None,
855            aggregation: None,
856            bucket_size_ms: None,
857        }
858    }
859}
860
861impl TimeSeriesScanOptions {
862    /// Create new TimeSeriesScanOptions with the given pattern.
863    pub fn new(pattern: impl Into<String>) -> Self {
864        Self {
865            scan: ScanOptions::new(pattern),
866            ..Default::default()
867        }
868    }
869
870    /// Set the batch size.
871    pub fn with_batch_size(mut self, size: usize) -> Self {
872        self.scan.batch_size = size;
873        self
874    }
875
876    /// Set the COUNT hint for SCAN.
877    pub fn with_count_hint(mut self, count: usize) -> Self {
878        self.scan.count_hint = count;
879        self
880    }
881
882    /// Set the maximum number of rows to return.
883    pub fn with_n_rows(mut self, n: usize) -> Self {
884        self.scan.n_rows = Some(n);
885        self
886    }
887
888    /// Enable or disable the key column.
889    pub fn with_key(mut self, include: bool) -> Self {
890        self.include_key = include;
891        self
892    }
893
894    /// Set a custom name for the key column.
895    pub fn with_key_column_name(mut self, name: impl Into<String>) -> Self {
896        self.key_column_name = Some(name.into());
897        self
898    }
899
900    /// Set a custom name for the timestamp column.
901    pub fn with_timestamp_column_name(mut self, name: impl Into<String>) -> Self {
902        self.timestamp_column_name = Some(name.into());
903        self
904    }
905
906    /// Set a custom name for the value column.
907    pub fn with_value_column_name(mut self, name: impl Into<String>) -> Self {
908        self.value_column_name = Some(name.into());
909        self
910    }
911
912    /// Set the row index configuration.
913    pub fn with_row_index(mut self, name: impl Into<String>, offset: u64) -> Self {
914        self.row_index = Some(RowIndex {
915            name: name.into(),
916            offset,
917        });
918        self
919    }
920
921    /// Set the start timestamp for TS.RANGE.
922    pub fn with_start(mut self, start: impl Into<String>) -> Self {
923        self.start = start.into();
924        self
925    }
926
927    /// Set the end timestamp for TS.RANGE.
928    pub fn with_end(mut self, end: impl Into<String>) -> Self {
929        self.end = end.into();
930        self
931    }
932
933    /// Set the maximum samples to fetch per time series.
934    pub fn with_count_per_series(mut self, count: usize) -> Self {
935        self.count_per_series = Some(count);
936        self
937    }
938
939    /// Set aggregation type and bucket size.
940    pub fn with_aggregation(mut self, agg_type: impl Into<String>, bucket_size_ms: i64) -> Self {
941        self.aggregation = Some(agg_type.into());
942        self.bucket_size_ms = Some(bucket_size_ms);
943        self
944    }
945}
946
947// ============================================================================
948// Environment variable configuration
949// ============================================================================
950
/// Read the environment variable `var` and parse it as `T`.
///
/// Surrounding whitespace is ignored, so a value such as `" 500\n"` is
/// accepted. Returns `None` when the variable is unset, is not valid Unicode,
/// or does not parse as `T`.
fn parse_env_var<T: std::str::FromStr>(var: &str) -> Option<T> {
    std::env::var(var).ok()?.trim().parse().ok()
}

/// Default batch size from `POLARS_REDIS_BATCH_SIZE`, or 1000.
///
/// Read once on first use. A value of 0 is ignored in favor of the fallback.
static DEFAULT_BATCH_SIZE: LazyLock<usize> = LazyLock::new(|| {
    parse_env_var::<usize>("POLARS_REDIS_BATCH_SIZE")
        .filter(|&size| size > 0)
        .unwrap_or(1000)
});

/// Default SCAN COUNT hint from `POLARS_REDIS_COUNT_HINT`, or 100.
///
/// Read once on first use. A value of 0 is ignored in favor of the fallback,
/// since Redis requires a positive COUNT.
static DEFAULT_COUNT_HINT: LazyLock<usize> = LazyLock::new(|| {
    parse_env_var::<usize>("POLARS_REDIS_COUNT_HINT")
        .filter(|&count| count > 0)
        .unwrap_or(100)
});

/// Default connection timeout in milliseconds from `POLARS_REDIS_TIMEOUT_MS`,
/// or 30000. Read once on first use.
static DEFAULT_TIMEOUT_MS: LazyLock<u64> =
    LazyLock::new(|| parse_env_var::<u64>("POLARS_REDIS_TIMEOUT_MS").unwrap_or(30000));

/// Get the default batch size.
///
/// The value is computed on first call and stays fixed for the rest of the
/// process, even if the environment variable changes later.
pub fn get_default_batch_size() -> usize {
    *DEFAULT_BATCH_SIZE
}

/// Get the default count hint.
///
/// The value is computed on first call and stays fixed for the rest of the
/// process, even if the environment variable changes later.
pub fn get_default_count_hint() -> usize {
    *DEFAULT_COUNT_HINT
}

/// Get the default timeout in milliseconds.
///
/// The value is computed on first call and stays fixed for the rest of the
/// process, even if the environment variable changes later.
pub fn get_default_timeout_ms() -> u64 {
    *DEFAULT_TIMEOUT_MS
}
989
990#[cfg(test)]
991mod tests {
992    use super::*;
993
994    #[test]
995    fn test_parallel_strategy_default() {
996        let strategy = ParallelStrategy::default();
997        assert_eq!(strategy, ParallelStrategy::None);
998        assert!(!strategy.is_parallel());
999        assert_eq!(strategy.worker_count(), 1);
1000    }
1001
1002    #[test]
1003    fn test_parallel_strategy_batches() {
1004        let strategy = ParallelStrategy::batches(4);
1005        assert_eq!(strategy, ParallelStrategy::Batches(4));
1006        assert!(strategy.is_parallel());
1007        assert_eq!(strategy.worker_count(), 4);
1008
1009        // Ensure at least 1 worker
1010        let min_strategy = ParallelStrategy::batches(0);
1011        assert_eq!(min_strategy.worker_count(), 1);
1012    }
1013
1014    #[test]
1015    fn test_parallel_strategy_auto() {
1016        let strategy = ParallelStrategy::Auto;
1017        assert!(strategy.is_parallel());
1018        assert_eq!(strategy.worker_count(), 4); // Default for auto
1019
1020        // Resolve based on key count
1021        assert_eq!(strategy.resolve(Some(500)), ParallelStrategy::None);
1022        assert_eq!(strategy.resolve(Some(5000)), ParallelStrategy::Batches(4));
1023        assert_eq!(strategy.resolve(None), ParallelStrategy::Batches(4));
1024    }
1025
1026    #[test]
1027    fn test_scan_options_default() {
1028        let opts = ScanOptions::default();
1029        assert_eq!(opts.pattern, "*");
1030        assert_eq!(opts.batch_size, get_default_batch_size());
1031        assert_eq!(opts.count_hint, get_default_count_hint());
1032        assert!(opts.n_rows.is_none());
1033        assert_eq!(opts.parallel, ParallelStrategy::None);
1034    }
1035
1036    #[test]
1037    fn test_scan_options_builder() {
1038        let opts = ScanOptions::new("user:*")
1039            .with_batch_size(500)
1040            .with_count_hint(50)
1041            .with_n_rows(1000)
1042            .with_parallel(ParallelStrategy::Batches(4));
1043
1044        assert_eq!(opts.pattern, "user:*");
1045        assert_eq!(opts.batch_size, 500);
1046        assert_eq!(opts.count_hint, 50);
1047        assert_eq!(opts.n_rows, Some(1000));
1048        assert_eq!(opts.parallel, ParallelStrategy::Batches(4));
1049    }
1050
1051    #[test]
1052    fn test_key_column() {
1053        let enabled = KeyColumn::enabled().with_name("redis_key");
1054        assert!(enabled.enabled);
1055        assert_eq!(enabled.name, "redis_key");
1056
1057        let disabled = KeyColumn::disabled();
1058        assert!(!disabled.enabled);
1059    }
1060
1061    #[test]
1062    fn test_ttl_column() {
1063        let enabled = TtlColumn::enabled().with_name("expiry");
1064        assert!(enabled.enabled);
1065        assert_eq!(enabled.name, "expiry");
1066
1067        let disabled = TtlColumn::disabled();
1068        assert!(!disabled.enabled);
1069    }
1070
1071    #[test]
1072    fn test_row_index_column() {
1073        let col = RowIndexColumn::enabled()
1074            .with_name("row_num")
1075            .with_offset(100);
1076
1077        assert!(col.enabled);
1078        assert_eq!(col.name, "row_num");
1079        assert_eq!(col.offset, 100);
1080
1081        let row_index = col.to_row_index().unwrap();
1082        assert_eq!(row_index.name, "row_num");
1083        assert_eq!(row_index.offset, 100);
1084    }
1085
1086    #[test]
1087    fn test_row_index() {
1088        let idx = RowIndex::new("idx").with_offset(50);
1089        assert_eq!(idx.name, "idx");
1090        assert_eq!(idx.offset, 50);
1091    }
1092
1093    #[test]
1094    fn test_hash_scan_options() {
1095        let opts = HashScanOptions::new("user:*")
1096            .with_batch_size(500)
1097            .with_count_hint(50)
1098            .with_n_rows(1000)
1099            .with_key(true)
1100            .with_key_column_name("redis_key")
1101            .with_ttl(true)
1102            .with_ttl_column_name("expiry")
1103            .with_row_index("idx", 10)
1104            .with_projection(vec!["name", "email"]);
1105
1106        assert_eq!(opts.scan.pattern, "user:*");
1107        assert_eq!(opts.scan.batch_size, 500);
1108        assert_eq!(opts.scan.count_hint, 50);
1109        assert_eq!(opts.scan.n_rows, Some(1000));
1110        assert!(opts.include_key);
1111        assert_eq!(opts.key_column_name, Some("redis_key".to_string()));
1112        assert!(opts.include_ttl);
1113        assert_eq!(opts.ttl_column_name, Some("expiry".to_string()));
1114        assert_eq!(
1115            opts.row_index.as_ref().map(|r| &r.name),
1116            Some(&"idx".to_string())
1117        );
1118        assert_eq!(opts.row_index.as_ref().map(|r| r.offset), Some(10));
1119        assert_eq!(
1120            opts.projection,
1121            Some(vec!["name".to_string(), "email".to_string()])
1122        );
1123    }
1124
1125    #[test]
1126    fn test_json_scan_options() {
1127        let opts = JsonScanOptions::new("doc:*")
1128            .with_batch_size(250)
1129            .with_path("$.user")
1130            .with_projection(vec!["name", "age"]);
1131
1132        assert_eq!(opts.scan.pattern, "doc:*");
1133        assert_eq!(opts.scan.batch_size, 250);
1134        assert_eq!(opts.path, Some("$.user".to_string()));
1135        assert_eq!(
1136            opts.projection,
1137            Some(vec!["name".to_string(), "age".to_string()])
1138        );
1139    }
1140
1141    #[test]
1142    fn test_string_scan_options() {
1143        let opts = StringScanOptions::new("counter:*")
1144            .with_batch_size(1000)
1145            .with_value_column_name("count")
1146            .with_key(false);
1147
1148        assert_eq!(opts.scan.pattern, "counter:*");
1149        assert_eq!(opts.scan.batch_size, 1000);
1150        assert_eq!(opts.value_column_name, Some("count".to_string()));
1151        assert!(!opts.include_key);
1152    }
1153
1154    #[test]
1155    fn test_stream_scan_options() {
1156        let opts = StreamScanOptions::new("events:*")
1157            .with_start_id("1000-0")
1158            .with_end_id("2000-0")
1159            .with_count_per_stream(100)
1160            .with_id(true)
1161            .with_timestamp(true)
1162            .with_sequence(true)
1163            .with_fields(vec!["action", "user"]);
1164
1165        assert_eq!(opts.scan.pattern, "events:*");
1166        assert_eq!(opts.start_id, "1000-0");
1167        assert_eq!(opts.end_id, "2000-0");
1168        assert_eq!(opts.count_per_stream, Some(100));
1169        assert!(opts.include_id);
1170        assert!(opts.include_timestamp);
1171        assert!(opts.include_sequence);
1172        assert_eq!(
1173            opts.fields,
1174            Some(vec!["action".to_string(), "user".to_string()])
1175        );
1176    }
1177
1178    #[test]
1179    fn test_timeseries_scan_options() {
1180        let opts = TimeSeriesScanOptions::new("sensor:*")
1181            .with_start("1000")
1182            .with_end("2000")
1183            .with_count_per_series(500)
1184            .with_aggregation("avg", 60000)
1185            .with_value_column_name("temperature")
1186            .with_timestamp_column_name("ts");
1187
1188        assert_eq!(opts.scan.pattern, "sensor:*");
1189        assert_eq!(opts.start, "1000");
1190        assert_eq!(opts.end, "2000");
1191        assert_eq!(opts.count_per_series, Some(500));
1192        assert_eq!(opts.aggregation, Some("avg".to_string()));
1193        assert_eq!(opts.bucket_size_ms, Some(60000));
1194        assert_eq!(opts.value_column_name, Some("temperature".to_string()));
1195        assert_eq!(opts.timestamp_column_name, Some("ts".to_string()));
1196    }
1197}