Skip to main content

fionn_stream/
jsonl_dson.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2//! JSONL-DSON Integration
3//!
4//! This module provides integration between SIMD-JSONL processing and DSON operations,
5//! enabling high-performance processing of JSON Lines with schema filtering and CRDT semantics.
6
7use crate::skiptape::CompiledSchema;
8use crate::skiptape::jsonl::SimdJsonlBatchProcessor;
9use fionn_core::Result;
10use fionn_ops::DsonOperation;
11use std::collections::HashSet;
12
13/// JSONL-DSON processor for high-performance document processing
14pub struct JsonlDsonProcessor {
15    /// SIMD-JSONL batch processor
16    jsonl_processor: SimdJsonlBatchProcessor,
17}
18
19impl JsonlDsonProcessor {
20    /// Create a new JSONL-DSON processor
21    #[must_use]
22    pub fn new(_input_schema: HashSet<String>, _output_schema: HashSet<String>) -> Self {
23        Self {
24            jsonl_processor: SimdJsonlBatchProcessor::new(),
25        }
26    }
27
28    /// Process JSONL data with schema filtering and DSON operations
29    ///
30    /// # Errors
31    /// Returns an error if processing fails
32    pub fn process_jsonl_with_operations(
33        &mut self,
34        jsonl_data: &[u8],
35        schema: &CompiledSchema,
36        operations: &[DsonOperation],
37    ) -> Result<ProcessedBatch> {
38        // First, process JSONL with SIMD filtering
39        let batch_result = self
40            .jsonl_processor
41            .process_batch_optimized(jsonl_data, schema)?;
42
43        // If no operations, return as-is
44        if operations.is_empty() {
45            return Ok(ProcessedBatch {
46                documents: batch_result.documents,
47                errors: batch_result.errors,
48                statistics: batch_result.statistics,
49            });
50        }
51
52        // Apply DSON operations to each filtered document
53        let mut processed_documents = Vec::with_capacity(batch_result.documents.len());
54
55        for doc_json in &batch_result.documents {
56            match Self::apply_operations_to_document(doc_json, operations) {
57                Ok(transformed) => processed_documents.push(transformed),
58                Err(_) => {
59                    // On error, keep original document
60                    processed_documents.push(doc_json.clone());
61                }
62            }
63        }
64
65        Ok(ProcessedBatch {
66            documents: processed_documents,
67            errors: batch_result.errors,
68            statistics: batch_result.statistics,
69        })
70    }
71
72    /// Apply DSON operations to a single document
73    fn apply_operations_to_document(
74        doc_json: &str,
75        operations: &[DsonOperation],
76    ) -> Result<String> {
77        use fionn_ops::processor::BlackBoxProcessor;
78
79        // Create a processor for this document
80        let mut processor = BlackBoxProcessor::new_unfiltered();
81
82        // Process the document
83        processor.process(doc_json)?;
84
85        // Apply operations
86        processor.apply_operations(operations)?;
87
88        // Generate output
89        processor.generate_output()
90    }
91}
92
93/// Result of processing a batch of JSONL documents with DSON operations
94#[derive(Debug)]
95pub struct ProcessedBatch {
96    /// Processed JSON documents
97    pub documents: Vec<String>,
98    /// Processing errors
99    pub errors: Vec<crate::skiptape::jsonl::LineError>,
100    /// Batch statistics
101    pub statistics: crate::skiptape::jsonl::BatchStatistics,
102}
103
#[cfg(test)]
mod tests {
    use super::*;
    use crate::skiptape::CompiledSchema;
    use fionn_ops::OperationValue;

    /// Processor with empty input/output schema sets (both are currently unused by `new`).
    fn processor() -> JsonlDsonProcessor {
        JsonlDsonProcessor::new(HashSet::new(), HashSet::new())
    }

    /// Schema that accepts every field.
    fn wildcard_schema() -> CompiledSchema {
        CompiledSchema::compile(&["*".to_string()]).expect("wildcard schema should compile")
    }

