1use fionn_core::Result;
8use fionn_ops::processor::BlackBoxProcessor;
9use fionn_ops::{
10 DsonOperation, FilterPredicate, OperationValue, StreamGenerator, TransformFunction,
11};
12use std::collections::VecDeque;
13
/// Buffers values produced by streaming operations into batches.
///
/// `StreamBuild`, `StreamFilter`, `StreamMap` and `StreamEmit` operations are handled by this
/// type; every other [`DsonOperation`] is forwarded to the wrapped [`BlackBoxProcessor`].
pub struct StreamingProcessor {
    /// Receives forwarded non-streaming operations, custom generators and emitted batch summaries.
    processor: BlackBoxProcessor,
    /// Threshold at which generators flush `current_batch` into `processed_batches`.
    buffer_size: usize,
    /// Items that have not yet been moved into a completed batch.
    current_batch: Vec<OperationValue>,
    /// Completed batches in FIFO order (pushed at the back, emitted from the front).
    processed_batches: VecDeque<Vec<OperationValue>>,
}
21
22impl StreamingProcessor {
23 #[must_use]
25 pub fn new(buffer_size: usize) -> Self {
26 Self {
27 processor: BlackBoxProcessor::new(vec![], vec![]),
28 buffer_size,
29 current_batch: Vec::new(),
30 processed_batches: VecDeque::new(),
31 }
32 }
33
34 pub fn process_stream(&mut self, operations: &[DsonOperation]) -> Result<()> {
39 for operation in operations {
40 self.apply_streaming_operation(operation)?;
41 }
42 Ok(())
43 }
44
45 fn apply_streaming_operation(&mut self, operation: &DsonOperation) -> Result<()> {
50 match operation {
51 DsonOperation::StreamBuild { path, generator } => self.stream_build(path, generator),
52 DsonOperation::StreamFilter { path, predicate } => {
53 self.stream_filter(path, predicate);
54 Ok(())
55 }
56 DsonOperation::StreamMap { path, transform } => {
57 self.stream_map(path, transform);
58 Ok(())
59 }
60 DsonOperation::StreamEmit { path, batch_size } => self.stream_emit(path, *batch_size),
61 _ => self.processor.apply_operation(operation),
63 }
64 }
65
66 fn stream_build(&mut self, path: &str, generator: &StreamGenerator) -> Result<()> {
68 match generator {
69 StreamGenerator::Range { start, end, step } => {
70 self.generate_range(*start, *end, *step);
71 }
72 StreamGenerator::Repeat(value, count) => {
73 self.generate_repeat(value, *count);
74 }
75 StreamGenerator::Fibonacci(count) => {
76 self.generate_fibonacci(*count);
77 }
78 StreamGenerator::Custom(_) => {
79 self.processor
82 .apply_operation(&DsonOperation::StreamBuild {
83 path: path.to_string(),
84 generator: generator.clone(),
85 })?;
86 }
87 }
88 Ok(())
89 }
90
91 fn generate_range(&mut self, start: i64, end: i64, step: i64) {
93 let mut current = start;
94 while current < end {
95 self.current_batch
96 .push(OperationValue::NumberRef(current.to_string()));
97 current += step;
98 }
99 }
101
102 fn generate_repeat(&mut self, value: &OperationValue, count: usize) {
104 for _ in 0..count {
105 self.current_batch.push(value.clone());
106 if self.current_batch.len() >= self.buffer_size {
107 self.flush_batch();
108 }
109 }
110 self.flush_batch();
111 }
112
113 fn generate_fibonacci(&mut self, count: usize) {
115 let mut a = 0i64;
116 let mut b = 1i64;
117
118 for _ in 0..count {
119 self.current_batch
120 .push(OperationValue::NumberRef(a.to_string()));
121 if self.current_batch.len() >= self.buffer_size {
122 self.flush_batch();
123 }
124
125 let temp = a;
126 a = b;
127 b += temp;
128 }
129 self.flush_batch();
130 }
131
132 fn stream_filter(&mut self, _path: &str, predicate: &FilterPredicate) {
134 let mut filtered = Vec::new();
136
137 for (index, value) in self.current_batch.iter().enumerate() {
138 if Self::matches_predicate(value, predicate, index) {
139 filtered.push(value.clone());
140 }
141 }
142
143 self.current_batch = filtered;
144 }
145
146 fn stream_map(&mut self, _path: &str, transform: &TransformFunction) {
148 let mut transformed = Vec::new();
150
151 for value in &self.current_batch {
152 let new_value = Self::apply_transform(value, transform);
153 transformed.push(new_value);
154 }
155
156 self.current_batch = transformed;
157 }
158
159 fn stream_emit(&mut self, path: &str, _batch_size: usize) -> Result<()> {
161 if self.current_batch.is_empty() && self.processed_batches.is_empty() {
163 return Ok(());
164 }
165
166 if !self.current_batch.is_empty() {
168 self.flush_batch();
169 }
170
171 while let Some(batch) = self.processed_batches.pop_front() {
173 let emit_path = format!("{}.batch_{}", path, self.processed_batches.len());
176 let batch_value = OperationValue::StringRef(format!("batch_size:{}", batch.len()));
177 self.processor.apply_operation(&DsonOperation::FieldAdd {
178 path: emit_path,
179 value: batch_value,
180 })?;
181 }
182
183 Ok(())
184 }
185
186 fn matches_predicate(
188 value: &OperationValue,
189 predicate: &FilterPredicate,
190 index: usize,
191 ) -> bool {
192 match predicate {
193 FilterPredicate::Even => {
194 if let OperationValue::NumberRef(num_str) = value {
195 num_str.parse::<i64>().is_ok_and(|num| num % 2 == 0)
196 } else {
197 false
198 }
199 }
200 FilterPredicate::Odd => {
201 if let OperationValue::NumberRef(num_str) = value {
202 num_str.parse::<i64>().is_ok_and(|num| num % 2 == 1)
203 } else {
204 false
205 }
206 }
207 FilterPredicate::EveryNth(n) => index.is_multiple_of(*n),
208 FilterPredicate::GreaterThan(threshold) => {
209 if let OperationValue::NumberRef(num_str) = value {
210 num_str.parse::<i64>().is_ok_and(|num| num > *threshold)
211 } else {
212 false
213 }
214 }
215 FilterPredicate::LessThan(threshold) => {
216 if let OperationValue::NumberRef(num_str) = value {
217 num_str.parse::<i64>().is_ok_and(|num| num < *threshold)
218 } else {
219 false
220 }
221 }
222 FilterPredicate::Equals(compare_value) => {
223 match (value, compare_value) {
224 (OperationValue::NumberRef(num_str), OperationValue::NumberRef(cmp_str)) => {
225 if let (Ok(a), Ok(b)) = (num_str.parse::<i64>(), cmp_str.parse::<i64>()) {
227 a == b
228 } else if let (Ok(a), Ok(b)) =
229 (num_str.parse::<f64>(), cmp_str.parse::<f64>())
230 {
231 (a - b).abs() < f64::EPSILON
232 } else {
233 num_str == cmp_str
234 }
235 }
236 (OperationValue::StringRef(s1), OperationValue::StringRef(s2)) => s1 == s2,
237 (OperationValue::BoolRef(b1), OperationValue::BoolRef(b2)) => b1 == b2,
238 (OperationValue::Null, OperationValue::Null) => true,
239 _ => false,
240 }
241 }
242 FilterPredicate::Alternate => {
243 index.is_multiple_of(2)
245 }
246 FilterPredicate::Custom(predicate_fn) => {
247 Self::evaluate_custom_predicate(predicate_fn, value)
250 }
251 }
252 }
253
254 fn evaluate_custom_predicate(predicate: &str, value: &OperationValue) -> bool {
256 let predicate = predicate.trim();
257
258 if let Some(rest) = predicate.strip_prefix("value") {
260 let rest = rest.trim();
261
262 if let Some(threshold_str) = rest.strip_prefix(">") {
264 if let Ok(threshold) = threshold_str.trim().parse::<i64>()
265 && let OperationValue::NumberRef(num_str) = value
266 && let Ok(num) = num_str.parse::<i64>()
267 {
268 return num > threshold;
269 }
270 } else if let Some(threshold_str) = rest.strip_prefix("<") {
271 if let Ok(threshold) = threshold_str.trim().parse::<i64>()
272 && let OperationValue::NumberRef(num_str) = value
273 && let Ok(num) = num_str.parse::<i64>()
274 {
275 return num < threshold;
276 }
277 } else if let Some(threshold_str) = rest.strip_prefix("==") {
278 let threshold_str = threshold_str.trim();
279 if let OperationValue::NumberRef(num_str) = value {
280 return num_str == threshold_str;
281 } else if let OperationValue::StringRef(s) = value {
282 return s == threshold_str.trim_matches('"');
283 }
284 } else if let Some(threshold_str) = rest.strip_prefix("!=") {
285 let threshold_str = threshold_str.trim();
286 if let OperationValue::NumberRef(num_str) = value {
287 return num_str != threshold_str;
288 } else if let OperationValue::StringRef(s) = value {
289 return s != threshold_str.trim_matches('"');
290 }
291 }
292 }
293
294 true
296 }
297
298 fn apply_transform(value: &OperationValue, transform: &TransformFunction) -> OperationValue {
300 match (value, transform) {
301 (OperationValue::NumberRef(num_str), TransformFunction::Add(delta)) => {
302 num_str.parse::<i64>().map_or_else(
303 |_| value.clone(),
304 |num| OperationValue::NumberRef((num + delta).to_string()),
305 )
306 }
307 (OperationValue::NumberRef(num_str), TransformFunction::Multiply(factor)) => {
308 num_str.parse::<i64>().map_or_else(
309 |_| value.clone(),
310 |num| OperationValue::NumberRef((num * factor).to_string()),
311 )
312 }
313 (OperationValue::StringRef(text), TransformFunction::ToUppercase) => {
314 OperationValue::StringRef(text.to_uppercase())
315 }
316 (OperationValue::StringRef(text), TransformFunction::ToLowercase) => {
317 OperationValue::StringRef(text.to_lowercase())
318 }
319 (OperationValue::StringRef(text), TransformFunction::Append(suffix)) => {
320 OperationValue::StringRef(format!("{text}{suffix}"))
321 }
322 (OperationValue::StringRef(text), TransformFunction::Prepend(prefix)) => {
323 OperationValue::StringRef(format!("{prefix}{text}"))
324 }
325 _ => value.clone(), }
327 }
328
329 fn flush_batch(&mut self) {
331 if !self.current_batch.is_empty() {
332 let batch = std::mem::take(&mut self.current_batch);
333 self.processed_batches.push_back(batch);
334 }
335 }
336
337 #[must_use]
339 pub const fn processor(&self) -> &BlackBoxProcessor {
340 &self.processor
341 }
342
343 #[must_use]
345 pub fn batch_count(&self) -> usize {
346 self.processed_batches.len()
347 }
348
349 #[must_use]
351 pub fn total_items(&self) -> usize {
352 self.processed_batches.iter().map(Vec::len).sum::<usize>() + self.current_batch.len()
353 }
354}
355
#[cfg(test)]
mod tests {
    use super::*;
    use fionn_ops::FilterPredicate;

