1use super::BlackBoxProcessor;
8use crate::{DsonOperation, FilterPredicate, OperationValue, StreamGenerator, TransformFunction};
9use fionn_core::Result;
10use std::collections::VecDeque;
11
/// Applies DSON stream operations (build, filter, map, emit) on top of a
/// wrapped [`BlackBoxProcessor`], collecting generated values into batches.
pub struct StreamingProcessor {
    /// Inner processor that receives every non-stream operation, custom
    /// generators, and the per-batch fields written by a stream emit.
    processor: BlackBoxProcessor,
    /// Batch length at which the repeat and Fibonacci generators flush the
    /// in-progress batch.
    buffer_size: usize,
    /// Batch currently being filled; filter and map operations act on it.
    current_batch: Vec<OperationValue>,
    /// Flushed batches waiting to be emitted, oldest first.
    processed_batches: VecDeque<Vec<OperationValue>>,
}
19
20impl StreamingProcessor {
21 #[must_use]
23 pub fn new(buffer_size: usize) -> Self {
24 Self {
25 processor: BlackBoxProcessor::new(vec![], vec![]),
26 buffer_size,
27 current_batch: Vec::new(),
28 processed_batches: VecDeque::new(),
29 }
30 }
31
32 pub fn process_stream(&mut self, operations: &[DsonOperation]) -> Result<()> {
37 for operation in operations {
38 self.apply_streaming_operation(operation)?;
39 }
40 Ok(())
41 }
42
43 fn apply_streaming_operation(&mut self, operation: &DsonOperation) -> Result<()> {
48 match operation {
49 DsonOperation::StreamBuild { path, generator } => self.stream_build(path, generator),
50 DsonOperation::StreamFilter { path, predicate } => {
51 self.stream_filter(path, predicate);
52 Ok(())
53 }
54 DsonOperation::StreamMap { path, transform } => {
55 self.stream_map(path, transform);
56 Ok(())
57 }
58 DsonOperation::StreamEmit { path, batch_size } => self.stream_emit(path, *batch_size),
59 _ => self.processor.apply_operation(operation),
61 }
62 }
63
64 fn stream_build(&mut self, path: &str, generator: &StreamGenerator) -> Result<()> {
66 match generator {
67 StreamGenerator::Range { start, end, step } => {
68 self.generate_range(*start, *end, *step);
69 }
70 StreamGenerator::Repeat(value, count) => {
71 self.generate_repeat(value, *count);
72 }
73 StreamGenerator::Fibonacci(count) => {
74 self.generate_fibonacci(*count);
75 }
76 StreamGenerator::Custom(_) => {
77 self.processor
80 .apply_operation(&DsonOperation::StreamBuild {
81 path: path.to_string(),
82 generator: generator.clone(),
83 })?;
84 }
85 }
86 Ok(())
87 }
88
89 fn generate_range(&mut self, start: i64, end: i64, step: i64) {
91 let mut current = start;
92 while current < end {
93 self.current_batch
94 .push(OperationValue::NumberRef(current.to_string()));
95 current += step;
96 }
97 }
99
100 fn generate_repeat(&mut self, value: &OperationValue, count: usize) {
102 for _ in 0..count {
103 self.current_batch.push(value.clone());
104 if self.current_batch.len() >= self.buffer_size {
105 self.flush_batch();
106 }
107 }
108 self.flush_batch();
109 }
110
111 fn generate_fibonacci(&mut self, count: usize) {
113 let mut a = 0i64;
114 let mut b = 1i64;
115
116 for _ in 0..count {
117 self.current_batch
118 .push(OperationValue::NumberRef(a.to_string()));
119 if self.current_batch.len() >= self.buffer_size {
120 self.flush_batch();
121 }
122
123 let temp = a;
124 a = b;
125 b += temp;
126 }
127 self.flush_batch();
128 }
129
130 fn stream_filter(&mut self, _path: &str, predicate: &FilterPredicate) {
132 let mut filtered = Vec::new();
134
135 for (index, value) in self.current_batch.iter().enumerate() {
136 if Self::matches_predicate(value, predicate, index) {
137 filtered.push(value.clone());
138 }
139 }
140
141 self.current_batch = filtered;
142 }
143
144 fn stream_map(&mut self, _path: &str, transform: &TransformFunction) {
146 let mut transformed = Vec::new();
148
149 for value in &self.current_batch {
150 let new_value = Self::apply_transform(value, transform);
151 transformed.push(new_value);
152 }
153
154 self.current_batch = transformed;
155 }
156
157 fn stream_emit(&mut self, path: &str, _batch_size: usize) -> Result<()> {
159 if self.current_batch.is_empty() && self.processed_batches.is_empty() {
161 return Ok(());
162 }
163
164 if !self.current_batch.is_empty() {
166 self.flush_batch();
167 }
168
169 while let Some(batch) = self.processed_batches.pop_front() {
171 let emit_path = format!("{}.batch_{}", path, self.processed_batches.len());
174 let batch_value = OperationValue::StringRef(format!("batch_size:{}", batch.len()));
175 self.processor.apply_operation(&DsonOperation::FieldAdd {
176 path: emit_path,
177 value: batch_value,
178 })?;
179 }
180
181 Ok(())
182 }
183
184 fn matches_predicate(
186 value: &OperationValue,
187 predicate: &FilterPredicate,
188 index: usize,
189 ) -> bool {
190 match predicate {
191 FilterPredicate::Even => {
192 if let OperationValue::NumberRef(num_str) = value {
193 num_str.parse::<i64>().is_ok_and(|num| num % 2 == 0)
194 } else {
195 false
196 }
197 }
198 FilterPredicate::Odd => {
199 if let OperationValue::NumberRef(num_str) = value {
200 num_str.parse::<i64>().is_ok_and(|num| num % 2 == 1)
201 } else {
202 false
203 }
204 }
205 FilterPredicate::EveryNth(n) => index.is_multiple_of(*n),
206 FilterPredicate::GreaterThan(threshold) => {
207 if let OperationValue::NumberRef(num_str) = value {
208 num_str.parse::<i64>().is_ok_and(|num| num > *threshold)
209 } else {
210 false
211 }
212 }
213 FilterPredicate::LessThan(threshold) => {
214 if let OperationValue::NumberRef(num_str) = value {
215 num_str.parse::<i64>().is_ok_and(|num| num < *threshold)
216 } else {
217 false
218 }
219 }
220 FilterPredicate::Equals(compare_value) => {
221 match (value, compare_value) {
222 (OperationValue::NumberRef(num_str), OperationValue::NumberRef(cmp_str)) => {
223 if let (Ok(a), Ok(b)) = (num_str.parse::<i64>(), cmp_str.parse::<i64>()) {
225 a == b
226 } else if let (Ok(a), Ok(b)) =
227 (num_str.parse::<f64>(), cmp_str.parse::<f64>())
228 {
229 (a - b).abs() < f64::EPSILON
230 } else {
231 num_str == cmp_str
232 }
233 }
234 (OperationValue::StringRef(s1), OperationValue::StringRef(s2)) => s1 == s2,
235 (OperationValue::BoolRef(b1), OperationValue::BoolRef(b2)) => b1 == b2,
236 (OperationValue::Null, OperationValue::Null) => true,
237 _ => false,
238 }
239 }
240 FilterPredicate::Alternate => {
241 index.is_multiple_of(2)
243 }
244 FilterPredicate::Custom(predicate_fn) => {
245 Self::evaluate_custom_predicate(predicate_fn, value)
248 }
249 }
250 }
251
252 fn evaluate_custom_predicate(predicate: &str, value: &OperationValue) -> bool {
254 let predicate = predicate.trim();
255
256 if let Some(rest) = predicate.strip_prefix("value") {
258 let rest = rest.trim();
259
260 if let Some(threshold_str) = rest.strip_prefix(">") {
262 if let Ok(threshold) = threshold_str.trim().parse::<i64>()
263 && let OperationValue::NumberRef(num_str) = value
264 && let Ok(num) = num_str.parse::<i64>()
265 {
266 return num > threshold;
267 }
268 } else if let Some(threshold_str) = rest.strip_prefix("<") {
269 if let Ok(threshold) = threshold_str.trim().parse::<i64>()
270 && let OperationValue::NumberRef(num_str) = value
271 && let Ok(num) = num_str.parse::<i64>()
272 {
273 return num < threshold;
274 }
275 } else if let Some(threshold_str) = rest.strip_prefix("==") {
276 let threshold_str = threshold_str.trim();
277 if let OperationValue::NumberRef(num_str) = value {
278 return num_str == threshold_str;
279 } else if let OperationValue::StringRef(s) = value {
280 return s == threshold_str.trim_matches('"');
281 }
282 } else if let Some(threshold_str) = rest.strip_prefix("!=") {
283 let threshold_str = threshold_str.trim();
284 if let OperationValue::NumberRef(num_str) = value {
285 return num_str != threshold_str;
286 } else if let OperationValue::StringRef(s) = value {
287 return s != threshold_str.trim_matches('"');
288 }
289 }
290 }
291
292 true
294 }
295
296 fn apply_transform(value: &OperationValue, transform: &TransformFunction) -> OperationValue {
298 match (value, transform) {
299 (OperationValue::NumberRef(num_str), TransformFunction::Add(delta)) => {
300 num_str.parse::<i64>().map_or_else(
301 |_| value.clone(),
302 |num| OperationValue::NumberRef((num + delta).to_string()),
303 )
304 }
305 (OperationValue::NumberRef(num_str), TransformFunction::Multiply(factor)) => {
306 num_str.parse::<i64>().map_or_else(
307 |_| value.clone(),
308 |num| OperationValue::NumberRef((num * factor).to_string()),
309 )
310 }
311 (OperationValue::StringRef(text), TransformFunction::ToUppercase) => {
312 OperationValue::StringRef(text.to_uppercase())
313 }
314 (OperationValue::StringRef(text), TransformFunction::ToLowercase) => {
315 OperationValue::StringRef(text.to_lowercase())
316 }
317 (OperationValue::StringRef(text), TransformFunction::Append(suffix)) => {
318 OperationValue::StringRef(format!("{text}{suffix}"))
319 }
320 (OperationValue::StringRef(text), TransformFunction::Prepend(prefix)) => {
321 OperationValue::StringRef(format!("{prefix}{text}"))
322 }
323 _ => value.clone(), }
325 }
326
327 fn flush_batch(&mut self) {
329 if !self.current_batch.is_empty() {
330 let batch = std::mem::take(&mut self.current_batch);
331 self.processed_batches.push_back(batch);
332 }
333 }
334
335 #[must_use]
337 pub const fn processor(&self) -> &BlackBoxProcessor {
338 &self.processor
339 }
340
341 #[must_use]
343 pub fn batch_count(&self) -> usize {
344 self.processed_batches.len()
345 }
346
347 #[must_use]
349 pub fn total_items(&self) -> usize {
350 self.processed_batches.iter().map(Vec::len).sum::<usize>() + self.current_batch.len()
351 }
352}
353
#[cfg(test)]
mod tests {
    //! Unit tests for `StreamingProcessor`.
    //!
    //! The repeat and Fibonacci generators flush their output into the queue
    //! of finished batches, while filter and map only act on the current
    //! batch. Tests that check filter/map results on strings therefore seed
    //! the current batch directly instead of going through a generator.

