1use std::sync::LazyLock;
22
23#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
46pub enum ParallelStrategy {
47 #[default]
52 None,
53
54 Batches(usize),
61
62 Auto,
67}
68
69impl ParallelStrategy {
70 pub fn batches(n: usize) -> Self {
72 ParallelStrategy::Batches(n.max(1))
73 }
74
75 pub fn is_parallel(&self) -> bool {
77 !matches!(self, ParallelStrategy::None)
78 }
79
80 pub fn worker_count(&self) -> usize {
85 match self {
86 ParallelStrategy::None => 1,
87 ParallelStrategy::Batches(n) => *n,
88 ParallelStrategy::Auto => 4, }
90 }
91
92 pub fn resolve(&self, estimated_keys: Option<usize>) -> ParallelStrategy {
94 match self {
95 ParallelStrategy::Auto => match estimated_keys {
96 Some(n) if n < 1000 => ParallelStrategy::None,
97 _ => ParallelStrategy::Batches(4),
98 },
99 other => *other,
100 }
101 }
102}
103
104#[derive(Debug, Clone, PartialEq, Eq, Hash)]
106pub struct RowIndex {
107 pub name: String,
109 pub offset: u64,
111}
112
113impl Default for RowIndex {
114 fn default() -> Self {
115 Self {
116 name: "_index".to_string(),
117 offset: 0,
118 }
119 }
120}
121
122impl RowIndex {
123 pub fn new(name: impl Into<String>) -> Self {
125 Self {
126 name: name.into(),
127 offset: 0,
128 }
129 }
130
131 pub fn with_offset(mut self, offset: u64) -> Self {
133 self.offset = offset;
134 self
135 }
136}
137
138#[derive(Debug, Clone)]
140pub struct ScanOptions {
141 pub pattern: String,
143 pub batch_size: usize,
145 pub count_hint: usize,
147 pub n_rows: Option<usize>,
149 pub parallel: ParallelStrategy,
151}
152
153impl Default for ScanOptions {
154 fn default() -> Self {
155 Self {
156 pattern: "*".to_string(),
157 batch_size: get_default_batch_size(),
158 count_hint: get_default_count_hint(),
159 n_rows: None,
160 parallel: ParallelStrategy::None,
161 }
162 }
163}
164
165impl ScanOptions {
166 pub fn new(pattern: impl Into<String>) -> Self {
168 Self {
169 pattern: pattern.into(),
170 ..Default::default()
171 }
172 }
173
174 pub fn with_batch_size(mut self, size: usize) -> Self {
176 self.batch_size = size;
177 self
178 }
179
180 pub fn with_count_hint(mut self, count: usize) -> Self {
182 self.count_hint = count;
183 self
184 }
185
186 pub fn with_n_rows(mut self, n: usize) -> Self {
188 self.n_rows = Some(n);
189 self
190 }
191
192 pub fn with_parallel(mut self, strategy: ParallelStrategy) -> Self {
194 self.parallel = strategy;
195 self
196 }
197}
198
199#[derive(Debug, Clone)]
201pub struct KeyColumn {
202 pub enabled: bool,
204 pub name: String,
206}
207
208impl Default for KeyColumn {
209 fn default() -> Self {
210 Self {
211 enabled: true,
212 name: "_key".to_string(),
213 }
214 }
215}
216
217impl KeyColumn {
218 pub fn enabled() -> Self {
220 Self::default()
221 }
222
223 pub fn disabled() -> Self {
225 Self {
226 enabled: false,
227 name: "_key".to_string(),
228 }
229 }
230
231 pub fn with_name(mut self, name: impl Into<String>) -> Self {
233 self.name = name.into();
234 self
235 }
236}
237
238#[derive(Debug, Clone)]
240pub struct TtlColumn {
241 pub enabled: bool,
243 pub name: String,
245}
246
247impl Default for TtlColumn {
248 fn default() -> Self {
249 Self {
250 enabled: false,
251 name: "_ttl".to_string(),
252 }
253 }
254}
255
256impl TtlColumn {
257 pub fn enabled() -> Self {
259 Self {
260 enabled: true,
261 name: "_ttl".to_string(),
262 }
263 }
264
265 pub fn disabled() -> Self {
267 Self::default()
268 }
269
270 pub fn with_name(mut self, name: impl Into<String>) -> Self {
272 self.name = name.into();
273 self
274 }
275}
276
277#[derive(Debug, Clone)]
279pub struct RowIndexColumn {
280 pub enabled: bool,
282 pub name: String,
284 pub offset: u64,
286}
287
288impl Default for RowIndexColumn {
289 fn default() -> Self {
290 Self {
291 enabled: false,
292 name: "_index".to_string(),
293 offset: 0,
294 }
295 }
296}
297
298impl RowIndexColumn {
299 pub fn enabled() -> Self {
301 Self {
302 enabled: true,
303 name: "_index".to_string(),
304 offset: 0,
305 }
306 }
307
308 pub fn disabled() -> Self {
310 Self::default()
311 }
312
313 pub fn with_name(mut self, name: impl Into<String>) -> Self {
315 self.name = name.into();
316 self
317 }
318
319 pub fn with_offset(mut self, offset: u64) -> Self {
321 self.offset = offset;
322 self
323 }
324
325 pub fn to_row_index(&self) -> Option<RowIndex> {
327 if self.enabled {
328 Some(RowIndex {
329 name: self.name.clone(),
330 offset: self.offset,
331 })
332 } else {
333 None
334 }
335 }
336}
337
338#[derive(Debug, Clone)]
355pub struct HashScanOptions {
356 pub scan: ScanOptions,
358 pub include_key: bool,
360 pub key_column_name: Option<String>,
362 pub include_ttl: bool,
364 pub ttl_column_name: Option<String>,
366 pub row_index: Option<RowIndex>,
368 pub projection: Option<Vec<String>>,
370}
371
372impl Default for HashScanOptions {
373 fn default() -> Self {
374 Self {
375 scan: ScanOptions::default(),
376 include_key: true,
377 key_column_name: None,
378 include_ttl: false,
379 ttl_column_name: None,
380 row_index: None,
381 projection: None,
382 }
383 }
384}
385
386impl HashScanOptions {
387 pub fn new(pattern: impl Into<String>) -> Self {
389 Self {
390 scan: ScanOptions::new(pattern),
391 ..Default::default()
392 }
393 }
394
395 pub fn with_batch_size(mut self, size: usize) -> Self {
397 self.scan.batch_size = size;
398 self
399 }
400
401 pub fn with_count_hint(mut self, count: usize) -> Self {
403 self.scan.count_hint = count;
404 self
405 }
406
407 pub fn with_n_rows(mut self, n: usize) -> Self {
409 self.scan.n_rows = Some(n);
410 self
411 }
412
413 pub fn with_key(mut self, include: bool) -> Self {
415 self.include_key = include;
416 self
417 }
418
419 pub fn with_key_column_name(mut self, name: impl Into<String>) -> Self {
421 self.key_column_name = Some(name.into());
422 self
423 }
424
425 pub fn with_ttl(mut self, include: bool) -> Self {
427 self.include_ttl = include;
428 self
429 }
430
431 pub fn with_ttl_column_name(mut self, name: impl Into<String>) -> Self {
433 self.ttl_column_name = Some(name.into());
434 self
435 }
436
437 pub fn with_row_index(mut self, name: impl Into<String>, offset: u64) -> Self {
439 self.row_index = Some(RowIndex {
440 name: name.into(),
441 offset,
442 });
443 self
444 }
445
446 pub fn with_projection(mut self, fields: Vec<impl Into<String>>) -> Self {
448 self.projection = Some(fields.into_iter().map(Into::into).collect());
449 self
450 }
451}
452
453#[derive(Debug, Clone)]
465pub struct JsonScanOptions {
466 pub scan: ScanOptions,
468 pub include_key: bool,
470 pub key_column_name: Option<String>,
472 pub include_ttl: bool,
474 pub ttl_column_name: Option<String>,
476 pub row_index: Option<RowIndex>,
478 pub path: Option<String>,
480 pub projection: Option<Vec<String>>,
482}
483
484impl Default for JsonScanOptions {
485 fn default() -> Self {
486 Self {
487 scan: ScanOptions::default(),
488 include_key: true,
489 key_column_name: None,
490 include_ttl: false,
491 ttl_column_name: None,
492 row_index: None,
493 path: None,
494 projection: None,
495 }
496 }
497}
498
499impl JsonScanOptions {
500 pub fn new(pattern: impl Into<String>) -> Self {
502 Self {
503 scan: ScanOptions::new(pattern),
504 ..Default::default()
505 }
506 }
507
508 pub fn with_batch_size(mut self, size: usize) -> Self {
510 self.scan.batch_size = size;
511 self
512 }
513
514 pub fn with_count_hint(mut self, count: usize) -> Self {
516 self.scan.count_hint = count;
517 self
518 }
519
520 pub fn with_n_rows(mut self, n: usize) -> Self {
522 self.scan.n_rows = Some(n);
523 self
524 }
525
526 pub fn with_key(mut self, include: bool) -> Self {
528 self.include_key = include;
529 self
530 }
531
532 pub fn with_key_column_name(mut self, name: impl Into<String>) -> Self {
534 self.key_column_name = Some(name.into());
535 self
536 }
537
538 pub fn with_ttl(mut self, include: bool) -> Self {
540 self.include_ttl = include;
541 self
542 }
543
544 pub fn with_ttl_column_name(mut self, name: impl Into<String>) -> Self {
546 self.ttl_column_name = Some(name.into());
547 self
548 }
549
550 pub fn with_row_index(mut self, name: impl Into<String>, offset: u64) -> Self {
552 self.row_index = Some(RowIndex {
553 name: name.into(),
554 offset,
555 });
556 self
557 }
558
559 pub fn with_path(mut self, path: impl Into<String>) -> Self {
561 self.path = Some(path.into());
562 self
563 }
564
565 pub fn with_projection(mut self, fields: Vec<impl Into<String>>) -> Self {
567 self.projection = Some(fields.into_iter().map(Into::into).collect());
568 self
569 }
570}
571
572#[derive(Debug, Clone)]
584pub struct StringScanOptions {
585 pub scan: ScanOptions,
587 pub include_key: bool,
589 pub key_column_name: Option<String>,
591 pub value_column_name: Option<String>,
593 pub row_index: Option<RowIndex>,
595}
596
597impl Default for StringScanOptions {
598 fn default() -> Self {
599 Self {
600 scan: ScanOptions::default(),
601 include_key: true,
602 key_column_name: None,
603 value_column_name: None,
604 row_index: None,
605 }
606 }
607}
608
609impl StringScanOptions {
610 pub fn new(pattern: impl Into<String>) -> Self {
612 Self {
613 scan: ScanOptions::new(pattern),
614 ..Default::default()
615 }
616 }
617
618 pub fn with_batch_size(mut self, size: usize) -> Self {
620 self.scan.batch_size = size;
621 self
622 }
623
624 pub fn with_count_hint(mut self, count: usize) -> Self {
626 self.scan.count_hint = count;
627 self
628 }
629
630 pub fn with_n_rows(mut self, n: usize) -> Self {
632 self.scan.n_rows = Some(n);
633 self
634 }
635
636 pub fn with_key(mut self, include: bool) -> Self {
638 self.include_key = include;
639 self
640 }
641
642 pub fn with_key_column_name(mut self, name: impl Into<String>) -> Self {
644 self.key_column_name = Some(name.into());
645 self
646 }
647
648 pub fn with_value_column_name(mut self, name: impl Into<String>) -> Self {
650 self.value_column_name = Some(name.into());
651 self
652 }
653
654 pub fn with_row_index(mut self, name: impl Into<String>, offset: u64) -> Self {
656 self.row_index = Some(RowIndex {
657 name: name.into(),
658 offset,
659 });
660 self
661 }
662}
663
664#[derive(Debug, Clone)]
677pub struct StreamScanOptions {
678 pub scan: ScanOptions,
680 pub include_key: bool,
682 pub key_column_name: Option<String>,
684 pub include_id: bool,
686 pub include_timestamp: bool,
688 pub include_sequence: bool,
690 pub row_index: Option<RowIndex>,
692 pub start_id: String,
694 pub end_id: String,
696 pub count_per_stream: Option<usize>,
698 pub fields: Option<Vec<String>>,
700}
701
702impl Default for StreamScanOptions {
703 fn default() -> Self {
704 Self {
705 scan: ScanOptions::default(),
706 include_key: true,
707 key_column_name: None,
708 include_id: true,
709 include_timestamp: true,
710 include_sequence: false,
711 row_index: None,
712 start_id: "-".to_string(),
713 end_id: "+".to_string(),
714 count_per_stream: None,
715 fields: None,
716 }
717 }
718}
719
720impl StreamScanOptions {
721 pub fn new(pattern: impl Into<String>) -> Self {
723 Self {
724 scan: ScanOptions::new(pattern),
725 ..Default::default()
726 }
727 }
728
729 pub fn with_batch_size(mut self, size: usize) -> Self {
731 self.scan.batch_size = size;
732 self
733 }
734
735 pub fn with_count_hint(mut self, count: usize) -> Self {
737 self.scan.count_hint = count;
738 self
739 }
740
741 pub fn with_n_rows(mut self, n: usize) -> Self {
743 self.scan.n_rows = Some(n);
744 self
745 }
746
747 pub fn with_key(mut self, include: bool) -> Self {
749 self.include_key = include;
750 self
751 }
752
753 pub fn with_id(mut self, include: bool) -> Self {
755 self.include_id = include;
756 self
757 }
758
759 pub fn with_timestamp(mut self, include: bool) -> Self {
761 self.include_timestamp = include;
762 self
763 }
764
765 pub fn with_sequence(mut self, include: bool) -> Self {
767 self.include_sequence = include;
768 self
769 }
770
771 pub fn with_row_index(mut self, name: impl Into<String>, offset: u64) -> Self {
773 self.row_index = Some(RowIndex {
774 name: name.into(),
775 offset,
776 });
777 self
778 }
779
780 pub fn with_start_id(mut self, id: impl Into<String>) -> Self {
782 self.start_id = id.into();
783 self
784 }
785
786 pub fn with_end_id(mut self, id: impl Into<String>) -> Self {
788 self.end_id = id.into();
789 self
790 }
791
792 pub fn with_count_per_stream(mut self, count: usize) -> Self {
794 self.count_per_stream = Some(count);
795 self
796 }
797
798 pub fn with_fields(mut self, fields: Vec<impl Into<String>>) -> Self {
800 self.fields = Some(fields.into_iter().map(Into::into).collect());
801 self
802 }
803}
804
805#[derive(Debug, Clone)]
818pub struct TimeSeriesScanOptions {
819 pub scan: ScanOptions,
821 pub include_key: bool,
823 pub key_column_name: Option<String>,
825 pub timestamp_column_name: Option<String>,
827 pub value_column_name: Option<String>,
829 pub row_index: Option<RowIndex>,
831 pub start: String,
833 pub end: String,
835 pub count_per_series: Option<usize>,
837 pub aggregation: Option<String>,
839 pub bucket_size_ms: Option<i64>,
841}
842
843impl Default for TimeSeriesScanOptions {
844 fn default() -> Self {
845 Self {
846 scan: ScanOptions::default(),
847 include_key: true,
848 key_column_name: None,
849 timestamp_column_name: None,
850 value_column_name: None,
851 row_index: None,
852 start: "-".to_string(),
853 end: "+".to_string(),
854 count_per_series: None,
855 aggregation: None,
856 bucket_size_ms: None,
857 }
858 }
859}
860
861impl TimeSeriesScanOptions {
862 pub fn new(pattern: impl Into<String>) -> Self {
864 Self {
865 scan: ScanOptions::new(pattern),
866 ..Default::default()
867 }
868 }
869
870 pub fn with_batch_size(mut self, size: usize) -> Self {
872 self.scan.batch_size = size;
873 self
874 }
875
876 pub fn with_count_hint(mut self, count: usize) -> Self {
878 self.scan.count_hint = count;
879 self
880 }
881
882 pub fn with_n_rows(mut self, n: usize) -> Self {
884 self.scan.n_rows = Some(n);
885 self
886 }
887
888 pub fn with_key(mut self, include: bool) -> Self {
890 self.include_key = include;
891 self
892 }
893
894 pub fn with_key_column_name(mut self, name: impl Into<String>) -> Self {
896 self.key_column_name = Some(name.into());
897 self
898 }
899
900 pub fn with_timestamp_column_name(mut self, name: impl Into<String>) -> Self {
902 self.timestamp_column_name = Some(name.into());
903 self
904 }
905
906 pub fn with_value_column_name(mut self, name: impl Into<String>) -> Self {
908 self.value_column_name = Some(name.into());
909 self
910 }
911
912 pub fn with_row_index(mut self, name: impl Into<String>, offset: u64) -> Self {
914 self.row_index = Some(RowIndex {
915 name: name.into(),
916 offset,
917 });
918 self
919 }
920
921 pub fn with_start(mut self, start: impl Into<String>) -> Self {
923 self.start = start.into();
924 self
925 }
926
927 pub fn with_end(mut self, end: impl Into<String>) -> Self {
929 self.end = end.into();
930 self
931 }
932
933 pub fn with_count_per_series(mut self, count: usize) -> Self {
935 self.count_per_series = Some(count);
936 self
937 }
938
939 pub fn with_aggregation(mut self, agg_type: impl Into<String>, bucket_size_ms: i64) -> Self {
941 self.aggregation = Some(agg_type.into());
942 self.bucket_size_ms = Some(bucket_size_ms);
943 self
944 }
945}
946
947static DEFAULT_BATCH_SIZE: LazyLock<usize> = LazyLock::new(|| {
953 std::env::var("POLARS_REDIS_BATCH_SIZE")
954 .ok()
955 .and_then(|v| v.parse().ok())
956 .unwrap_or(1000)
957});
958
959static DEFAULT_COUNT_HINT: LazyLock<usize> = LazyLock::new(|| {
961 std::env::var("POLARS_REDIS_COUNT_HINT")
962 .ok()
963 .and_then(|v| v.parse().ok())
964 .unwrap_or(100)
965});
966
967static DEFAULT_TIMEOUT_MS: LazyLock<u64> = LazyLock::new(|| {
969 std::env::var("POLARS_REDIS_TIMEOUT_MS")
970 .ok()
971 .and_then(|v| v.parse().ok())
972 .unwrap_or(30000)
973});
974
975pub fn get_default_batch_size() -> usize {
977 *DEFAULT_BATCH_SIZE
978}
979
980pub fn get_default_count_hint() -> usize {
982 *DEFAULT_COUNT_HINT
983}
984
985pub fn get_default_timeout_ms() -> u64 {
987 *DEFAULT_TIMEOUT_MS
988}
989
990#[cfg(test)]
991mod tests {
992 use super::*;
993
994 #[test]
995 fn test_parallel_strategy_default() {
996 let strategy = ParallelStrategy::default();
997 assert_eq!(strategy, ParallelStrategy::None);
998 assert!(!strategy.is_parallel());
999 assert_eq!(strategy.worker_count(), 1);
1000 }
1001
1002 #[test]
1003 fn test_parallel_strategy_batches() {
1004 let strategy = ParallelStrategy::batches(4);
1005 assert_eq!(strategy, ParallelStrategy::Batches(4));
1006 assert!(strategy.is_parallel());
1007 assert_eq!(strategy.worker_count(), 4);
1008
1009 let min_strategy = ParallelStrategy::batches(0);
1011 assert_eq!(min_strategy.worker_count(), 1);
1012 }
1013
1014 #[test]
1015 fn test_parallel_strategy_auto() {
1016 let strategy = ParallelStrategy::Auto;
1017 assert!(strategy.is_parallel());
1018 assert_eq!(strategy.worker_count(), 4); assert_eq!(strategy.resolve(Some(500)), ParallelStrategy::None);
1022 assert_eq!(strategy.resolve(Some(5000)), ParallelStrategy::Batches(4));
1023 assert_eq!(strategy.resolve(None), ParallelStrategy::Batches(4));
1024 }
1025
1026 #[test]
1027 fn test_scan_options_default() {
1028 let opts = ScanOptions::default();
1029 assert_eq!(opts.pattern, "*");
1030 assert_eq!(opts.batch_size, get_default_batch_size());
1031 assert_eq!(opts.count_hint, get_default_count_hint());
1032 assert!(opts.n_rows.is_none());
1033 assert_eq!(opts.parallel, ParallelStrategy::None);
1034 }
1035
1036 #[test]
1037 fn test_scan_options_builder() {
1038 let opts = ScanOptions::new("user:*")
1039 .with_batch_size(500)
1040 .with_count_hint(50)
1041 .with_n_rows(1000)
1042 .with_parallel(ParallelStrategy::Batches(4));
1043
1044 assert_eq!(opts.pattern, "user:*");
1045 assert_eq!(opts.batch_size, 500);
1046 assert_eq!(opts.count_hint, 50);
1047 assert_eq!(opts.n_rows, Some(1000));
1048 assert_eq!(opts.parallel, ParallelStrategy::Batches(4));
1049 }
1050
1051 #[test]
1052 fn test_key_column() {
1053 let enabled = KeyColumn::enabled().with_name("redis_key");
1054 assert!(enabled.enabled);
1055 assert_eq!(enabled.name, "redis_key");
1056
1057 let disabled = KeyColumn::disabled();
1058 assert!(!disabled.enabled);
1059 }
1060
1061 #[test]
1062 fn test_ttl_column() {
1063 let enabled = TtlColumn::enabled().with_name("expiry");
1064 assert!(enabled.enabled);
1065 assert_eq!(enabled.name, "expiry");
1066
1067 let disabled = TtlColumn::disabled();
1068 assert!(!disabled.enabled);
1069 }
1070
1071 #[test]
1072 fn test_row_index_column() {
1073 let col = RowIndexColumn::enabled()
1074 .with_name("row_num")
1075 .with_offset(100);
1076
1077 assert!(col.enabled);
1078 assert_eq!(col.name, "row_num");
1079 assert_eq!(col.offset, 100);
1080
1081 let row_index = col.to_row_index().unwrap();
1082 assert_eq!(row_index.name, "row_num");
1083 assert_eq!(row_index.offset, 100);
1084 }
1085
1086 #[test]
1087 fn test_row_index() {
1088 let idx = RowIndex::new("idx").with_offset(50);
1089 assert_eq!(idx.name, "idx");
1090 assert_eq!(idx.offset, 50);
1091 }
1092
1093 #[test]
1094 fn test_hash_scan_options() {
1095 let opts = HashScanOptions::new("user:*")
1096 .with_batch_size(500)
1097 .with_count_hint(50)
1098 .with_n_rows(1000)
1099 .with_key(true)
1100 .with_key_column_name("redis_key")
1101 .with_ttl(true)
1102 .with_ttl_column_name("expiry")
1103 .with_row_index("idx", 10)
1104 .with_projection(vec!["name", "email"]);
1105
1106 assert_eq!(opts.scan.pattern, "user:*");
1107 assert_eq!(opts.scan.batch_size, 500);
1108 assert_eq!(opts.scan.count_hint, 50);
1109 assert_eq!(opts.scan.n_rows, Some(1000));
1110 assert!(opts.include_key);
1111 assert_eq!(opts.key_column_name, Some("redis_key".to_string()));
1112 assert!(opts.include_ttl);
1113 assert_eq!(opts.ttl_column_name, Some("expiry".to_string()));
1114 assert_eq!(
1115 opts.row_index.as_ref().map(|r| &r.name),
1116 Some(&"idx".to_string())
1117 );
1118 assert_eq!(opts.row_index.as_ref().map(|r| r.offset), Some(10));
1119 assert_eq!(
1120 opts.projection,
1121 Some(vec!["name".to_string(), "email".to_string()])
1122 );
1123 }
1124
1125 #[test]
1126 fn test_json_scan_options() {
1127 let opts = JsonScanOptions::new("doc:*")
1128 .with_batch_size(250)
1129 .with_path("$.user")
1130 .with_projection(vec!["name", "age"]);
1131
1132 assert_eq!(opts.scan.pattern, "doc:*");
1133 assert_eq!(opts.scan.batch_size, 250);
1134 assert_eq!(opts.path, Some("$.user".to_string()));
1135 assert_eq!(
1136 opts.projection,
1137 Some(vec!["name".to_string(), "age".to_string()])
1138 );
1139 }
1140
1141 #[test]
1142 fn test_string_scan_options() {
1143 let opts = StringScanOptions::new("counter:*")
1144 .with_batch_size(1000)
1145 .with_value_column_name("count")
1146 .with_key(false);
1147
1148 assert_eq!(opts.scan.pattern, "counter:*");
1149 assert_eq!(opts.scan.batch_size, 1000);
1150 assert_eq!(opts.value_column_name, Some("count".to_string()));
1151 assert!(!opts.include_key);
1152 }
1153
1154 #[test]
1155 fn test_stream_scan_options() {
1156 let opts = StreamScanOptions::new("events:*")
1157 .with_start_id("1000-0")
1158 .with_end_id("2000-0")
1159 .with_count_per_stream(100)
1160 .with_id(true)
1161 .with_timestamp(true)
1162 .with_sequence(true)
1163 .with_fields(vec!["action", "user"]);
1164
1165 assert_eq!(opts.scan.pattern, "events:*");
1166 assert_eq!(opts.start_id, "1000-0");
1167 assert_eq!(opts.end_id, "2000-0");
1168 assert_eq!(opts.count_per_stream, Some(100));
1169 assert!(opts.include_id);
1170 assert!(opts.include_timestamp);
1171 assert!(opts.include_sequence);
1172 assert_eq!(
1173 opts.fields,
1174 Some(vec!["action".to_string(), "user".to_string()])
1175 );
1176 }
1177
1178 #[test]
1179 fn test_timeseries_scan_options() {
1180 let opts = TimeSeriesScanOptions::new("sensor:*")
1181 .with_start("1000")
1182 .with_end("2000")
1183 .with_count_per_series(500)
1184 .with_aggregation("avg", 60000)
1185 .with_value_column_name("temperature")
1186 .with_timestamp_column_name("ts");
1187
1188 assert_eq!(opts.scan.pattern, "sensor:*");
1189 assert_eq!(opts.start, "1000");
1190 assert_eq!(opts.end, "2000");
1191 assert_eq!(opts.count_per_series, Some(500));
1192 assert_eq!(opts.aggregation, Some("avg".to_string()));
1193 assert_eq!(opts.bucket_size_ms, Some(60000));
1194 assert_eq!(opts.value_column_name, Some("temperature".to_string()));
1195 assert_eq!(opts.timestamp_column_name, Some("ts".to_string()));
1196 }
1197}