Skip to main content

fionn_stream/
streaming.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2//! Streaming data pipeline processing for large datasets
3//!
4//! This module provides streaming capabilities for processing large JSON datasets
5//! without loading everything into memory at once.
6
7use fionn_core::Result;
8use fionn_ops::processor::BlackBoxProcessor;
9use fionn_ops::{
10    DsonOperation, FilterPredicate, OperationValue, StreamGenerator, TransformFunction,
11};
12use std::collections::VecDeque;
13
/// Streaming processor for large dataset processing.
///
/// Values produced by stream-build operations are collected in
/// `current_batch`; flushed batches wait in `processed_batches` until a
/// stream-emit operation drains them.
pub struct StreamingProcessor {
    /// Underlying processor that receives pass-through and emit operations.
    processor: BlackBoxProcessor,
    /// Batch length at which the repeat and fibonacci generators flush the
    /// current batch.
    buffer_size: usize,
    /// Values generated or transformed but not yet flushed.
    current_batch: Vec<OperationValue>,
    /// Flushed batches waiting to be emitted, oldest first.
    processed_batches: VecDeque<Vec<OperationValue>>,
}
21
22impl StreamingProcessor {
23    /// Create a new streaming processor
24    #[must_use]
25    pub fn new(buffer_size: usize) -> Self {
26        Self {
27            processor: BlackBoxProcessor::new(vec![], vec![]),
28            buffer_size,
29            current_batch: Vec::new(),
30            processed_batches: VecDeque::new(),
31        }
32    }
33
34    /// Process a streaming operation pipeline
35    ///
36    /// # Errors
37    /// Returns an error if processing fails
38    pub fn process_stream(&mut self, operations: &[DsonOperation]) -> Result<()> {
39        for operation in operations {
40            self.apply_streaming_operation(operation)?;
41        }
42        Ok(())
43    }
44
45    /// Apply a streaming operation
46    ///
47    /// # Errors
48    /// Returns an error if the operation fails
49    fn apply_streaming_operation(&mut self, operation: &DsonOperation) -> Result<()> {
50        match operation {
51            DsonOperation::StreamBuild { path, generator } => self.stream_build(path, generator),
52            DsonOperation::StreamFilter { path, predicate } => {
53                self.stream_filter(path, predicate);
54                Ok(())
55            }
56            DsonOperation::StreamMap { path, transform } => {
57                self.stream_map(path, transform);
58                Ok(())
59            }
60            DsonOperation::StreamEmit { path, batch_size } => self.stream_emit(path, *batch_size),
61            // Pass through other operations to the underlying processor
62            _ => self.processor.apply_operation(operation),
63        }
64    }
65
66    /// Stream build operation - generate data streams
67    fn stream_build(&mut self, path: &str, generator: &StreamGenerator) -> Result<()> {
68        match generator {
69            StreamGenerator::Range { start, end, step } => {
70                self.generate_range(*start, *end, *step);
71            }
72            StreamGenerator::Repeat(value, count) => {
73                self.generate_repeat(value, *count);
74            }
75            StreamGenerator::Fibonacci(count) => {
76                self.generate_fibonacci(*count);
77            }
78            StreamGenerator::Custom(_) => {
79                // Custom generators would be implemented here
80                // For now, just track the operation
81                self.processor
82                    .apply_operation(&DsonOperation::StreamBuild {
83                        path: path.to_string(),
84                        generator: generator.clone(),
85                    })?;
86            }
87        }
88        Ok(())
89    }
90
91    /// Generate a numeric range
92    fn generate_range(&mut self, start: i64, end: i64, step: i64) {
93        let mut current = start;
94        while current < end {
95            self.current_batch
96                .push(OperationValue::NumberRef(current.to_string()));
97            current += step;
98        }
99        // Don't flush - let streaming operations work on current_batch
100    }
101
102    /// Generate repeated values
103    fn generate_repeat(&mut self, value: &OperationValue, count: usize) {
104        for _ in 0..count {
105            self.current_batch.push(value.clone());
106            if self.current_batch.len() >= self.buffer_size {
107                self.flush_batch();
108            }
109        }
110        self.flush_batch();
111    }
112
113    /// Generate fibonacci sequence
114    fn generate_fibonacci(&mut self, count: usize) {
115        let mut a = 0i64;
116        let mut b = 1i64;
117
118        for _ in 0..count {
119            self.current_batch
120                .push(OperationValue::NumberRef(a.to_string()));
121            if self.current_batch.len() >= self.buffer_size {
122                self.flush_batch();
123            }
124
125            let temp = a;
126            a = b;
127            b += temp;
128        }
129        self.flush_batch();
130    }
131
132    /// Stream filter operation
133    fn stream_filter(&mut self, _path: &str, predicate: &FilterPredicate) {
134        // Process current batch with filtering
135        let mut filtered = Vec::new();
136
137        for (index, value) in self.current_batch.iter().enumerate() {
138            if Self::matches_predicate(value, predicate, index) {
139                filtered.push(value.clone());
140            }
141        }
142
143        self.current_batch = filtered;
144    }
145
146    /// Stream map operation
147    fn stream_map(&mut self, _path: &str, transform: &TransformFunction) {
148        // Process current batch with transformation
149        let mut transformed = Vec::new();
150
151        for value in &self.current_batch {
152            let new_value = Self::apply_transform(value, transform);
153            transformed.push(new_value);
154        }
155
156        self.current_batch = transformed;
157    }
158
159    /// Stream emit operation - output processed batches
160    fn stream_emit(&mut self, path: &str, _batch_size: usize) -> Result<()> {
161        // Ensure we have data to emit
162        if self.current_batch.is_empty() && self.processed_batches.is_empty() {
163            return Ok(());
164        }
165
166        // Flush current batch if needed
167        if !self.current_batch.is_empty() {
168            self.flush_batch();
169        }
170
171        // Emit batches
172        while let Some(batch) = self.processed_batches.pop_front() {
173            // In a real implementation, this would send the batch to output
174            // For now, we track it in the processor
175            let emit_path = format!("{}.batch_{}", path, self.processed_batches.len());
176            let batch_value = OperationValue::StringRef(format!("batch_size:{}", batch.len()));
177            self.processor.apply_operation(&DsonOperation::FieldAdd {
178                path: emit_path,
179                value: batch_value,
180            })?;
181        }
182
183        Ok(())
184    }
185
186    /// Check if a value matches a filter predicate
187    fn matches_predicate(
188        value: &OperationValue,
189        predicate: &FilterPredicate,
190        index: usize,
191    ) -> bool {
192        match predicate {
193            FilterPredicate::Even => {
194                if let OperationValue::NumberRef(num_str) = value {
195                    num_str.parse::<i64>().is_ok_and(|num| num % 2 == 0)
196                } else {
197                    false
198                }
199            }
200            FilterPredicate::Odd => {
201                if let OperationValue::NumberRef(num_str) = value {
202                    num_str.parse::<i64>().is_ok_and(|num| num % 2 == 1)
203                } else {
204                    false
205                }
206            }
207            FilterPredicate::EveryNth(n) => index.is_multiple_of(*n),
208            FilterPredicate::GreaterThan(threshold) => {
209                if let OperationValue::NumberRef(num_str) = value {
210                    num_str.parse::<i64>().is_ok_and(|num| num > *threshold)
211                } else {
212                    false
213                }
214            }
215            FilterPredicate::LessThan(threshold) => {
216                if let OperationValue::NumberRef(num_str) = value {
217                    num_str.parse::<i64>().is_ok_and(|num| num < *threshold)
218                } else {
219                    false
220                }
221            }
222            FilterPredicate::Equals(compare_value) => {
223                match (value, compare_value) {
224                    (OperationValue::NumberRef(num_str), OperationValue::NumberRef(cmp_str)) => {
225                        // Compare as numbers
226                        if let (Ok(a), Ok(b)) = (num_str.parse::<i64>(), cmp_str.parse::<i64>()) {
227                            a == b
228                        } else if let (Ok(a), Ok(b)) =
229                            (num_str.parse::<f64>(), cmp_str.parse::<f64>())
230                        {
231                            (a - b).abs() < f64::EPSILON
232                        } else {
233                            num_str == cmp_str
234                        }
235                    }
236                    (OperationValue::StringRef(s1), OperationValue::StringRef(s2)) => s1 == s2,
237                    (OperationValue::BoolRef(b1), OperationValue::BoolRef(b2)) => b1 == b2,
238                    (OperationValue::Null, OperationValue::Null) => true,
239                    _ => false,
240                }
241            }
242            FilterPredicate::Alternate => {
243                // Alternate - select every other element (even indices)
244                index.is_multiple_of(2)
245            }
246            FilterPredicate::Custom(predicate_fn) => {
247                // Custom predicate - execute the stored predicate string as a simple expression
248                // For now, support basic patterns like "value > N" or "value == N"
249                Self::evaluate_custom_predicate(predicate_fn, value)
250            }
251        }
252    }
253
254    /// Evaluate a custom predicate expression
255    fn evaluate_custom_predicate(predicate: &str, value: &OperationValue) -> bool {
256        let predicate = predicate.trim();
257
258        // Parse simple expressions like "value > 10", "value == 5", "value < 100"
259        if let Some(rest) = predicate.strip_prefix("value") {
260            let rest = rest.trim();
261
262            // Parse operator and threshold
263            if let Some(threshold_str) = rest.strip_prefix(">") {
264                if let Ok(threshold) = threshold_str.trim().parse::<i64>()
265                    && let OperationValue::NumberRef(num_str) = value
266                    && let Ok(num) = num_str.parse::<i64>()
267                {
268                    return num > threshold;
269                }
270            } else if let Some(threshold_str) = rest.strip_prefix("<") {
271                if let Ok(threshold) = threshold_str.trim().parse::<i64>()
272                    && let OperationValue::NumberRef(num_str) = value
273                    && let Ok(num) = num_str.parse::<i64>()
274                {
275                    return num < threshold;
276                }
277            } else if let Some(threshold_str) = rest.strip_prefix("==") {
278                let threshold_str = threshold_str.trim();
279                if let OperationValue::NumberRef(num_str) = value {
280                    return num_str == threshold_str;
281                } else if let OperationValue::StringRef(s) = value {
282                    return s == threshold_str.trim_matches('"');
283                }
284            } else if let Some(threshold_str) = rest.strip_prefix("!=") {
285                let threshold_str = threshold_str.trim();
286                if let OperationValue::NumberRef(num_str) = value {
287                    return num_str != threshold_str;
288                } else if let OperationValue::StringRef(s) = value {
289                    return s != threshold_str.trim_matches('"');
290                }
291            }
292        }
293
294        // Default to true if predicate couldn't be parsed
295        true
296    }
297
298    /// Apply a transformation to a value
299    fn apply_transform(value: &OperationValue, transform: &TransformFunction) -> OperationValue {
300        match (value, transform) {
301            (OperationValue::NumberRef(num_str), TransformFunction::Add(delta)) => {
302                num_str.parse::<i64>().map_or_else(
303                    |_| value.clone(),
304                    |num| OperationValue::NumberRef((num + delta).to_string()),
305                )
306            }
307            (OperationValue::NumberRef(num_str), TransformFunction::Multiply(factor)) => {
308                num_str.parse::<i64>().map_or_else(
309                    |_| value.clone(),
310                    |num| OperationValue::NumberRef((num * factor).to_string()),
311                )
312            }
313            (OperationValue::StringRef(text), TransformFunction::ToUppercase) => {
314                OperationValue::StringRef(text.to_uppercase())
315            }
316            (OperationValue::StringRef(text), TransformFunction::ToLowercase) => {
317                OperationValue::StringRef(text.to_lowercase())
318            }
319            (OperationValue::StringRef(text), TransformFunction::Append(suffix)) => {
320                OperationValue::StringRef(format!("{text}{suffix}"))
321            }
322            (OperationValue::StringRef(text), TransformFunction::Prepend(prefix)) => {
323                OperationValue::StringRef(format!("{prefix}{text}"))
324            }
325            _ => value.clone(), // Return unchanged for unsupported combinations
326        }
327    }
328
329    /// Flush current batch to processed batches
330    fn flush_batch(&mut self) {
331        if !self.current_batch.is_empty() {
332            let batch = std::mem::take(&mut self.current_batch);
333            self.processed_batches.push_back(batch);
334        }
335    }
336
    /// Borrow the underlying processor for inspection.
    ///
    /// This is the processor that receives pass-through operations and the
    /// `FieldAdd` records written by stream-emit.
    #[must_use]
    pub const fn processor(&self) -> &BlackBoxProcessor {
        &self.processor
    }
342
    /// Number of flushed batches that are queued and not yet emitted.
    ///
    /// The batch currently being built is not counted.
    #[must_use]
    pub fn batch_count(&self) -> usize {
        self.processed_batches.len()
    }
348
349    /// Get total items processed
350    #[must_use]
351    pub fn total_items(&self) -> usize {
352        self.processed_batches.iter().map(Vec::len).sum::<usize>() + self.current_batch.len()
353    }
354}
355
356#[cfg(test)]
357mod tests {
358    use super::*;
359    use fionn_ops::FilterPredicate;
360
361    #[test]
362    fn test_stream_range_generation() {
363        let mut processor = StreamingProcessor::new(10);
364
365        let operations = vec![DsonOperation::StreamBuild {
366            path: "numbers".to_string(),
367            generator: StreamGenerator::Range {
368                start: 0,
369                end: 25,
370                step: 5,
371            },
372        }];
373
374        processor.process_stream(&operations).unwrap();
375
376        // Should have generated: 0, 5, 10, 15, 20
377        assert_eq!(processor.total_items(), 5);
378        // Note: No emit operation, so items remain in current batch
379    }
380
381    #[test]
382    fn test_stream_filter_even() {
383        let mut processor = StreamingProcessor::new(10);
384
385        let operations = vec![
386            DsonOperation::StreamBuild {
387                path: "numbers".to_string(),
388                generator: StreamGenerator::Range {
389                    start: 0,
390                    end: 10,
391                    step: 1,
392                },
393            },
394            DsonOperation::StreamFilter {
395                path: "numbers".to_string(),
396                predicate: FilterPredicate::Even,
397            },
398        ];
399
400        processor.process_stream(&operations).unwrap();
401
402        // Should have: 0, 2, 4, 6, 8 (5 even numbers)
403        assert_eq!(processor.total_items(), 5);
404    }
405
406    #[test]
407    fn test_stream_map_multiply() {
408        let mut processor = StreamingProcessor::new(10);
409
410        let operations = vec![
411            DsonOperation::StreamBuild {
412                path: "numbers".to_string(),
413                generator: StreamGenerator::Range {
414                    start: 1,
415                    end: 4,
416                    step: 1,
417                },
418            },
419            DsonOperation::StreamMap {
420                path: "numbers".to_string(),
421                transform: TransformFunction::Multiply(2),
422            },
423        ];
424
425        processor.process_stream(&operations).unwrap();
426
427        // Should have: 2, 4, 6 (1*2, 2*2, 3*2)
428        assert_eq!(processor.total_items(), 3);
429    }
430
431    #[test]
432    fn test_stream_fibonacci() {
433        let mut processor = StreamingProcessor::new(10);
434
435        let operations = vec![DsonOperation::StreamBuild {
436            path: "fib".to_string(),
437            generator: StreamGenerator::Fibonacci(8),
438        }];
439
440        processor.process_stream(&operations).unwrap();
441
442        // Should have: 0, 1, 1, 2, 3, 5, 8, 13
443        assert_eq!(processor.total_items(), 8);
444    }
445
446    #[test]
447    fn test_stream_repeat() {
448        let mut processor = StreamingProcessor::new(10);
449        let operations = vec![DsonOperation::StreamBuild {
450            path: "rep".to_string(),
451            generator: StreamGenerator::Repeat(OperationValue::StringRef("x".to_string()), 5),
452        }];
453        processor.process_stream(&operations).unwrap();
454        assert_eq!(processor.total_items(), 5);
455    }
456
457    #[test]
458    fn test_stream_emit() {
459        let mut processor = StreamingProcessor::new(5);
460        let operations = vec![
461            DsonOperation::StreamBuild {
462                path: "numbers".to_string(),
463                generator: StreamGenerator::Range {
464                    start: 0,
465                    end: 10,
466                    step: 1,
467                },
468            },
469            DsonOperation::StreamEmit {
470                path: "out".to_string(),
471                batch_size: 5,
472            },
473        ];
474        processor.process_stream(&operations).unwrap();
475    }
476
477    #[test]
478    fn test_stream_filter_odd() {
479        let mut processor = StreamingProcessor::new(10);
480        let operations = vec![
481            DsonOperation::StreamBuild {
482                path: "numbers".to_string(),
483                generator: StreamGenerator::Range {
484                    start: 0,
485                    end: 10,
486                    step: 1,
487                },
488            },
489            DsonOperation::StreamFilter {
490                path: "numbers".to_string(),
491                predicate: FilterPredicate::Odd,
492            },
493        ];
494        processor.process_stream(&operations).unwrap();
495        assert_eq!(processor.total_items(), 5);
496    }
497
498    #[test]
499    fn test_stream_filter_greater_than() {
500        let mut processor = StreamingProcessor::new(10);
501        let operations = vec![
502            DsonOperation::StreamBuild {
503                path: "numbers".to_string(),
504                generator: StreamGenerator::Range {
505                    start: 0,
506                    end: 10,
507                    step: 1,
508                },
509            },
510            DsonOperation::StreamFilter {
511                path: "numbers".to_string(),
512                predicate: FilterPredicate::GreaterThan(5),
513            },
514        ];
515        processor.process_stream(&operations).unwrap();
516        assert_eq!(processor.total_items(), 4); // 6, 7, 8, 9
517    }
518
519    #[test]
520    fn test_stream_filter_less_than() {
521        let mut processor = StreamingProcessor::new(10);
522        let operations = vec![
523            DsonOperation::StreamBuild {
524                path: "numbers".to_string(),
525                generator: StreamGenerator::Range {
526                    start: 0,
527                    end: 10,
528                    step: 1,
529                },
530            },
531            DsonOperation::StreamFilter {
532                path: "numbers".to_string(),
533                predicate: FilterPredicate::LessThan(5),
534            },
535        ];
536        processor.process_stream(&operations).unwrap();
537        assert_eq!(processor.total_items(), 5); // 0, 1, 2, 3, 4
538    }
539
540    #[test]
541    fn test_stream_filter_equals() {
542        let mut processor = StreamingProcessor::new(10);
543        let operations = vec![
544            DsonOperation::StreamBuild {
545                path: "numbers".to_string(),
546                generator: StreamGenerator::Range {
547                    start: 0,
548                    end: 10,
549                    step: 1,
550                },
551            },
552            DsonOperation::StreamFilter {
553                path: "numbers".to_string(),
554                predicate: FilterPredicate::Equals(OperationValue::NumberRef("5".to_string())),
555            },
556        ];
557        processor.process_stream(&operations).unwrap();
558        assert_eq!(processor.total_items(), 1);
559    }
560
561    #[test]
562    fn test_stream_filter_every_nth() {
563        let mut processor = StreamingProcessor::new(10);
564        let operations = vec![
565            DsonOperation::StreamBuild {
566                path: "numbers".to_string(),
567                generator: StreamGenerator::Range {
568                    start: 0,
569                    end: 12,
570                    step: 1,
571                },
572            },
573            DsonOperation::StreamFilter {
574                path: "numbers".to_string(),
575                predicate: FilterPredicate::EveryNth(3),
576            },
577        ];
578        processor.process_stream(&operations).unwrap();
579    }
580
581    #[test]
582    fn test_stream_filter_alternate() {
583        let mut processor = StreamingProcessor::new(10);
584        let operations = vec![
585            DsonOperation::StreamBuild {
586                path: "numbers".to_string(),
587                generator: StreamGenerator::Range {
588                    start: 0,
589                    end: 10,
590                    step: 1,
591                },
592            },
593            DsonOperation::StreamFilter {
594                path: "numbers".to_string(),
595                predicate: FilterPredicate::Alternate,
596            },
597        ];
598        processor.process_stream(&operations).unwrap();
599    }
600
601    #[test]
602    fn test_stream_filter_custom() {
603        let mut processor = StreamingProcessor::new(10);
604        let operations = vec![
605            DsonOperation::StreamBuild {
606                path: "numbers".to_string(),
607                generator: StreamGenerator::Range {
608                    start: 0,
609                    end: 20,
610                    step: 1,
611                },
612            },
613            DsonOperation::StreamFilter {
614                path: "numbers".to_string(),
615                predicate: FilterPredicate::Custom("value > 10".to_string()),
616            },
617        ];
618        processor.process_stream(&operations).unwrap();
619    }
620
621    #[test]
622    fn test_stream_map_add() {
623        let mut processor = StreamingProcessor::new(10);
624        let operations = vec![
625            DsonOperation::StreamBuild {
626                path: "numbers".to_string(),
627                generator: StreamGenerator::Range {
628                    start: 0,
629                    end: 5,
630                    step: 1,
631                },
632            },
633            DsonOperation::StreamMap {
634                path: "numbers".to_string(),
635                transform: TransformFunction::Add(10),
636            },
637        ];
638        processor.process_stream(&operations).unwrap();
639    }
640
641    #[test]
642    fn test_stream_map_to_lowercase() {
643        let mut processor = StreamingProcessor::new(10);
644        let operations = vec![
645            DsonOperation::StreamBuild {
646                path: "strings".to_string(),
647                generator: StreamGenerator::Repeat(
648                    OperationValue::StringRef("HELLO".to_string()),
649                    3,
650                ),
651            },
652            DsonOperation::StreamMap {
653                path: "strings".to_string(),
654                transform: TransformFunction::ToLowercase,
655            },
656        ];
657        processor.process_stream(&operations).unwrap();
658    }
659
660    #[test]
661    fn test_stream_map_to_uppercase() {
662        let mut processor = StreamingProcessor::new(10);
663        let operations = vec![
664            DsonOperation::StreamBuild {
665                path: "strings".to_string(),
666                generator: StreamGenerator::Repeat(
667                    OperationValue::StringRef("hello".to_string()),
668                    3,
669                ),
670            },
671            DsonOperation::StreamMap {
672                path: "strings".to_string(),
673                transform: TransformFunction::ToUppercase,
674            },
675        ];
676        processor.process_stream(&operations).unwrap();
677    }
678
679    #[test]
680    fn test_stream_map_append() {
681        let mut processor = StreamingProcessor::new(10);
682        let operations = vec![
683            DsonOperation::StreamBuild {
684                path: "strings".to_string(),
685                generator: StreamGenerator::Repeat(
686                    OperationValue::StringRef("hello".to_string()),
687                    2,
688                ),
689            },
690            DsonOperation::StreamMap {
691                path: "strings".to_string(),
692                transform: TransformFunction::Append("!".to_string()),
693            },
694        ];
695        processor.process_stream(&operations).unwrap();
696    }
697
698    #[test]
699    fn test_stream_map_prepend() {
700        let mut processor = StreamingProcessor::new(10);
701        let operations = vec![
702            DsonOperation::StreamBuild {
703                path: "strings".to_string(),
704                generator: StreamGenerator::Repeat(
705                    OperationValue::StringRef("world".to_string()),
706                    2,
707                ),
708            },
709            DsonOperation::StreamMap {
710                path: "strings".to_string(),
711                transform: TransformFunction::Prepend("hello ".to_string()),
712            },
713        ];
714        processor.process_stream(&operations).unwrap();
715    }
716
717    #[test]
718    fn test_stream_custom_generator() {
719        let mut processor = StreamingProcessor::new(10);
720        let operations = vec![DsonOperation::StreamBuild {
721            path: "custom".to_string(),
722            generator: StreamGenerator::Custom("test".to_string()),
723        }];
724        processor.process_stream(&operations).unwrap();
725    }
726
727    #[test]
728    fn test_processor_getter() {
729        let processor = StreamingProcessor::new(10);
730        let _ = processor.processor();
731    }
732
733    #[test]
734    fn test_batch_count_empty() {
735        let processor = StreamingProcessor::new(10);
736        assert_eq!(processor.batch_count(), 0);
737    }
738
739    // Additional tests for coverage
740
741    #[test]
742    fn test_pass_through_operations() {
743        let mut processor = StreamingProcessor::new(10);
744        // Test operations that pass through to underlying processor
745        let operations = vec![DsonOperation::FieldAdd {
746            path: "test".to_string(),
747            value: OperationValue::StringRef("value".to_string()),
748        }];
749        processor.process_stream(&operations).unwrap();
750    }
751
752    #[test]
753    fn test_generate_repeat_with_flush() {
754        // Small buffer to trigger flush during generation
755        let mut processor = StreamingProcessor::new(3);
756        let operations = vec![DsonOperation::StreamBuild {
757            path: "rep".to_string(),
758            generator: StreamGenerator::Repeat(OperationValue::NumberRef("1".to_string()), 10),
759        }];
760        processor.process_stream(&operations).unwrap();
761        // Should have flushed multiple times
762        assert!(processor.batch_count() > 0);
763    }
764
765    #[test]
766    fn test_generate_fibonacci_with_flush() {
767        // Small buffer to trigger flush during fibonacci generation
768        let mut processor = StreamingProcessor::new(3);
769        let operations = vec![DsonOperation::StreamBuild {
770            path: "fib".to_string(),
771            generator: StreamGenerator::Fibonacci(15),
772        }];
773        processor.process_stream(&operations).unwrap();
774        // Should have flushed multiple times
775        assert!(processor.batch_count() > 0);
776    }
777
778    #[test]
779    fn test_stream_emit_empty() {
780        let mut processor = StreamingProcessor::new(10);
781        // Emit with no data should be ok
782        let operations = vec![DsonOperation::StreamEmit {
783            path: "out".to_string(),
784            batch_size: 5,
785        }];
786        processor.process_stream(&operations).unwrap();
787    }
788
789    #[test]
790    fn test_stream_emit_with_processed_batches() {
791        let mut processor = StreamingProcessor::new(3);
792        // Generate enough to create processed batches
793        let operations = vec![
794            DsonOperation::StreamBuild {
795                path: "numbers".to_string(),
796                generator: StreamGenerator::Repeat(OperationValue::NumberRef("1".to_string()), 10),
797            },
798            DsonOperation::StreamEmit {
799                path: "out".to_string(),
800                batch_size: 3,
801            },
802        ];
803        processor.process_stream(&operations).unwrap();
804    }
805
806    #[test]
807    fn test_custom_predicate_less_than() {
808        let mut processor = StreamingProcessor::new(10);
809        let operations = vec![
810            DsonOperation::StreamBuild {
811                path: "numbers".to_string(),
812                generator: StreamGenerator::Range {
813                    start: 0,
814                    end: 20,
815                    step: 1,
816                },
817            },
818            DsonOperation::StreamFilter {
819                path: "numbers".to_string(),
820                predicate: FilterPredicate::Custom("value < 5".to_string()),
821            },
822        ];
823        processor.process_stream(&operations).unwrap();
824        assert_eq!(processor.total_items(), 5); // 0, 1, 2, 3, 4
825    }
826
827    #[test]
828    fn test_custom_predicate_equals() {
829        let mut processor = StreamingProcessor::new(10);
830        let operations = vec![
831            DsonOperation::StreamBuild {
832                path: "numbers".to_string(),
833                generator: StreamGenerator::Range {
834                    start: 0,
835                    end: 10,
836                    step: 1,
837                },
838            },
839            DsonOperation::StreamFilter {
840                path: "numbers".to_string(),
841                predicate: FilterPredicate::Custom("value == 5".to_string()),
842            },
843        ];
844        processor.process_stream(&operations).unwrap();
845        assert_eq!(processor.total_items(), 1);
846    }
847
848    #[test]
849    fn test_custom_predicate_not_equals() {
850        let mut processor = StreamingProcessor::new(10);
851        let operations = vec![
852            DsonOperation::StreamBuild {
853                path: "numbers".to_string(),
854                generator: StreamGenerator::Range {
855                    start: 0,
856                    end: 10,
857                    step: 1,
858                },
859            },
860            DsonOperation::StreamFilter {
861                path: "numbers".to_string(),
862                predicate: FilterPredicate::Custom("value != 5".to_string()),
863            },
864        ];
865        processor.process_stream(&operations).unwrap();
866        assert_eq!(processor.total_items(), 9); // all except 5
867    }
868
869    #[test]
870    fn test_custom_predicate_invalid() {
871        let mut processor = StreamingProcessor::new(10);
872        let operations = vec![
873            DsonOperation::StreamBuild {
874                path: "numbers".to_string(),
875                generator: StreamGenerator::Range {
876                    start: 0,
877                    end: 5,
878                    step: 1,
879                },
880            },
881            DsonOperation::StreamFilter {
882                path: "numbers".to_string(),
883                predicate: FilterPredicate::Custom("invalid_predicate".to_string()),
884            },
885        ];
886        processor.process_stream(&operations).unwrap();
887        // Invalid predicate defaults to true, so all items pass
888        assert_eq!(processor.total_items(), 5);
889    }
890
891    #[test]
892    fn test_filter_equals_string() {
893        let mut processor = StreamingProcessor::new(10);
894        let operations = vec![
895            DsonOperation::StreamBuild {
896                path: "strings".to_string(),
897                generator: StreamGenerator::Repeat(
898                    OperationValue::StringRef("hello".to_string()),
899                    3,
900                ),
901            },
902            DsonOperation::StreamFilter {
903                path: "strings".to_string(),
904                predicate: FilterPredicate::Equals(OperationValue::StringRef("hello".to_string())),
905            },
906        ];
907        processor.process_stream(&operations).unwrap();
908        assert_eq!(processor.total_items(), 3);
909    }
910
911    #[test]
912    fn test_filter_equals_bool() {
913        let mut processor = StreamingProcessor::new(10);
914        // Add bools to current batch manually
915        processor.current_batch.push(OperationValue::BoolRef(true));
916        processor.current_batch.push(OperationValue::BoolRef(false));
917        processor.current_batch.push(OperationValue::BoolRef(true));
918
919        let operations = vec![DsonOperation::StreamFilter {
920            path: "bools".to_string(),
921            predicate: FilterPredicate::Equals(OperationValue::BoolRef(true)),
922        }];
923        processor.process_stream(&operations).unwrap();
924        assert_eq!(processor.total_items(), 2); // Two trues
925    }
926
927    #[test]
928    fn test_filter_equals_null() {
929        let mut processor = StreamingProcessor::new(10);
930        processor.current_batch.push(OperationValue::Null);
931        processor
932            .current_batch
933            .push(OperationValue::StringRef("not null".to_string()));
934        processor.current_batch.push(OperationValue::Null);
935
936        let operations = vec![DsonOperation::StreamFilter {
937            path: "nulls".to_string(),
938            predicate: FilterPredicate::Equals(OperationValue::Null),
939        }];
940        processor.process_stream(&operations).unwrap();
941        assert_eq!(processor.total_items(), 2); // Two nulls
942    }
943
944    #[test]
945    fn test_filter_equals_type_mismatch() {
946        let mut processor = StreamingProcessor::new(10);
947        processor
948            .current_batch
949            .push(OperationValue::NumberRef("5".to_string()));
950
951        let operations = vec![DsonOperation::StreamFilter {
952            path: "test".to_string(),
953            predicate: FilterPredicate::Equals(OperationValue::StringRef("5".to_string())),
954        }];
955        processor.process_stream(&operations).unwrap();
956        // Type mismatch returns false
957        assert_eq!(processor.total_items(), 0);
958    }
959
960    #[test]
961    fn test_filter_even_with_non_numbers() {
962        let mut processor = StreamingProcessor::new(10);
963        processor
964            .current_batch
965            .push(OperationValue::StringRef("not a number".to_string()));
966        processor
967            .current_batch
968            .push(OperationValue::NumberRef("4".to_string()));
969        processor.current_batch.push(OperationValue::BoolRef(true));
970
971        let operations = vec![DsonOperation::StreamFilter {
972            path: "test".to_string(),
973            predicate: FilterPredicate::Even,
974        }];
975        processor.process_stream(&operations).unwrap();
976        assert_eq!(processor.total_items(), 1); // Only 4 passes
977    }
978
979    #[test]
980    fn test_filter_odd_with_non_numbers() {
981        let mut processor = StreamingProcessor::new(10);
982        processor
983            .current_batch
984            .push(OperationValue::StringRef("not a number".to_string()));
985        processor
986            .current_batch
987            .push(OperationValue::NumberRef("3".to_string()));
988
989        let operations = vec![DsonOperation::StreamFilter {
990            path: "test".to_string(),
991            predicate: FilterPredicate::Odd,
992        }];
993        processor.process_stream(&operations).unwrap();
994        assert_eq!(processor.total_items(), 1); // Only 3 passes
995    }
996
997    #[test]
998    fn test_filter_greater_than_with_non_numbers() {
999        let mut processor = StreamingProcessor::new(10);
1000        processor
1001            .current_batch
1002            .push(OperationValue::StringRef("text".to_string()));
1003        processor
1004            .current_batch
1005            .push(OperationValue::NumberRef("10".to_string()));
1006
1007        let operations = vec![DsonOperation::StreamFilter {
1008            path: "test".to_string(),
1009            predicate: FilterPredicate::GreaterThan(5),
1010        }];
1011        processor.process_stream(&operations).unwrap();
1012        assert_eq!(processor.total_items(), 1); // Only 10 > 5
1013    }
1014
1015    #[test]
1016    fn test_filter_less_than_with_non_numbers() {
1017        let mut processor = StreamingProcessor::new(10);
1018        processor.current_batch.push(OperationValue::BoolRef(false));
1019        processor
1020            .current_batch
1021            .push(OperationValue::NumberRef("3".to_string()));
1022
1023        let operations = vec![DsonOperation::StreamFilter {
1024            path: "test".to_string(),
1025            predicate: FilterPredicate::LessThan(5),
1026        }];
1027        processor.process_stream(&operations).unwrap();
1028        assert_eq!(processor.total_items(), 1); // Only 3 < 5
1029    }
1030
1031    #[test]
1032    fn test_transform_invalid_number() {
1033        let mut processor = StreamingProcessor::new(10);
1034        processor
1035            .current_batch
1036            .push(OperationValue::NumberRef("not_a_number".to_string()));
1037
1038        let operations = vec![DsonOperation::StreamMap {
1039            path: "test".to_string(),
1040            transform: TransformFunction::Add(10),
1041        }];
1042        processor.process_stream(&operations).unwrap();
1043        // Invalid number should be returned unchanged
1044        assert_eq!(processor.total_items(), 1);
1045    }
1046
1047    #[test]
1048    fn test_transform_multiply_invalid_number() {
1049        let mut processor = StreamingProcessor::new(10);
1050        processor
1051            .current_batch
1052            .push(OperationValue::NumberRef("invalid".to_string()));
1053
1054        let operations = vec![DsonOperation::StreamMap {
1055            path: "test".to_string(),
1056            transform: TransformFunction::Multiply(2),
1057        }];
1058        processor.process_stream(&operations).unwrap();
1059        assert_eq!(processor.total_items(), 1);
1060    }
1061
1062    #[test]
1063    fn test_transform_unsupported_combination() {
1064        let mut processor = StreamingProcessor::new(10);
1065        // Try to uppercase a number (unsupported)
1066        processor
1067            .current_batch
1068            .push(OperationValue::NumberRef("5".to_string()));
1069
1070        let operations = vec![DsonOperation::StreamMap {
1071            path: "test".to_string(),
1072            transform: TransformFunction::ToUppercase,
1073        }];
1074        processor.process_stream(&operations).unwrap();
1075        // Should return unchanged
1076        assert_eq!(processor.total_items(), 1);
1077    }
1078
1079    #[test]
1080    fn test_filter_equals_float_comparison() {
1081        let mut processor = StreamingProcessor::new(10);
1082        processor
1083            .current_batch
1084            .push(OperationValue::NumberRef("3.14".to_string()));
1085        processor
1086            .current_batch
1087            .push(OperationValue::NumberRef("2.71".to_string()));
1088
1089        let operations = vec![DsonOperation::StreamFilter {
1090            path: "floats".to_string(),
1091            predicate: FilterPredicate::Equals(OperationValue::NumberRef("3.14".to_string())),
1092        }];
1093        processor.process_stream(&operations).unwrap();
1094        assert_eq!(processor.total_items(), 1);
1095    }
1096
1097    #[test]
1098    fn test_custom_predicate_string_equals() {
1099        let mut processor = StreamingProcessor::new(10);
1100        processor
1101            .current_batch
1102            .push(OperationValue::StringRef("hello".to_string()));
1103        processor
1104            .current_batch
1105            .push(OperationValue::StringRef("world".to_string()));
1106
1107        let operations = vec![DsonOperation::StreamFilter {
1108            path: "strings".to_string(),
1109            predicate: FilterPredicate::Custom("value == \"hello\"".to_string()),
1110        }];
1111        processor.process_stream(&operations).unwrap();
1112        assert_eq!(processor.total_items(), 1);
1113    }
1114
1115    #[test]
1116    fn test_custom_predicate_string_not_equals() {
1117        let mut processor = StreamingProcessor::new(10);
1118        processor
1119            .current_batch
1120            .push(OperationValue::StringRef("hello".to_string()));
1121        processor
1122            .current_batch
1123            .push(OperationValue::StringRef("world".to_string()));
1124
1125        let operations = vec![DsonOperation::StreamFilter {
1126            path: "strings".to_string(),
1127            predicate: FilterPredicate::Custom("value != \"hello\"".to_string()),
1128        }];
1129        processor.process_stream(&operations).unwrap();
1130        assert_eq!(processor.total_items(), 1);
1131    }
1132
1133    #[test]
1134    fn test_batch_count_after_generation() {
1135        let mut processor = StreamingProcessor::new(3);
1136        let operations = vec![DsonOperation::StreamBuild {
1137            path: "nums".to_string(),
1138            generator: StreamGenerator::Repeat(OperationValue::NumberRef("1".to_string()), 9),
1139        }];
1140        processor.process_stream(&operations).unwrap();
1141        assert!(processor.batch_count() >= 3);
1142    }
1143
1144    #[test]
1145    fn test_total_items_mixed() {
1146        let mut processor = StreamingProcessor::new(5);
1147        // Add some to current batch
1148        processor.current_batch.push(OperationValue::Null);
1149        processor.current_batch.push(OperationValue::Null);
1150
1151        // Flush to create processed batch
1152        processor.flush_batch();
1153
1154        // Add more to current batch
1155        processor.current_batch.push(OperationValue::Null);
1156
1157        assert_eq!(processor.total_items(), 3);
1158    }
1159}