    use super::*;
    use crate::FilterPredicate;

    /// Path attached to build/filter/map operations; the handlers ignore it
    /// for the built-in generators.
    const PATH: &str = "stream";

    fn num(digits: &str) -> OperationValue {
        OperationValue::NumberRef(digits.to_string())
    }

    fn text(value: &str) -> OperationValue {
        OperationValue::StringRef(value.to_string())
    }

    fn build(generator: StreamGenerator) -> DsonOperation {
        DsonOperation::StreamBuild {
            path: PATH.to_string(),
            generator,
        }
    }

    fn range(start: i64, end: i64, step: i64) -> DsonOperation {
        build(StreamGenerator::Range { start, end, step })
    }

    fn repeat(value: OperationValue, count: usize) -> DsonOperation {
        build(StreamGenerator::Repeat(value, count))
    }

    fn filter(predicate: FilterPredicate) -> DsonOperation {
        DsonOperation::StreamFilter {
            path: PATH.to_string(),
            predicate,
        }
    }

    fn custom(expression: &str) -> DsonOperation {
        filter(FilterPredicate::Custom(expression.to_string()))
    }

    fn map(transform: TransformFunction) -> DsonOperation {
        DsonOperation::StreamMap {
            path: PATH.to_string(),
            transform,
        }
    }

