Skip to main content

fionn_ops/processor/
streaming.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2//! Streaming data pipeline processing for large datasets
3//!
4//! This module provides streaming capabilities for processing large JSON datasets
5//! without loading everything into memory at once.
6
7use super::BlackBoxProcessor;
8use crate::{DsonOperation, FilterPredicate, OperationValue, StreamGenerator, TransformFunction};
9use fionn_core::Result;
10use std::collections::VecDeque;
11
/// Streaming processor for large dataset processing
///
/// Generated values accumulate in `current_batch`; flushed batches wait in
/// `processed_batches` until a stream-emit operation drains them.
pub struct StreamingProcessor {
    /// Underlying processor: receives non-streaming operations and the per-batch
    /// records written by stream-emit
    processor: BlackBoxProcessor,
    /// Batch length at which the repeat and Fibonacci generators flush `current_batch`
    buffer_size: usize,
    /// Values produced so far that have not been flushed yet
    current_batch: Vec<OperationValue>,
    /// Flushed batches waiting to be emitted, oldest first
    processed_batches: VecDeque<Vec<OperationValue>>,
}
19
20impl StreamingProcessor {
21    /// Create a new streaming processor
22    #[must_use]
23    pub fn new(buffer_size: usize) -> Self {
24        Self {
25            processor: BlackBoxProcessor::new(vec![], vec![]),
26            buffer_size,
27            current_batch: Vec::new(),
28            processed_batches: VecDeque::new(),
29        }
30    }
31
32    /// Process a streaming operation pipeline
33    ///
34    /// # Errors
35    /// Returns an error if processing fails
36    pub fn process_stream(&mut self, operations: &[DsonOperation]) -> Result<()> {
37        for operation in operations {
38            self.apply_streaming_operation(operation)?;
39        }
40        Ok(())
41    }
42
43    /// Apply a streaming operation
44    ///
45    /// # Errors
46    /// Returns an error if the operation fails
47    fn apply_streaming_operation(&mut self, operation: &DsonOperation) -> Result<()> {
48        match operation {
49            DsonOperation::StreamBuild { path, generator } => self.stream_build(path, generator),
50            DsonOperation::StreamFilter { path, predicate } => {
51                self.stream_filter(path, predicate);
52                Ok(())
53            }
54            DsonOperation::StreamMap { path, transform } => {
55                self.stream_map(path, transform);
56                Ok(())
57            }
58            DsonOperation::StreamEmit { path, batch_size } => self.stream_emit(path, *batch_size),
59            // Pass through other operations to the underlying processor
60            _ => self.processor.apply_operation(operation),
61        }
62    }
63
64    /// Stream build operation - generate data streams
65    fn stream_build(&mut self, path: &str, generator: &StreamGenerator) -> Result<()> {
66        match generator {
67            StreamGenerator::Range { start, end, step } => {
68                self.generate_range(*start, *end, *step);
69            }
70            StreamGenerator::Repeat(value, count) => {
71                self.generate_repeat(value, *count);
72            }
73            StreamGenerator::Fibonacci(count) => {
74                self.generate_fibonacci(*count);
75            }
76            StreamGenerator::Custom(_) => {
77                // Custom generators would be implemented here
78                // For now, just track the operation
79                self.processor
80                    .apply_operation(&DsonOperation::StreamBuild {
81                        path: path.to_string(),
82                        generator: generator.clone(),
83                    })?;
84            }
85        }
86        Ok(())
87    }
88
89    /// Generate a numeric range
90    fn generate_range(&mut self, start: i64, end: i64, step: i64) {
91        let mut current = start;
92        while current < end {
93            self.current_batch
94                .push(OperationValue::NumberRef(current.to_string()));
95            current += step;
96        }
97        // Don't flush - let streaming operations work on current_batch
98    }
99
100    /// Generate repeated values
101    fn generate_repeat(&mut self, value: &OperationValue, count: usize) {
102        for _ in 0..count {
103            self.current_batch.push(value.clone());
104            if self.current_batch.len() >= self.buffer_size {
105                self.flush_batch();
106            }
107        }
108        self.flush_batch();
109    }
110
111    /// Generate fibonacci sequence
112    fn generate_fibonacci(&mut self, count: usize) {
113        let mut a = 0i64;
114        let mut b = 1i64;
115
116        for _ in 0..count {
117            self.current_batch
118                .push(OperationValue::NumberRef(a.to_string()));
119            if self.current_batch.len() >= self.buffer_size {
120                self.flush_batch();
121            }
122
123            let temp = a;
124            a = b;
125            b += temp;
126        }
127        self.flush_batch();
128    }
129
130    /// Stream filter operation
131    fn stream_filter(&mut self, _path: &str, predicate: &FilterPredicate) {
132        // Process current batch with filtering
133        let mut filtered = Vec::new();
134
135        for (index, value) in self.current_batch.iter().enumerate() {
136            if Self::matches_predicate(value, predicate, index) {
137                filtered.push(value.clone());
138            }
139        }
140
141        self.current_batch = filtered;
142    }
143
144    /// Stream map operation
145    fn stream_map(&mut self, _path: &str, transform: &TransformFunction) {
146        // Process current batch with transformation
147        let mut transformed = Vec::new();
148
149        for value in &self.current_batch {
150            let new_value = Self::apply_transform(value, transform);
151            transformed.push(new_value);
152        }
153
154        self.current_batch = transformed;
155    }
156
157    /// Stream emit operation - output processed batches
158    fn stream_emit(&mut self, path: &str, _batch_size: usize) -> Result<()> {
159        // Ensure we have data to emit
160        if self.current_batch.is_empty() && self.processed_batches.is_empty() {
161            return Ok(());
162        }
163
164        // Flush current batch if needed
165        if !self.current_batch.is_empty() {
166            self.flush_batch();
167        }
168
169        // Emit batches
170        while let Some(batch) = self.processed_batches.pop_front() {
171            // In a real implementation, this would send the batch to output
172            // For now, we track it in the processor
173            let emit_path = format!("{}.batch_{}", path, self.processed_batches.len());
174            let batch_value = OperationValue::StringRef(format!("batch_size:{}", batch.len()));
175            self.processor.apply_operation(&DsonOperation::FieldAdd {
176                path: emit_path,
177                value: batch_value,
178            })?;
179        }
180
181        Ok(())
182    }
183
184    /// Check if a value matches a filter predicate
185    fn matches_predicate(
186        value: &OperationValue,
187        predicate: &FilterPredicate,
188        index: usize,
189    ) -> bool {
190        match predicate {
191            FilterPredicate::Even => {
192                if let OperationValue::NumberRef(num_str) = value {
193                    num_str.parse::<i64>().is_ok_and(|num| num % 2 == 0)
194                } else {
195                    false
196                }
197            }
198            FilterPredicate::Odd => {
199                if let OperationValue::NumberRef(num_str) = value {
200                    num_str.parse::<i64>().is_ok_and(|num| num % 2 == 1)
201                } else {
202                    false
203                }
204            }
205            FilterPredicate::EveryNth(n) => index.is_multiple_of(*n),
206            FilterPredicate::GreaterThan(threshold) => {
207                if let OperationValue::NumberRef(num_str) = value {
208                    num_str.parse::<i64>().is_ok_and(|num| num > *threshold)
209                } else {
210                    false
211                }
212            }
213            FilterPredicate::LessThan(threshold) => {
214                if let OperationValue::NumberRef(num_str) = value {
215                    num_str.parse::<i64>().is_ok_and(|num| num < *threshold)
216                } else {
217                    false
218                }
219            }
220            FilterPredicate::Equals(compare_value) => {
221                match (value, compare_value) {
222                    (OperationValue::NumberRef(num_str), OperationValue::NumberRef(cmp_str)) => {
223                        // Compare as numbers
224                        if let (Ok(a), Ok(b)) = (num_str.parse::<i64>(), cmp_str.parse::<i64>()) {
225                            a == b
226                        } else if let (Ok(a), Ok(b)) =
227                            (num_str.parse::<f64>(), cmp_str.parse::<f64>())
228                        {
229                            (a - b).abs() < f64::EPSILON
230                        } else {
231                            num_str == cmp_str
232                        }
233                    }
234                    (OperationValue::StringRef(s1), OperationValue::StringRef(s2)) => s1 == s2,
235                    (OperationValue::BoolRef(b1), OperationValue::BoolRef(b2)) => b1 == b2,
236                    (OperationValue::Null, OperationValue::Null) => true,
237                    _ => false,
238                }
239            }
240            FilterPredicate::Alternate => {
241                // Alternate - select every other element (even indices)
242                index.is_multiple_of(2)
243            }
244            FilterPredicate::Custom(predicate_fn) => {
245                // Custom predicate - execute the stored predicate string as a simple expression
246                // For now, support basic patterns like "value > N" or "value == N"
247                Self::evaluate_custom_predicate(predicate_fn, value)
248            }
249        }
250    }
251
252    /// Evaluate a custom predicate expression
253    fn evaluate_custom_predicate(predicate: &str, value: &OperationValue) -> bool {
254        let predicate = predicate.trim();
255
256        // Parse simple expressions like "value > 10", "value == 5", "value < 100"
257        if let Some(rest) = predicate.strip_prefix("value") {
258            let rest = rest.trim();
259
260            // Parse operator and threshold
261            if let Some(threshold_str) = rest.strip_prefix(">") {
262                if let Ok(threshold) = threshold_str.trim().parse::<i64>()
263                    && let OperationValue::NumberRef(num_str) = value
264                    && let Ok(num) = num_str.parse::<i64>()
265                {
266                    return num > threshold;
267                }
268            } else if let Some(threshold_str) = rest.strip_prefix("<") {
269                if let Ok(threshold) = threshold_str.trim().parse::<i64>()
270                    && let OperationValue::NumberRef(num_str) = value
271                    && let Ok(num) = num_str.parse::<i64>()
272                {
273                    return num < threshold;
274                }
275            } else if let Some(threshold_str) = rest.strip_prefix("==") {
276                let threshold_str = threshold_str.trim();
277                if let OperationValue::NumberRef(num_str) = value {
278                    return num_str == threshold_str;
279                } else if let OperationValue::StringRef(s) = value {
280                    return s == threshold_str.trim_matches('"');
281                }
282            } else if let Some(threshold_str) = rest.strip_prefix("!=") {
283                let threshold_str = threshold_str.trim();
284                if let OperationValue::NumberRef(num_str) = value {
285                    return num_str != threshold_str;
286                } else if let OperationValue::StringRef(s) = value {
287                    return s != threshold_str.trim_matches('"');
288                }
289            }
290        }
291
292        // Default to true if predicate couldn't be parsed
293        true
294    }
295
296    /// Apply a transformation to a value
297    fn apply_transform(value: &OperationValue, transform: &TransformFunction) -> OperationValue {
298        match (value, transform) {
299            (OperationValue::NumberRef(num_str), TransformFunction::Add(delta)) => {
300                num_str.parse::<i64>().map_or_else(
301                    |_| value.clone(),
302                    |num| OperationValue::NumberRef((num + delta).to_string()),
303                )
304            }
305            (OperationValue::NumberRef(num_str), TransformFunction::Multiply(factor)) => {
306                num_str.parse::<i64>().map_or_else(
307                    |_| value.clone(),
308                    |num| OperationValue::NumberRef((num * factor).to_string()),
309                )
310            }
311            (OperationValue::StringRef(text), TransformFunction::ToUppercase) => {
312                OperationValue::StringRef(text.to_uppercase())
313            }
314            (OperationValue::StringRef(text), TransformFunction::ToLowercase) => {
315                OperationValue::StringRef(text.to_lowercase())
316            }
317            (OperationValue::StringRef(text), TransformFunction::Append(suffix)) => {
318                OperationValue::StringRef(format!("{text}{suffix}"))
319            }
320            (OperationValue::StringRef(text), TransformFunction::Prepend(prefix)) => {
321                OperationValue::StringRef(format!("{prefix}{text}"))
322            }
323            _ => value.clone(), // Return unchanged for unsupported combinations
324        }
325    }
326
327    /// Flush current batch to processed batches
328    fn flush_batch(&mut self) {
329        if !self.current_batch.is_empty() {
330            let batch = std::mem::take(&mut self.current_batch);
331            self.processed_batches.push_back(batch);
332        }
333    }
334
335    /// Get the underlying processor for inspection
336    #[must_use]
337    pub const fn processor(&self) -> &BlackBoxProcessor {
338        &self.processor
339    }
340
341    /// Get processed batch count
342    #[must_use]
343    pub fn batch_count(&self) -> usize {
344        self.processed_batches.len()
345    }
346
347    /// Get total items processed
348    #[must_use]
349    pub fn total_items(&self) -> usize {
350        self.processed_batches.iter().map(Vec::len).sum::<usize>() + self.current_batch.len()
351    }
352}
353
354#[cfg(test)]
355mod tests {
356    use super::*;
357    use crate::FilterPredicate;
358
359    #[test]
360    fn test_stream_range_generation() {
361        let mut processor = StreamingProcessor::new(10);
362
363        let operations = vec![DsonOperation::StreamBuild {
364            path: "numbers".to_string(),
365            generator: StreamGenerator::Range {
366                start: 0,
367                end: 25,
368                step: 5,
369            },
370        }];
371
372        processor.process_stream(&operations).unwrap();
373
374        // Should have generated: 0, 5, 10, 15, 20
375        assert_eq!(processor.total_items(), 5);
376        // Note: No emit operation, so items remain in current batch
377    }
378
379    #[test]
380    fn test_stream_filter_even() {
381        let mut processor = StreamingProcessor::new(10);
382
383        let operations = vec![
384            DsonOperation::StreamBuild {
385                path: "numbers".to_string(),
386                generator: StreamGenerator::Range {
387                    start: 0,
388                    end: 10,
389                    step: 1,
390                },
391            },
392            DsonOperation::StreamFilter {
393                path: "numbers".to_string(),
394                predicate: FilterPredicate::Even,
395            },
396        ];
397
398        processor.process_stream(&operations).unwrap();
399
400        // Should have: 0, 2, 4, 6, 8 (5 even numbers)
401        assert_eq!(processor.total_items(), 5);
402    }
403
404    #[test]
405    fn test_stream_map_multiply() {
406        let mut processor = StreamingProcessor::new(10);
407
408        let operations = vec![
409            DsonOperation::StreamBuild {
410                path: "numbers".to_string(),
411                generator: StreamGenerator::Range {
412                    start: 1,
413                    end: 4,
414                    step: 1,
415                },
416            },
417            DsonOperation::StreamMap {
418                path: "numbers".to_string(),
419                transform: TransformFunction::Multiply(2),
420            },
421        ];
422
423        processor.process_stream(&operations).unwrap();
424
425        // Should have: 2, 4, 6 (1*2, 2*2, 3*2)
426        assert_eq!(processor.total_items(), 3);
427    }
428
429    #[test]
430    fn test_stream_fibonacci() {
431        let mut processor = StreamingProcessor::new(10);
432
433        let operations = vec![DsonOperation::StreamBuild {
434            path: "fib".to_string(),
435            generator: StreamGenerator::Fibonacci(8),
436        }];
437
438        processor.process_stream(&operations).unwrap();
439
440        // Should have: 0, 1, 1, 2, 3, 5, 8, 13
441        assert_eq!(processor.total_items(), 8);
442    }
443
444    #[test]
445    fn test_stream_repeat() {
446        let mut processor = StreamingProcessor::new(10);
447        let operations = vec![DsonOperation::StreamBuild {
448            path: "rep".to_string(),
449            generator: StreamGenerator::Repeat(OperationValue::StringRef("x".to_string()), 5),
450        }];
451        processor.process_stream(&operations).unwrap();
452        assert_eq!(processor.total_items(), 5);
453    }
454
455    #[test]
456    fn test_stream_emit() {
457        let mut processor = StreamingProcessor::new(5);
458        let operations = vec![
459            DsonOperation::StreamBuild {
460                path: "numbers".to_string(),
461                generator: StreamGenerator::Range {
462                    start: 0,
463                    end: 10,
464                    step: 1,
465                },
466            },
467            DsonOperation::StreamEmit {
468                path: "out".to_string(),
469                batch_size: 5,
470            },
471        ];
472        processor.process_stream(&operations).unwrap();
473    }
474
475    #[test]
476    fn test_stream_filter_odd() {
477        let mut processor = StreamingProcessor::new(10);
478        let operations = vec![
479            DsonOperation::StreamBuild {
480                path: "numbers".to_string(),
481                generator: StreamGenerator::Range {
482                    start: 0,
483                    end: 10,
484                    step: 1,
485                },
486            },
487            DsonOperation::StreamFilter {
488                path: "numbers".to_string(),
489                predicate: FilterPredicate::Odd,
490            },
491        ];
492        processor.process_stream(&operations).unwrap();
493        assert_eq!(processor.total_items(), 5);
494    }
495
496    #[test]
497    fn test_stream_filter_greater_than() {
498        let mut processor = StreamingProcessor::new(10);
499        let operations = vec![
500            DsonOperation::StreamBuild {
501                path: "numbers".to_string(),
502                generator: StreamGenerator::Range {
503                    start: 0,
504                    end: 10,
505                    step: 1,
506                },
507            },
508            DsonOperation::StreamFilter {
509                path: "numbers".to_string(),
510                predicate: FilterPredicate::GreaterThan(5),
511            },
512        ];
513        processor.process_stream(&operations).unwrap();
514        assert_eq!(processor.total_items(), 4); // 6, 7, 8, 9
515    }
516
517    #[test]
518    fn test_stream_filter_less_than() {
519        let mut processor = StreamingProcessor::new(10);
520        let operations = vec![
521            DsonOperation::StreamBuild {
522                path: "numbers".to_string(),
523                generator: StreamGenerator::Range {
524                    start: 0,
525                    end: 10,
526                    step: 1,
527                },
528            },
529            DsonOperation::StreamFilter {
530                path: "numbers".to_string(),
531                predicate: FilterPredicate::LessThan(5),
532            },
533        ];
534        processor.process_stream(&operations).unwrap();
535        assert_eq!(processor.total_items(), 5); // 0, 1, 2, 3, 4
536    }
537
538    #[test]
539    fn test_stream_filter_equals() {
540        let mut processor = StreamingProcessor::new(10);
541        let operations = vec![
542            DsonOperation::StreamBuild {
543                path: "numbers".to_string(),
544                generator: StreamGenerator::Range {
545                    start: 0,
546                    end: 10,
547                    step: 1,
548                },
549            },
550            DsonOperation::StreamFilter {
551                path: "numbers".to_string(),
552                predicate: FilterPredicate::Equals(OperationValue::NumberRef("5".to_string())),
553            },
554        ];
555        processor.process_stream(&operations).unwrap();
556        assert_eq!(processor.total_items(), 1);
557    }
558
559    #[test]
560    fn test_stream_filter_every_nth() {
561        let mut processor = StreamingProcessor::new(10);
562        let operations = vec![
563            DsonOperation::StreamBuild {
564                path: "numbers".to_string(),
565                generator: StreamGenerator::Range {
566                    start: 0,
567                    end: 12,
568                    step: 1,
569                },
570            },
571            DsonOperation::StreamFilter {
572                path: "numbers".to_string(),
573                predicate: FilterPredicate::EveryNth(3),
574            },
575        ];
576        processor.process_stream(&operations).unwrap();
577    }
578
579    #[test]
580    fn test_stream_filter_alternate() {
581        let mut processor = StreamingProcessor::new(10);
582        let operations = vec![
583            DsonOperation::StreamBuild {
584                path: "numbers".to_string(),
585                generator: StreamGenerator::Range {
586                    start: 0,
587                    end: 10,
588                    step: 1,
589                },
590            },
591            DsonOperation::StreamFilter {
592                path: "numbers".to_string(),
593                predicate: FilterPredicate::Alternate,
594            },
595        ];
596        processor.process_stream(&operations).unwrap();
597    }
598
599    #[test]
600    fn test_stream_filter_custom() {
601        let mut processor = StreamingProcessor::new(10);
602        let operations = vec![
603            DsonOperation::StreamBuild {
604                path: "numbers".to_string(),
605                generator: StreamGenerator::Range {
606                    start: 0,
607                    end: 20,
608                    step: 1,
609                },
610            },
611            DsonOperation::StreamFilter {
612                path: "numbers".to_string(),
613                predicate: FilterPredicate::Custom("value > 10".to_string()),
614            },
615        ];
616        processor.process_stream(&operations).unwrap();
617    }
618
619    #[test]
620    fn test_stream_map_add() {
621        let mut processor = StreamingProcessor::new(10);
622        let operations = vec![
623            DsonOperation::StreamBuild {
624                path: "numbers".to_string(),
625                generator: StreamGenerator::Range {
626                    start: 0,
627                    end: 5,
628                    step: 1,
629                },
630            },
631            DsonOperation::StreamMap {
632                path: "numbers".to_string(),
633                transform: TransformFunction::Add(10),
634            },
635        ];
636        processor.process_stream(&operations).unwrap();
637    }
638
639    #[test]
640    fn test_stream_map_to_lowercase() {
641        let mut processor = StreamingProcessor::new(10);
642        let operations = vec![
643            DsonOperation::StreamBuild {
644                path: "strings".to_string(),
645                generator: StreamGenerator::Repeat(
646                    OperationValue::StringRef("HELLO".to_string()),
647                    3,
648                ),
649            },
650            DsonOperation::StreamMap {
651                path: "strings".to_string(),
652                transform: TransformFunction::ToLowercase,
653            },
654        ];
655        processor.process_stream(&operations).unwrap();
656    }
657
658    #[test]
659    fn test_stream_map_to_uppercase() {
660        let mut processor = StreamingProcessor::new(10);
661        let operations = vec![
662            DsonOperation::StreamBuild {
663                path: "strings".to_string(),
664                generator: StreamGenerator::Repeat(
665                    OperationValue::StringRef("hello".to_string()),
666                    3,
667                ),
668            },
669            DsonOperation::StreamMap {
670                path: "strings".to_string(),
671                transform: TransformFunction::ToUppercase,
672            },
673        ];
674        processor.process_stream(&operations).unwrap();
675    }
676
677    #[test]
678    fn test_stream_map_append() {
679        let mut processor = StreamingProcessor::new(10);
680        let operations = vec![
681            DsonOperation::StreamBuild {
682                path: "strings".to_string(),
683                generator: StreamGenerator::Repeat(
684                    OperationValue::StringRef("hello".to_string()),
685                    2,
686                ),
687            },
688            DsonOperation::StreamMap {
689                path: "strings".to_string(),
690                transform: TransformFunction::Append("!".to_string()),
691            },
692        ];
693        processor.process_stream(&operations).unwrap();
694    }
695
696    #[test]
697    fn test_stream_map_prepend() {
698        let mut processor = StreamingProcessor::new(10);
699        let operations = vec![
700            DsonOperation::StreamBuild {
701                path: "strings".to_string(),
702                generator: StreamGenerator::Repeat(
703                    OperationValue::StringRef("world".to_string()),
704                    2,
705                ),
706            },
707            DsonOperation::StreamMap {
708                path: "strings".to_string(),
709                transform: TransformFunction::Prepend("hello ".to_string()),
710            },
711        ];
712        processor.process_stream(&operations).unwrap();
713    }
714
715    #[test]
716    fn test_stream_custom_generator() {
717        let mut processor = StreamingProcessor::new(10);
718        let operations = vec![DsonOperation::StreamBuild {
719            path: "custom".to_string(),
720            generator: StreamGenerator::Custom("test".to_string()),
721        }];
722        processor.process_stream(&operations).unwrap();
723    }
724
725    #[test]
726    fn test_processor_getter() {
727        let processor = StreamingProcessor::new(10);
728        let _ = processor.processor();
729    }
730
731    #[test]
732    fn test_batch_count_empty() {
733        let processor = StreamingProcessor::new(10);
734        assert_eq!(processor.batch_count(), 0);
735    }
736
737    // Additional tests for coverage
738
739    #[test]
740    fn test_pass_through_operations() {
741        let mut processor = StreamingProcessor::new(10);
742        // Test operations that pass through to underlying processor
743        let operations = vec![DsonOperation::FieldAdd {
744            path: "test".to_string(),
745            value: OperationValue::StringRef("value".to_string()),
746        }];
747        processor.process_stream(&operations).unwrap();
748    }
749
750    #[test]
751    fn test_generate_repeat_with_flush() {
752        // Small buffer to trigger flush during generation
753        let mut processor = StreamingProcessor::new(3);
754        let operations = vec![DsonOperation::StreamBuild {
755            path: "rep".to_string(),
756            generator: StreamGenerator::Repeat(OperationValue::NumberRef("1".to_string()), 10),
757        }];
758        processor.process_stream(&operations).unwrap();
759        // Should have flushed multiple times
760        assert!(processor.batch_count() > 0);
761    }
762
763    #[test]
764    fn test_generate_fibonacci_with_flush() {
765        // Small buffer to trigger flush during fibonacci generation
766        let mut processor = StreamingProcessor::new(3);
767        let operations = vec![DsonOperation::StreamBuild {
768            path: "fib".to_string(),
769            generator: StreamGenerator::Fibonacci(15),
770        }];
771        processor.process_stream(&operations).unwrap();
772        // Should have flushed multiple times
773        assert!(processor.batch_count() > 0);
774    }
775
776    #[test]
777    fn test_stream_emit_empty() {
778        let mut processor = StreamingProcessor::new(10);
779        // Emit with no data should be ok
780        let operations = vec![DsonOperation::StreamEmit {
781            path: "out".to_string(),
782            batch_size: 5,
783        }];
784        processor.process_stream(&operations).unwrap();
785    }
786
787    #[test]
788    fn test_stream_emit_with_processed_batches() {
789        let mut processor = StreamingProcessor::new(3);
790        // Generate enough to create processed batches
791        let operations = vec![
792            DsonOperation::StreamBuild {
793                path: "numbers".to_string(),
794                generator: StreamGenerator::Repeat(OperationValue::NumberRef("1".to_string()), 10),
795            },
796            DsonOperation::StreamEmit {
797                path: "out".to_string(),
798                batch_size: 3,
799            },
800        ];
801        processor.process_stream(&operations).unwrap();
802    }
803
804    #[test]
805    fn test_custom_predicate_less_than() {
806        let mut processor = StreamingProcessor::new(10);
807        let operations = vec![
808            DsonOperation::StreamBuild {
809                path: "numbers".to_string(),
810                generator: StreamGenerator::Range {
811                    start: 0,
812                    end: 20,
813                    step: 1,
814                },
815            },
816            DsonOperation::StreamFilter {
817                path: "numbers".to_string(),
818                predicate: FilterPredicate::Custom("value < 5".to_string()),
819            },
820        ];
821        processor.process_stream(&operations).unwrap();
822        assert_eq!(processor.total_items(), 5); // 0, 1, 2, 3, 4
823    }
824
825    #[test]
826    fn test_custom_predicate_equals() {
827        let mut processor = StreamingProcessor::new(10);
828        let operations = vec![
829            DsonOperation::StreamBuild {
830                path: "numbers".to_string(),
831                generator: StreamGenerator::Range {
832                    start: 0,
833                    end: 10,
834                    step: 1,
835                },
836            },
837            DsonOperation::StreamFilter {
838                path: "numbers".to_string(),
839                predicate: FilterPredicate::Custom("value == 5".to_string()),
840            },
841        ];
842        processor.process_stream(&operations).unwrap();
843        assert_eq!(processor.total_items(), 1);
844    }
845
846    #[test]
847    fn test_custom_predicate_not_equals() {
848        let mut processor = StreamingProcessor::new(10);
849        let operations = vec![
850            DsonOperation::StreamBuild {
851                path: "numbers".to_string(),
852                generator: StreamGenerator::Range {
853                    start: 0,
854                    end: 10,
855                    step: 1,
856                },
857            },
858            DsonOperation::StreamFilter {
859                path: "numbers".to_string(),
860                predicate: FilterPredicate::Custom("value != 5".to_string()),
861            },
862        ];
863        processor.process_stream(&operations).unwrap();
864        assert_eq!(processor.total_items(), 9); // all except 5
865    }
866
867    #[test]
868    fn test_custom_predicate_invalid() {
869        let mut processor = StreamingProcessor::new(10);
870        let operations = vec![
871            DsonOperation::StreamBuild {
872                path: "numbers".to_string(),
873                generator: StreamGenerator::Range {
874                    start: 0,
875                    end: 5,
876                    step: 1,
877                },
878            },
879            DsonOperation::StreamFilter {
880                path: "numbers".to_string(),
881                predicate: FilterPredicate::Custom("invalid_predicate".to_string()),
882            },
883        ];
884        processor.process_stream(&operations).unwrap();
885        // Invalid predicate defaults to true, so all items pass
886        assert_eq!(processor.total_items(), 5);
887    }
888
889    #[test]
890    fn test_filter_equals_string() {
891        let mut processor = StreamingProcessor::new(10);
892        let operations = vec![
893            DsonOperation::StreamBuild {
894                path: "strings".to_string(),
895                generator: StreamGenerator::Repeat(
896                    OperationValue::StringRef("hello".to_string()),
897                    3,
898                ),
899            },
900            DsonOperation::StreamFilter {
901                path: "strings".to_string(),
902                predicate: FilterPredicate::Equals(OperationValue::StringRef("hello".to_string())),
903            },
904        ];
905        processor.process_stream(&operations).unwrap();
906        assert_eq!(processor.total_items(), 3);
907    }
908
909    #[test]
910    fn test_filter_equals_bool() {
911        let mut processor = StreamingProcessor::new(10);
912        // Add bools to current batch manually
913        processor.current_batch.push(OperationValue::BoolRef(true));
914        processor.current_batch.push(OperationValue::BoolRef(false));
915        processor.current_batch.push(OperationValue::BoolRef(true));
916
917        let operations = vec![DsonOperation::StreamFilter {
918            path: "bools".to_string(),
919            predicate: FilterPredicate::Equals(OperationValue::BoolRef(true)),
920        }];
921        processor.process_stream(&operations).unwrap();
922        assert_eq!(processor.total_items(), 2); // Two trues
923    }
924
925    #[test]
926    fn test_filter_equals_null() {
927        let mut processor = StreamingProcessor::new(10);
928        processor.current_batch.push(OperationValue::Null);
929        processor
930            .current_batch
931            .push(OperationValue::StringRef("not null".to_string()));
932        processor.current_batch.push(OperationValue::Null);
933
934        let operations = vec![DsonOperation::StreamFilter {
935            path: "nulls".to_string(),
936            predicate: FilterPredicate::Equals(OperationValue::Null),
937        }];
938        processor.process_stream(&operations).unwrap();
939        assert_eq!(processor.total_items(), 2); // Two nulls
940    }
941
942    #[test]
943    fn test_filter_equals_type_mismatch() {
944        let mut processor = StreamingProcessor::new(10);
945        processor
946            .current_batch
947            .push(OperationValue::NumberRef("5".to_string()));
948
949        let operations = vec![DsonOperation::StreamFilter {
950            path: "test".to_string(),
951            predicate: FilterPredicate::Equals(OperationValue::StringRef("5".to_string())),
952        }];
953        processor.process_stream(&operations).unwrap();
954        // Type mismatch returns false
955        assert_eq!(processor.total_items(), 0);
956    }
957
958    #[test]
959    fn test_filter_even_with_non_numbers() {
960        let mut processor = StreamingProcessor::new(10);
961        processor
962            .current_batch
963            .push(OperationValue::StringRef("not a number".to_string()));
964        processor
965            .current_batch
966            .push(OperationValue::NumberRef("4".to_string()));
967        processor.current_batch.push(OperationValue::BoolRef(true));
968
969        let operations = vec![DsonOperation::StreamFilter {
970            path: "test".to_string(),
971            predicate: FilterPredicate::Even,
972        }];
973        processor.process_stream(&operations).unwrap();
974        assert_eq!(processor.total_items(), 1); // Only 4 passes
975    }
976
977    #[test]
978    fn test_filter_odd_with_non_numbers() {
979        let mut processor = StreamingProcessor::new(10);
980        processor
981            .current_batch
982            .push(OperationValue::StringRef("not a number".to_string()));
983        processor
984            .current_batch
985            .push(OperationValue::NumberRef("3".to_string()));
986
987        let operations = vec![DsonOperation::StreamFilter {
988            path: "test".to_string(),
989            predicate: FilterPredicate::Odd,
990        }];
991        processor.process_stream(&operations).unwrap();
992        assert_eq!(processor.total_items(), 1); // Only 3 passes
993    }
994
995    #[test]
996    fn test_filter_greater_than_with_non_numbers() {
997        let mut processor = StreamingProcessor::new(10);
998        processor
999            .current_batch
1000            .push(OperationValue::StringRef("text".to_string()));
1001        processor
1002            .current_batch
1003            .push(OperationValue::NumberRef("10".to_string()));
1004
1005        let operations = vec![DsonOperation::StreamFilter {
1006            path: "test".to_string(),
1007            predicate: FilterPredicate::GreaterThan(5),
1008        }];
1009        processor.process_stream(&operations).unwrap();
1010        assert_eq!(processor.total_items(), 1); // Only 10 > 5
1011    }
1012
1013    #[test]
1014    fn test_filter_less_than_with_non_numbers() {
1015        let mut processor = StreamingProcessor::new(10);
1016        processor.current_batch.push(OperationValue::BoolRef(false));
1017        processor
1018            .current_batch
1019            .push(OperationValue::NumberRef("3".to_string()));
1020
1021        let operations = vec![DsonOperation::StreamFilter {
1022            path: "test".to_string(),
1023            predicate: FilterPredicate::LessThan(5),
1024        }];
1025        processor.process_stream(&operations).unwrap();
1026        assert_eq!(processor.total_items(), 1); // Only 3 < 5
1027    }
1028
1029    #[test]
1030    fn test_transform_invalid_number() {
1031        let mut processor = StreamingProcessor::new(10);
1032        processor
1033            .current_batch
1034            .push(OperationValue::NumberRef("not_a_number".to_string()));
1035
1036        let operations = vec![DsonOperation::StreamMap {
1037            path: "test".to_string(),
1038            transform: TransformFunction::Add(10),
1039        }];
1040        processor.process_stream(&operations).unwrap();
1041        // Invalid number should be returned unchanged
1042        assert_eq!(processor.total_items(), 1);
1043    }
1044
1045    #[test]
1046    fn test_transform_multiply_invalid_number() {
1047        let mut processor = StreamingProcessor::new(10);
1048        processor
1049            .current_batch
1050            .push(OperationValue::NumberRef("invalid".to_string()));
1051
1052        let operations = vec![DsonOperation::StreamMap {
1053            path: "test".to_string(),
1054            transform: TransformFunction::Multiply(2),
1055        }];
1056        processor.process_stream(&operations).unwrap();
1057        assert_eq!(processor.total_items(), 1);
1058    }
1059
1060    #[test]
1061    fn test_transform_unsupported_combination() {
1062        let mut processor = StreamingProcessor::new(10);
1063        // Try to uppercase a number (unsupported)
1064        processor
1065            .current_batch
1066            .push(OperationValue::NumberRef("5".to_string()));
1067
1068        let operations = vec![DsonOperation::StreamMap {
1069            path: "test".to_string(),
1070            transform: TransformFunction::ToUppercase,
1071        }];
1072        processor.process_stream(&operations).unwrap();
1073        // Should return unchanged
1074        assert_eq!(processor.total_items(), 1);
1075    }
1076
1077    #[test]
1078    fn test_filter_equals_float_comparison() {
1079        let mut processor = StreamingProcessor::new(10);
1080        processor
1081            .current_batch
1082            .push(OperationValue::NumberRef("3.14".to_string()));
1083        processor
1084            .current_batch
1085            .push(OperationValue::NumberRef("2.71".to_string()));
1086
1087        let operations = vec![DsonOperation::StreamFilter {
1088            path: "floats".to_string(),
1089            predicate: FilterPredicate::Equals(OperationValue::NumberRef("3.14".to_string())),
1090        }];
1091        processor.process_stream(&operations).unwrap();
1092        assert_eq!(processor.total_items(), 1);
1093    }
1094
1095    #[test]
1096    fn test_custom_predicate_string_equals() {
1097        let mut processor = StreamingProcessor::new(10);
1098        processor
1099            .current_batch
1100            .push(OperationValue::StringRef("hello".to_string()));
1101        processor
1102            .current_batch
1103            .push(OperationValue::StringRef("world".to_string()));
1104
1105        let operations = vec![DsonOperation::StreamFilter {
1106            path: "strings".to_string(),
1107            predicate: FilterPredicate::Custom("value == \"hello\"".to_string()),
1108        }];
1109        processor.process_stream(&operations).unwrap();
1110        assert_eq!(processor.total_items(), 1);
1111    }
1112
1113    #[test]
1114    fn test_custom_predicate_string_not_equals() {
1115        let mut processor = StreamingProcessor::new(10);
1116        processor
1117            .current_batch
1118            .push(OperationValue::StringRef("hello".to_string()));
1119        processor
1120            .current_batch
1121            .push(OperationValue::StringRef("world".to_string()));
1122
1123        let operations = vec![DsonOperation::StreamFilter {
1124            path: "strings".to_string(),
1125            predicate: FilterPredicate::Custom("value != \"hello\"".to_string()),
1126        }];
1127        processor.process_stream(&operations).unwrap();
1128        assert_eq!(processor.total_items(), 1);
1129    }
1130
1131    #[test]
1132    fn test_batch_count_after_generation() {
1133        let mut processor = StreamingProcessor::new(3);
1134        let operations = vec![DsonOperation::StreamBuild {
1135            path: "nums".to_string(),
1136            generator: StreamGenerator::Repeat(OperationValue::NumberRef("1".to_string()), 9),
1137        }];
1138        processor.process_stream(&operations).unwrap();
1139        assert!(processor.batch_count() >= 3);
1140    }
1141
1142    #[test]
1143    fn test_total_items_mixed() {
1144        let mut processor = StreamingProcessor::new(5);
1145        // Add some to current batch
1146        processor.current_batch.push(OperationValue::Null);
1147        processor.current_batch.push(OperationValue::Null);
1148
1149        // Flush to create processed batch
1150        processor.flush_batch();
1151
1152        // Add more to current batch
1153        processor.current_batch.push(OperationValue::Null);
1154
1155        assert_eq!(processor.total_items(), 3);
1156    }
1157}