    #[test]
    fn test_jsonl_dson_processor_creation() {
        let input_schema = HashSet::from(["name".to_string(), "value".to_string()]);
        let output_schema = HashSet::from(["name".to_string()]);

        let _processor = JsonlDsonProcessor::new(input_schema, output_schema);
        // Processor successfully created - test passes if no panic
    }

    #[test]
    fn test_process_empty_jsonl() {
        let mut processor = JsonlDsonProcessor::new(
            HashSet::from(["*".to_string()]),
            HashSet::from(["*".to_string()]),
        );

        let schema = CompiledSchema::compile(&["name".to_string()]).unwrap();
        let batch = processor
            .process_jsonl_with_operations(&[], &schema, &[])
            .expect("empty input should process");

        assert!(batch.documents.is_empty());
        assert!(batch.errors.is_empty());
    }

    #[test]
    fn test_process_single_document() {
        let batch = processor()
            .process_jsonl_with_operations(b"{\"name\":\"test\"}", &wildcard_schema(), &[])
            .expect("single document should process");

        assert_eq!(batch.documents.len(), 1);
    }

    #[test]
    fn test_process_with_operations() {
        let operations = vec![DsonOperation::FieldAdd {
            path: "added".to_string(),
            value: OperationValue::StringRef("value".to_string()),
        }];
        let batch = processor()
            .process_jsonl_with_operations(b"{\"name\":\"test\"}", &wildcard_schema(), &operations)
            .expect("document with FieldAdd should process");

        assert_eq!(batch.documents.len(), 1);
    }

    #[test]
    fn test_processed_batch_debug() {
        let batch = ProcessedBatch {
            documents: vec!["{}".to_string()],
            errors: vec![],
            statistics: crate::skiptape::jsonl::BatchStatistics {
                total_lines: 1,
                successful_lines: 1,
                failed_lines: 0,
                processing_time_ms: 0.1,
                avg_memory_per_line: 10,
                overall_schema_match_ratio: 1.0,
            },
        };
        let debug = format!("{batch:?}");
        assert!(debug.contains("ProcessedBatch"));
    }

    #[test]
    fn test_process_multiple_documents() {
        let batch = processor()
            .process_jsonl_with_operations(b"{\"a\":1}\n{\"b\":2}\n{\"c\":3}", &wildcard_schema(), &[])
            .expect("multiple documents should process");

        assert_eq!(batch.documents.len(), 3);
    }

    #[test]
    fn test_process_with_field_modify_operation() {
        let operations = vec![DsonOperation::FieldModify {
            path: "name".to_string(),
            value: OperationValue::StringRef("modified".to_string()),
        }];
        let batch = processor()
            .process_jsonl_with_operations(
                b"{\"name\":\"original\"}",
                &wildcard_schema(),
                &operations,
            )
            .expect("document with FieldModify should process");

        assert_eq!(batch.documents.len(), 1);
    }

    #[test]
    fn test_process_with_field_add_and_delete() {
        let operations = vec![
            DsonOperation::FieldAdd {
                path: "new_field".to_string(),
                value: OperationValue::StringRef("new_value".to_string()),
            },
            DsonOperation::FieldDelete {
                path: "toDelete".to_string(),
            },
        ];
        let batch = processor()
            .process_jsonl_with_operations(
                b"{\"name\":\"test\",\"toDelete\":\"value\"}",
                &wildcard_schema(),
                &operations,
            )
            .expect("document with FieldAdd + FieldDelete should process");

        assert_eq!(batch.documents.len(), 1);
    }

    #[test]
    fn test_process_multiple_documents_with_operations() {
        let operations = vec![DsonOperation::FieldAdd {
            path: "processed".to_string(),
            value: OperationValue::BoolRef(true),
        }];
        let batch = processor()
            .process_jsonl_with_operations(
                b"{\"id\":1}\n{\"id\":2}\n{\"id\":3}",
                &wildcard_schema(),
                &operations,
            )
            .expect("multiple documents with operations should process");

        // One output document per input line, whether or not the operations succeeded.
        assert_eq!(batch.documents.len(), 3);
    }
}
244}