    fn emit(batch_size: usize) -> DsonOperation {
        DsonOperation::StreamEmit {
            path: "out".to_string(),
            batch_size,
        }
    }

    /// Builds a processor whose current batch starts as `seed`, then applies
    /// `operations`, panicking if any of them fails.
    fn run(
        buffer_size: usize,
        seed: Vec<OperationValue>,
        operations: &[DsonOperation],
    ) -> StreamingProcessor {
        let mut processor = StreamingProcessor::new(buffer_size);
        processor.current_batch = seed;
        processor.process_stream(operations).unwrap();
        processor
    }

    /// Renders a batch as comparable strings: `n:` prefix for numbers, `s:`
    /// for strings, `other` for everything else.
    fn rendered(batch: &[OperationValue]) -> Vec<String> {
        batch
            .iter()
            .map(|value| match value {
                OperationValue::NumberRef(digits) => format!("n:{digits}"),
                OperationValue::StringRef(value) => format!("s:{value}"),
                _ => "other".to_string(),
            })
            .collect()
    }

    #[test]
    fn test_stream_range_generation() {
        let processor = run(10, Vec::new(), &[range(0, 25, 5)]);
        assert_eq!(processor.total_items(), 5);
        assert_eq!(
            rendered(&processor.current_batch),
            ["n:0", "n:5", "n:10", "n:15", "n:20"]
        );
    }