    /// `StreamBuild` of the integer range `start..end` (stepping by `step`) on `path`.
    fn range(path: &str, start: i64, end: i64, step: i64) -> DsonOperation {
        DsonOperation::StreamBuild {
            path: path.to_string(),
            generator: StreamGenerator::Range { start, end, step },
        }
    }

    /// `StreamBuild` that repeats `value` `count` times on `path`.
    fn repeat(path: &str, value: OperationValue, count: usize) -> DsonOperation {
        DsonOperation::StreamBuild {
            path: path.to_string(),
            generator: StreamGenerator::Repeat(value, count),
        }
    }

    /// `StreamFilter` with `predicate` on `path`.
    fn filter(path: &str, predicate: FilterPredicate) -> DsonOperation {
        DsonOperation::StreamFilter {
            path: path.to_string(),
            predicate,
        }
    }

    /// `StreamMap` with `transform` on `path`.
    fn map(path: &str, transform: TransformFunction) -> DsonOperation {
        DsonOperation::StreamMap {
            path: path.to_string(),
            transform,
        }
    }

    /// `StreamEmit` to `path`.
    fn emit(path: &str, batch_size: usize) -> DsonOperation {
        DsonOperation::StreamEmit {
            path: path.to_string(),
            batch_size,
        }
    }

