fionn_stream/
jsonl_dson.rs1use crate::skiptape::CompiledSchema;
8use crate::skiptape::jsonl::SimdJsonlBatchProcessor;
9use fionn_core::Result;
10use fionn_ops::DsonOperation;
11use std::collections::HashSet;
12
13pub struct JsonlDsonProcessor {
15 jsonl_processor: SimdJsonlBatchProcessor,
17}
18
19impl JsonlDsonProcessor {
20 #[must_use]
22 pub fn new(_input_schema: HashSet<String>, _output_schema: HashSet<String>) -> Self {
23 Self {
24 jsonl_processor: SimdJsonlBatchProcessor::new(),
25 }
26 }
27
28 pub fn process_jsonl_with_operations(
33 &mut self,
34 jsonl_data: &[u8],
35 schema: &CompiledSchema,
36 operations: &[DsonOperation],
37 ) -> Result<ProcessedBatch> {
38 let batch_result = self
40 .jsonl_processor
41 .process_batch_optimized(jsonl_data, schema)?;
42
43 if operations.is_empty() {
45 return Ok(ProcessedBatch {
46 documents: batch_result.documents,
47 errors: batch_result.errors,
48 statistics: batch_result.statistics,
49 });
50 }
51
52 let mut processed_documents = Vec::with_capacity(batch_result.documents.len());
54
55 for doc_json in &batch_result.documents {
56 match Self::apply_operations_to_document(doc_json, operations) {
57 Ok(transformed) => processed_documents.push(transformed),
58 Err(_) => {
59 processed_documents.push(doc_json.clone());
61 }
62 }
63 }
64
65 Ok(ProcessedBatch {
66 documents: processed_documents,
67 errors: batch_result.errors,
68 statistics: batch_result.statistics,
69 })
70 }
71
72 fn apply_operations_to_document(
74 doc_json: &str,
75 operations: &[DsonOperation],
76 ) -> Result<String> {
77 use fionn_ops::processor::BlackBoxProcessor;
78
79 let mut processor = BlackBoxProcessor::new_unfiltered();
81
82 processor.process(doc_json)?;
84
85 processor.apply_operations(operations)?;
87
88 processor.generate_output()
90 }
91}
92
93#[derive(Debug)]
95pub struct ProcessedBatch {
96 pub documents: Vec<String>,
98 pub errors: Vec<crate::skiptape::jsonl::LineError>,
100 pub statistics: crate::skiptape::jsonl::BatchStatistics,
102}
103
#[cfg(test)]
mod tests {
    use super::*;
    use crate::skiptape::CompiledSchema;
    use fionn_ops::OperationValue;

    /// Builds a processor with empty input and output schemas.
    fn bare_processor() -> JsonlDsonProcessor {
        JsonlDsonProcessor::new(HashSet::new(), HashSet::new())
    }

    /// Runs `jsonl` through a fresh processor using the `*` schema.
    fn run_wildcard(jsonl: &[u8], operations: &[DsonOperation]) -> Result<ProcessedBatch> {
        let schema = CompiledSchema::compile(&["*".to_string()]).unwrap();
        bare_processor().process_jsonl_with_operations(jsonl, &schema, operations)
    }

    /// Shorthand for a string-valued operation argument.
    fn text(value: &str) -> OperationValue {
        OperationValue::StringRef(value.to_string())
    }

    #[test]
    fn test_jsonl_dson_processor_creation() {
        let input_schema = HashSet::from(["name", "value"].map(String::from));
        let output_schema = HashSet::from(["name"].map(String::from));

        // Construction alone must not panic.
        let _processor = JsonlDsonProcessor::new(input_schema, output_schema);
    }

    #[test]
    fn test_process_empty_jsonl() {
        let wildcard = || HashSet::from(["*".to_string()]);
        let mut processor = JsonlDsonProcessor::new(wildcard(), wildcard());

        let schema = CompiledSchema::compile(&["name".to_string()]).unwrap();
        let outcome = processor.process_jsonl_with_operations(&[], &schema, &[]);

        assert!(outcome.is_ok());
        let batch = outcome.unwrap();
        assert!(batch.documents.is_empty());
        assert!(batch.errors.is_empty());
    }

    #[test]
    fn test_process_single_document() {
        let outcome = run_wildcard(b"{\"name\":\"test\"}", &[]);

        assert!(outcome.is_ok());
    }

    #[test]
    fn test_process_with_operations() {
        let ops = [DsonOperation::FieldAdd {
            path: "added".to_string(),
            value: text("value"),
        }];

        let outcome = run_wildcard(b"{\"name\":\"test\"}", &ops);

        assert!(outcome.is_ok());
    }

    #[test]
    fn test_processed_batch_debug() {
        let statistics = crate::skiptape::jsonl::BatchStatistics {
            total_lines: 1,
            successful_lines: 1,
            failed_lines: 0,
            processing_time_ms: 0.1,
            avg_memory_per_line: 10,
            overall_schema_match_ratio: 1.0,
        };
        let batch = ProcessedBatch {
            documents: vec!["{}".to_string()],
            errors: vec![],
            statistics,
        };

        let rendered = format!("{batch:?}");
        assert!(!rendered.is_empty());
    }

    #[test]
    fn test_process_multiple_documents() {
        let outcome = run_wildcard(b"{\"a\":1}\n{\"b\":2}\n{\"c\":3}", &[]);

        assert!(outcome.is_ok());
    }

    #[test]
    fn test_process_with_field_modify_operation() {
        let ops = [DsonOperation::FieldModify {
            path: "name".to_string(),
            value: text("modified"),
        }];

        let outcome = run_wildcard(b"{\"name\":\"original\"}", &ops);

        assert!(outcome.is_ok());
        let batch = outcome.unwrap();
        assert!(!batch.documents.is_empty());
    }

    #[test]
    fn test_process_with_field_add_and_delete() {
        let ops = [
            DsonOperation::FieldAdd {
                path: "new_field".to_string(),
                value: text("new_value"),
            },
            DsonOperation::FieldDelete {
                path: "toDelete".to_string(),
            },
        ];

        let outcome = run_wildcard(b"{\"name\":\"test\",\"toDelete\":\"value\"}", &ops);

        assert!(outcome.is_ok());
    }

    #[test]
    fn test_process_multiple_documents_with_operations() {
        let ops = [DsonOperation::FieldAdd {
            path: "processed".to_string(),
            value: OperationValue::BoolRef(true),
        }];

        let outcome = run_wildcard(b"{\"id\":1}\n{\"id\":2}\n{\"id\":3}", &ops);

        assert!(outcome.is_ok());
        let batch = outcome.unwrap();
        assert_eq!(batch.documents.len(), 3);
    }
}