    #[test]
    fn test_stream_filter_even() {
        let processor = run(10, Vec::new(), &[range(0, 10, 1), filter(FilterPredicate::Even)]);
        assert_eq!(
            rendered(&processor.current_batch),
            ["n:0", "n:2", "n:4", "n:6", "n:8"]
        );
    }

    #[test]
    fn test_stream_map_multiply() {
        let processor = run(
            10,
            Vec::new(),
            &[range(1, 4, 1), map(TransformFunction::Multiply(2))],
        );
        assert_eq!(rendered(&processor.current_batch), ["n:2", "n:4", "n:6"]);
    }

    #[test]
    fn test_stream_fibonacci() {
        let processor = run(10, Vec::new(), &[build(StreamGenerator::Fibonacci(8))]);
        assert_eq!(processor.total_items(), 8);
        assert_eq!(processor.batch_count(), 1);
        assert_eq!(
            rendered(&processor.processed_batches[0]),
            ["n:0", "n:1", "n:1", "n:2", "n:3", "n:5", "n:8", "n:13"]
        );
    }

    #[test]
    fn test_stream_repeat() {
        let processor = run(10, Vec::new(), &[repeat(text("x"), 5)]);
        assert_eq!(processor.total_items(), 5);
        assert_eq!(processor.batch_count(), 1);
    }

    #[test]
    fn test_stream_emit() {
        let processor = run(5, Vec::new(), &[range(0, 10, 1), emit(5)]);
        assert_eq!(processor.total_items(), 0);
        assert_eq!(processor.batch_count(), 0);
    }