    /// Shorthand for a `NumberRef` value.
    fn number(text: &str) -> OperationValue {
        OperationValue::NumberRef(text.to_string())
    }

    /// Shorthand for a `StringRef` value.
    fn string(text: &str) -> OperationValue {
        OperationValue::StringRef(text.to_string())
    }

    /// Runs `operations` on a fresh processor and returns it; panics if any operation fails.
    fn run(buffer_size: usize, operations: &[DsonOperation]) -> StreamingProcessor {
        let mut processor = StreamingProcessor::new(buffer_size);
        processor.process_stream(operations).unwrap();
        processor
    }

    /// Runs `operations` on a processor whose current batch is pre-seeded with `items`.
    fn run_seeded(items: Vec<OperationValue>, operations: &[DsonOperation]) -> StreamingProcessor {
        let mut processor = StreamingProcessor::new(10);
        processor.current_batch.extend(items);
        processor.process_stream(operations).unwrap();
        processor
    }

    /// Text of every buffered `NumberRef`, in stream order (processed batches, then current).
    fn numbers(processor: &StreamingProcessor) -> Vec<String> {
        processor
            .processed_batches
            .iter()
            .flatten()
            .chain(processor.current_batch.iter())
            .filter_map(|value| match value {
                OperationValue::NumberRef(text) => Some(text.clone()),
                _ => None,
            })
            .collect()
    }