    #[test]
    fn test_stream_filter_odd() {
        let processor = run(10, Vec::new(), &[range(0, 10, 1), filter(FilterPredicate::Odd)]);
        assert_eq!(
            rendered(&processor.current_batch),
            ["n:1", "n:3", "n:5", "n:7", "n:9"]
        );
    }

    #[test]
    fn test_stream_filter_greater_than() {
        let processor = run(
            10,
            Vec::new(),
            &[range(0, 10, 1), filter(FilterPredicate::GreaterThan(5))],
        );
        assert_eq!(rendered(&processor.current_batch), ["n:6", "n:7", "n:8", "n:9"]);
    }

    #[test]
    fn test_stream_filter_less_than() {
        let processor = run(
            10,
            Vec::new(),
            &[range(0, 10, 1), filter(FilterPredicate::LessThan(5))],
        );
        assert_eq!(
            rendered(&processor.current_batch),
            ["n:0", "n:1", "n:2", "n:3", "n:4"]
        );
    }

    #[test]
    fn test_stream_filter_equals() {
        let processor = run(
            10,
            Vec::new(),
            &[range(0, 10, 1), filter(FilterPredicate::Equals(num("5")))],
        );
        assert_eq!(rendered(&processor.current_batch), ["n:5"]);
    }

    #[test]
    fn test_stream_filter_every_nth() {
        let processor = run(
            10,
            Vec::new(),
            &[range(0, 12, 1), filter(FilterPredicate::EveryNth(3))],
        );
        assert_eq!(rendered(&processor.current_batch), ["n:0", "n:3", "n:6", "n:9"]);
    }

    #[test]
    fn test_stream_filter_alternate() {
        let processor = run(
            10,
            Vec::new(),
            &[range(0, 10, 1), filter(FilterPredicate::Alternate)],
        );
        assert_eq!(
            rendered(&processor.current_batch),
            ["n:0", "n:2", "n:4", "n:6", "n:8"]
        );
    }

    #[test]
    fn test_stream_filter_custom() {
        let processor = run(10, Vec::new(), &[range(0, 20, 1), custom("value > 10")]);
        assert_eq!(processor.total_items(), 9);
        assert_eq!(rendered(&processor.current_batch)[0], "n:11");
    }

    #[test]
    fn test_stream_map_add() {
        let processor = run(10, Vec::new(), &[range(0, 5, 1), map(TransformFunction::Add(10))]);
        assert_eq!(
            rendered(&processor.current_batch),
            ["n:10", "n:11", "n:12", "n:13", "n:14"]
        );
    }

    #[test]
    fn test_stream_map_to_lowercase() {
        let processor = run(
            10,
            vec![text("HELLO"), text("World")],
            &[map(TransformFunction::ToLowercase)],
        );
        assert_eq!(rendered(&processor.current_batch), ["s:hello", "s:world"]);
    }

    #[test]
    fn test_stream_map_to_uppercase() {
        let processor = run(10, vec![text("hello")], &[map(TransformFunction::ToUppercase)]);
        assert_eq!(rendered(&processor.current_batch), ["s:HELLO"]);
    }

    #[test]
    fn test_stream_map_append() {
        let processor = run(
            10,
            vec![text("hello"), text("bye")],
            &[map(TransformFunction::Append("!".to_string()))],
        );
        assert_eq!(rendered(&processor.current_batch), ["s:hello!", "s:bye!"]);
    }

    #[test]
    fn test_stream_map_prepend() {
        let processor = run(
            10,
            vec![text("world")],
            &[map(TransformFunction::Prepend("hello ".to_string()))],
        );
        assert_eq!(rendered(&processor.current_batch), ["s:hello world"]);
    }

    #[test]
    fn test_stream_custom_generator() {
        let operations = [DsonOperation::StreamBuild {
            path: "custom".to_string(),
            generator: StreamGenerator::Custom("test".to_string()),
        }];
        // Custom generators are delegated, so no values are batched here.
        let processor = run(10, Vec::new(), &operations);
        assert_eq!(processor.total_items(), 0);
    }