    /// Decimal strings for `values`, for comparison with [`numbers`].
    fn decimal(values: impl IntoIterator<Item = i64>) -> Vec<String> {
        values.into_iter().map(|value| value.to_string()).collect()
    }

    #[test]
    fn test_stream_range_generation() {
        let processor = run(10, &[range("numbers", 0, 25, 5)]);
        assert_eq!(processor.total_items(), 5);
        assert_eq!(numbers(&processor), decimal([0, 5, 10, 15, 20]));
    }

    #[test]
    fn test_stream_filter_even() {
        let processor = run(
            10,
            &[range("numbers", 0, 10, 1), filter("numbers", FilterPredicate::Even)],
        );
        assert_eq!(processor.total_items(), 5);
        assert_eq!(numbers(&processor), decimal([0, 2, 4, 6, 8]));
    }

    #[test]
    fn test_stream_map_multiply() {
        let processor = run(
            10,
            &[range("numbers", 1, 4, 1), map("numbers", TransformFunction::Multiply(2))],
        );
        assert_eq!(processor.total_items(), 3);
        assert_eq!(numbers(&processor), decimal([2, 4, 6]));
    }

    #[test]
    fn test_stream_fibonacci() {
        let processor = run(
            10,
            &[DsonOperation::StreamBuild {
                path: "fib".to_string(),
                generator: StreamGenerator::Fibonacci(8),
            }],
        );
        assert_eq!(processor.total_items(), 8);
        assert_eq!(numbers(&processor), decimal([0, 1, 1, 2, 3, 5, 8, 13]));
    }

    #[test]
    fn test_stream_repeat() {
        let processor = run(10, &[repeat("rep", string("x"), 5)]);
        assert_eq!(processor.total_items(), 5);
    }

    #[test]
    fn test_stream_emit() {
        let processor = run(5, &[range("numbers", 0, 10, 1), emit("out", 5)]);
        // Emitting drains every buffered item into the wrapped processor.
        assert_eq!(processor.total_items(), 0);
        assert_eq!(processor.batch_count(), 0);
    }

    #[test]
    fn test_stream_filter_odd() {
        let processor = run(
            10,
            &[range("numbers", 0, 10, 1), filter("numbers", FilterPredicate::Odd)],
        );
        assert_eq!(processor.total_items(), 5);
        assert_eq!(numbers(&processor), decimal([1, 3, 5, 7, 9]));
    }

    #[test]
    fn test_stream_filter_greater_than() {
        let processor = run(
            10,
            &[
                range("numbers", 0, 10, 1),
                filter("numbers", FilterPredicate::GreaterThan(5)),
            ],
        );
        assert_eq!(processor.total_items(), 4);
        assert_eq!(numbers(&processor), decimal(6..10));
    }

    #[test]
    fn test_stream_filter_less_than() {
        let processor = run(
            10,
            &[range("numbers", 0, 10, 1), filter("numbers", FilterPredicate::LessThan(5))],
        );
        assert_eq!(processor.total_items(), 5);
        assert_eq!(numbers(&processor), decimal(0..5));
    }

    #[test]
    fn test_stream_filter_equals() {
        let processor = run(
            10,
            &[
                range("numbers", 0, 10, 1),
                filter("numbers", FilterPredicate::Equals(number("5"))),
            ],
        );
        assert_eq!(processor.total_items(), 1);
        assert_eq!(numbers(&processor), decimal([5]));
    }

    #[test]
    fn test_stream_filter_every_nth() {
        let processor = run(
            10,
            &[range("numbers", 0, 12, 1), filter("numbers", FilterPredicate::EveryNth(3))],
        );
        assert_eq!(numbers(&processor), decimal([0, 3, 6, 9]));
    }

    #[test]
    fn test_stream_filter_alternate() {
        let processor = run(
            10,
            &[range("numbers", 0, 10, 1), filter("numbers", FilterPredicate::Alternate)],
        );
        assert_eq!(numbers(&processor), decimal([0, 2, 4, 6, 8]));
    }

    #[test]
    fn test_stream_filter_custom() {
        let processor = run(
            10,
            &[
                range("numbers", 0, 20, 1),
                filter("numbers", FilterPredicate::Custom("value > 10".to_string())),
            ],
        );
        assert_eq!(numbers(&processor), decimal(11..20));
    }

    #[test]
    fn test_stream_map_add() {
        let processor = run(
            10,
            &[range("numbers", 0, 5, 1), map("numbers", TransformFunction::Add(10))],
        );
        assert_eq!(numbers(&processor), decimal(10..15));
    }

    #[test]
    fn test_stream_map_to_lowercase() {
        let processor = run(
            10,
            &[
                repeat("strings", string("HELLO"), 3),
                map("strings", TransformFunction::ToLowercase),
            ],
        );
        assert_eq!(processor.total_items(), 3);
    }

    #[test]
    fn test_stream_map_to_uppercase() {
        let processor = run(
            10,
            &[
                repeat("strings", string("hello"), 3),
                map("strings", TransformFunction::ToUppercase),
            ],
        );
        assert_eq!(processor.total_items(), 3);
    }

    #[test]
    fn test_stream_map_append() {
        let processor = run(
            10,
            &[
                repeat("strings", string("hello"), 2),
                map("strings", TransformFunction::Append("!".to_string())),
            ],
        );
        assert_eq!(processor.total_items(), 2);
    }

    #[test]
    fn test_stream_map_prepend() {
        let processor = run(
            10,
            &[
                repeat("strings", string("world"), 2),
                map("strings", TransformFunction::Prepend("hello ".to_string())),
            ],
        );
        assert_eq!(processor.total_items(), 2);
    }

    #[test]
    fn test_stream_custom_generator() {
        let processor = run(
            10,
            &[DsonOperation::StreamBuild {
                path: "custom".to_string(),
                generator: StreamGenerator::Custom("test".to_string()),
            }],
        );
        // Custom generators are forwarded, so nothing is buffered locally.
        assert_eq!(processor.total_items(), 0);
    }

    #[test]
    fn test_processor_getter() {
        let processor = StreamingProcessor::new(10);
        let _ = processor.processor();
    }