    #[test]
    fn test_processor_getter() {
        let processor = StreamingProcessor::new(10);
        let _ = processor.processor();
    }

    #[test]
    fn test_batch_count_empty() {
        let processor = StreamingProcessor::new(10);
        assert_eq!(processor.batch_count(), 0);
    }

    #[test]
    fn test_pass_through_operations() {
        let operations = [DsonOperation::FieldAdd {
            path: "test".to_string(),
            value: text("value"),
        }];
        let processor = run(10, Vec::new(), &operations);
        assert_eq!(processor.total_items(), 0);
    }

    #[test]
    fn test_generate_repeat_with_flush() {
        // 10 values with a buffer of 3: three full batches plus the remainder.
        let processor = run(3, Vec::new(), &[repeat(num("1"), 10)]);
        assert_eq!(processor.batch_count(), 4);
        assert_eq!(processor.total_items(), 10);
        assert!(processor.current_batch.is_empty());
    }

    #[test]
    fn test_generate_fibonacci_with_flush() {
        let processor = run(3, Vec::new(), &[build(StreamGenerator::Fibonacci(15))]);
        assert_eq!(processor.batch_count(), 5);
        assert_eq!(processor.total_items(), 15);
    }

    #[test]
    fn test_stream_emit_empty() {
        let processor = run(10, Vec::new(), &[emit(5)]);
        assert_eq!(processor.total_items(), 0);
        assert_eq!(processor.batch_count(), 0);
    }

    #[test]
    fn test_stream_emit_with_processed_batches() {
        let processor = run(3, Vec::new(), &[repeat(num("1"), 10), emit(3)]);
        assert_eq!(processor.total_items(), 0);
        assert_eq!(processor.batch_count(), 0);
    }

    #[test]
    fn test_custom_predicate_less_than() {
        let processor = run(10, Vec::new(), &[range(0, 20, 1), custom("value < 5")]);
        assert_eq!(
            rendered(&processor.current_batch),
            ["n:0", "n:1", "n:2", "n:3", "n:4"]
        );
    }

    #[test]
    fn test_custom_predicate_equals() {
        let processor = run(10, Vec::new(), &[range(0, 10, 1), custom("value == 5")]);
        assert_eq!(rendered(&processor.current_batch), ["n:5"]);
    }

    #[test]
    fn test_custom_predicate_not_equals() {
        let processor = run(10, Vec::new(), &[range(0, 10, 1), custom("value != 5")]);
        assert_eq!(processor.total_items(), 9);
        assert!(!rendered(&processor.current_batch).contains(&"n:5".to_string()));
    }

    #[test]
    fn test_custom_predicate_invalid() {
        // Unrecognised predicates accept every value.
        let processor = run(10, Vec::new(), &[range(0, 5, 1), custom("invalid_predicate")]);
        assert_eq!(processor.total_items(), 5);
    }

    #[test]
    fn test_filter_equals_string() {
        let processor = run(
            10,
            vec![text("hello"), text("world"), text("hello")],
            &[filter(FilterPredicate::Equals(text("hello")))],
        );
        assert_eq!(rendered(&processor.current_batch), ["s:hello", "s:hello"]);
    }

    #[test]
    fn test_filter_equals_bool() {
        let seed = vec![
            OperationValue::BoolRef(true),
            OperationValue::BoolRef(false),
            OperationValue::BoolRef(true),
        ];
        let processor = run(
            10,
            seed,
            &[filter(FilterPredicate::Equals(OperationValue::BoolRef(true)))],
        );
        assert_eq!(processor.total_items(), 2);
    }

    #[test]
    fn test_filter_equals_null() {
        let seed = vec![OperationValue::Null, text("not null"), OperationValue::Null];
        let processor = run(
            10,
            seed,
            &[filter(FilterPredicate::Equals(OperationValue::Null))],
        );
        assert_eq!(rendered(&processor.current_batch), ["other", "other"]);
    }