    #[test]
    fn test_batch_count_empty() {
        let processor = StreamingProcessor::new(10);
        assert_eq!(processor.batch_count(), 0);
    }

    #[test]
    fn test_pass_through_operations() {
        let processor = run(
            10,
            &[DsonOperation::FieldAdd {
                path: "test".to_string(),
                value: string("value"),
            }],
        );
        assert_eq!(processor.total_items(), 0);
    }

    #[test]
    fn test_generate_repeat_with_flush() {
        // 10 items with a buffer of 3 -> batches of 3, 3, 3 and a final 1.
        let processor = run(3, &[repeat("rep", number("1"), 10)]);
        assert_eq!(processor.batch_count(), 4);
        assert_eq!(processor.total_items(), 10);
    }

    #[test]
    fn test_generate_fibonacci_with_flush() {
        let processor = run(
            3,
            &[DsonOperation::StreamBuild {
                path: "fib".to_string(),
                generator: StreamGenerator::Fibonacci(15),
            }],
        );
        assert_eq!(processor.batch_count(), 5);
        assert_eq!(processor.total_items(), 15);
    }

    #[test]
    fn test_stream_emit_empty() {
        let processor = run(10, &[emit("out", 5)]);
        assert_eq!(processor.total_items(), 0);
        assert_eq!(processor.batch_count(), 0);
    }

    #[test]
    fn test_stream_emit_with_processed_batches() {
        let processor = run(3, &[repeat("numbers", number("1"), 10), emit("out", 3)]);
        assert_eq!(processor.total_items(), 0);
        assert_eq!(processor.batch_count(), 0);
    }

    #[test]
    fn test_custom_predicate_less_than() {
        let processor = run(
            10,
            &[
                range("numbers", 0, 20, 1),
                filter("numbers", FilterPredicate::Custom("value < 5".to_string())),
            ],
        );
        assert_eq!(processor.total_items(), 5);
        assert_eq!(numbers(&processor), decimal(0..5));
    }

    #[test]
    fn test_custom_predicate_equals() {
        let processor = run(
            10,
            &[
                range("numbers", 0, 10, 1),
                filter("numbers", FilterPredicate::Custom("value == 5".to_string())),
            ],
        );
        assert_eq!(processor.total_items(), 1);
        assert_eq!(numbers(&processor), decimal([5]));
    }

    #[test]
    fn test_custom_predicate_not_equals() {
        let processor = run(
            10,
            &[
                range("numbers", 0, 10, 1),
                filter("numbers", FilterPredicate::Custom("value != 5".to_string())),
            ],
        );
        assert_eq!(processor.total_items(), 9);
    }

    #[test]
    fn test_custom_predicate_invalid() {
        let processor = run(
            10,
            &[
                range("numbers", 0, 5, 1),
                filter(
                    "numbers",
                    FilterPredicate::Custom("invalid_predicate".to_string()),
                ),
            ],
        );
        // Unrecognized predicates keep every item.
        assert_eq!(processor.total_items(), 5);
    }

    #[test]
    fn test_filter_equals_string() {
        let processor = run(
            10,
            &[
                repeat("strings", string("hello"), 3),
                filter("strings", FilterPredicate::Equals(string("hello"))),
            ],
        );
        assert_eq!(processor.total_items(), 3);
    }

    #[test]
    fn test_filter_equals_bool() {
        let processor = run_seeded(
            vec![
                OperationValue::BoolRef(true),
                OperationValue::BoolRef(false),
                OperationValue::BoolRef(true),
            ],
            &[filter(
                "bools",
                FilterPredicate::Equals(OperationValue::BoolRef(true)),
            )],
        );
        assert_eq!(processor.total_items(), 2);
    }

    #[test]
    fn test_filter_equals_null() {
        let processor = run_seeded(
            vec![
                OperationValue::Null,
                string("not null"),
                OperationValue::Null,
            ],
            &[filter("nulls", FilterPredicate::Equals(OperationValue::Null))],
        );
        assert_eq!(processor.total_items(), 2);
    }