    #[test]
    fn test_filter_equals_type_mismatch() {
        let processor = run(
            10,
            vec![num("5")],
            &[filter(FilterPredicate::Equals(text("5")))],
        );
        assert_eq!(processor.total_items(), 0);
    }

    #[test]
    fn test_filter_even_with_non_numbers() {
        let seed = vec![text("not a number"), num("4"), OperationValue::BoolRef(true)];
        let processor = run(10, seed, &[filter(FilterPredicate::Even)]);
        assert_eq!(rendered(&processor.current_batch), ["n:4"]);
    }

    #[test]
    fn test_filter_odd_with_non_numbers() {
        let seed = vec![text("not a number"), num("3")];
        let processor = run(10, seed, &[filter(FilterPredicate::Odd)]);
        assert_eq!(rendered(&processor.current_batch), ["n:3"]);
    }

    #[test]
    fn test_filter_greater_than_with_non_numbers() {
        let seed = vec![text("text"), num("10")];
        let processor = run(10, seed, &[filter(FilterPredicate::GreaterThan(5))]);
        assert_eq!(rendered(&processor.current_batch), ["n:10"]);
    }

    #[test]
    fn test_filter_less_than_with_non_numbers() {
        let seed = vec![OperationValue::BoolRef(false), num("3")];
        let processor = run(10, seed, &[filter(FilterPredicate::LessThan(5))]);
        assert_eq!(rendered(&processor.current_batch), ["n:3"]);
    }

    #[test]
    fn test_transform_invalid_number() {
        // An unparseable number is passed through untouched.
        let processor = run(
            10,
            vec![num("not_a_number")],
            &[map(TransformFunction::Add(10))],
        );
        assert_eq!(rendered(&processor.current_batch), ["n:not_a_number"]);
    }

    #[test]
    fn test_transform_multiply_invalid_number() {
        let processor = run(
            10,
            vec![num("invalid")],
            &[map(TransformFunction::Multiply(2))],
        );
        assert_eq!(rendered(&processor.current_batch), ["n:invalid"]);
    }

    #[test]
    fn test_transform_unsupported_combination() {
        // A text transform applied to a number leaves the number as it was.
        let processor = run(10, vec![num("5")], &[map(TransformFunction::ToUppercase)]);
        assert_eq!(rendered(&processor.current_batch), ["n:5"]);
    }

    #[test]
    fn test_filter_equals_float_comparison() {
        let processor = run(
            10,
            vec![num("3.14"), num("2.71")],
            &[filter(FilterPredicate::Equals(num("3.14")))],
        );
        assert_eq!(rendered(&processor.current_batch), ["n:3.14"]);
    }

    #[test]
    fn test_custom_predicate_string_equals() {
        let processor = run(
            10,
            vec![text("hello"), text("world")],
            &[custom("value == \"hello\"")],
        );
        assert_eq!(rendered(&processor.current_batch), ["s:hello"]);
    }

    #[test]
    fn test_custom_predicate_string_not_equals() {
        let processor = run(
            10,
            vec![text("hello"), text("world")],
            &[custom("value != \"hello\"")],
        );
        assert_eq!(rendered(&processor.current_batch), ["s:world"]);
    }

    #[test]
    fn test_batch_count_after_generation() {
        let processor = run(3, Vec::new(), &[repeat(num("1"), 9)]);
        assert_eq!(processor.batch_count(), 3);
        assert_eq!(processor.total_items(), 9);
    }

    #[test]
    fn test_total_items_mixed() {
        let mut processor = StreamingProcessor::new(5);
        processor.current_batch.push(OperationValue::Null);
        processor.current_batch.push(OperationValue::Null);

        processor.flush_batch();

        processor.current_batch.push(OperationValue::Null);

        // Two values in the flushed batch plus one still pending.
        assert_eq!(processor.batch_count(), 1);
        assert_eq!(processor.total_items(), 3);
    }
}