    #[test]
    fn test_filter_equals_type_mismatch() {
        let processor = run_seeded(
            vec![number("5")],
            &[filter("test", FilterPredicate::Equals(string("5")))],
        );
        assert_eq!(processor.total_items(), 0);
    }

    #[test]
    fn test_filter_even_with_non_numbers() {
        let processor = run_seeded(
            vec![
                string("not a number"),
                number("4"),
                OperationValue::BoolRef(true),
            ],
            &[filter("test", FilterPredicate::Even)],
        );
        assert_eq!(processor.total_items(), 1);
        assert_eq!(numbers(&processor), decimal([4]));
    }

    #[test]
    fn test_filter_odd_with_non_numbers() {
        let processor = run_seeded(
            vec![string("not a number"), number("3")],
            &[filter("test", FilterPredicate::Odd)],
        );
        assert_eq!(processor.total_items(), 1);
        assert_eq!(numbers(&processor), decimal([3]));
    }

    #[test]
    fn test_filter_greater_than_with_non_numbers() {
        let processor = run_seeded(
            vec![string("text"), number("10")],
            &[filter("test", FilterPredicate::GreaterThan(5))],
        );
        assert_eq!(processor.total_items(), 1);
        assert_eq!(numbers(&processor), decimal([10]));
    }

    #[test]
    fn test_filter_less_than_with_non_numbers() {
        let processor = run_seeded(
            vec![OperationValue::BoolRef(false), number("3")],
            &[filter("test", FilterPredicate::LessThan(5))],
        );
        assert_eq!(processor.total_items(), 1);
        assert_eq!(numbers(&processor), decimal([3]));
    }

    #[test]
    fn test_transform_invalid_number() {
        let processor = run_seeded(
            vec![number("not_a_number")],
            &[map("test", TransformFunction::Add(10))],
        );
        assert_eq!(processor.total_items(), 1);
        assert_eq!(numbers(&processor), vec!["not_a_number".to_string()]);
    }

    #[test]
    fn test_transform_multiply_invalid_number() {
        let processor = run_seeded(
            vec![number("invalid")],
            &[map("test", TransformFunction::Multiply(2))],
        );
        assert_eq!(processor.total_items(), 1);
        assert_eq!(numbers(&processor), vec!["invalid".to_string()]);
    }

    #[test]
    fn test_transform_unsupported_combination() {
        let processor = run_seeded(
            vec![number("5")],
            &[map("test", TransformFunction::ToUppercase)],
        );
        assert_eq!(processor.total_items(), 1);
        assert_eq!(numbers(&processor), decimal([5]));
    }

    #[test]
    fn test_filter_equals_float_comparison() {
        let processor = run_seeded(
            vec![number("3.14"), number("2.71")],
            &[filter("floats", FilterPredicate::Equals(number("3.14")))],
        );
        assert_eq!(processor.total_items(), 1);
        assert_eq!(numbers(&processor), vec!["3.14".to_string()]);
    }

    #[test]
    fn test_custom_predicate_string_equals() {
        let processor = run_seeded(
            vec![string("hello"), string("world")],
            &[filter(
                "strings",
                FilterPredicate::Custom("value == \"hello\"".to_string()),
            )],
        );
        assert_eq!(processor.total_items(), 1);
    }

    #[test]
    fn test_custom_predicate_string_not_equals() {
        let processor = run_seeded(
            vec![string("hello"), string("world")],
            &[filter(
                "strings",
                FilterPredicate::Custom("value != \"hello\"".to_string()),
            )],
        );
        assert_eq!(processor.total_items(), 1);
    }

    #[test]
    fn test_batch_count_after_generation() {
        let processor = run(3, &[repeat("nums", number("1"), 9)]);
        assert_eq!(processor.batch_count(), 3);
    }

    #[test]
    fn test_total_items_mixed() {
        let mut processor = StreamingProcessor::new(5);
        processor.current_batch.push(OperationValue::Null);
        processor.current_batch.push(OperationValue::Null);

        processor.flush_batch();

        processor.current_batch.push(OperationValue::Null);

        assert_eq!(processor.total_items(), 3);
    }
}