fionn_stream/format_dson.rs
// SPDX-License-Identifier: MIT OR Apache-2.0
//! Format-Agnostic DSON Processor
//!
//! This module provides a generic DSON processor that works with any format
//! that implements the `FormatBatchProcessor` trait, enabling high-performance
//! document processing with schema filtering and CRDT semantics across formats.
//!
//! # Unified Tape Architecture
//!
//! The batch processors emit unified tape structures instead of JSON strings,
//! preserving format-specific information for lossless round-trips:
//!
//! - **ExtendedNodeType**: Format-specific markers (YamlAnchor, TomlTableStart, etc.)
//! - **OriginalSyntax**: Preserved syntax for exact reconstruction
//! - **TapeSegment**: Document/line boundaries within unified tape
//!
//! # Legacy String API
//!
//! For backward compatibility, the string-based `FormatBatchResult` is still
//! available. New code should prefer `TapeBatchResult` for full fidelity.
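//!
//! # Example
//!
//! A minimal sketch of the preferred tape-based flow; `MyFormatProcessor` is a
//! hypothetical `TapeBatchProcessor` implementation, not a type in this crate:
//!
//! ```ignore
//! let arena = bumpalo::Bump::new();
//! let mut processor = MyFormatProcessor::new();
//!
//! // All fields included; segments mark document/line boundaries.
//! let result = processor.process_to_tape_unfiltered(b"{}", &arena)?;
//! for segment in result.iter_segments() {
//!     // Each segment covers one source document/line in the unified tape.
//! }
//! ```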

use crate::skiptape::CompiledSchema;
use fionn_core::Result;
use fionn_core::format::FormatKind;
use fionn_ops::DsonOperation;

use crate::skiptape::unified_tape::{TapeSegment, UnifiedTape};

// =============================================================================
// Batch Result Types
// =============================================================================

/// Statistics for batch processing
#[derive(Debug, Clone, Default)]
pub struct BatchStatistics {
    /// Total lines/records processed
    pub total_lines: usize,
    /// Successfully processed lines
    pub successful_lines: usize,
    /// Failed lines
    pub failed_lines: usize,
    /// Total processing time in milliseconds
    pub processing_time_ms: f64,
    /// Average memory per line/record, in bytes
    pub avg_memory_per_line: usize,
    /// Overall schema match ratio
    pub overall_schema_match_ratio: f64,
}

/// Error for a specific line during batch processing
#[derive(Debug, Clone)]
pub struct LineError {
    /// Index of the line in the original data
    pub line_index: usize,
    /// The error message
    pub error_message: String,
    /// Raw line content
    pub raw_line: String,
}

/// Result of processing a batch of documents
#[derive(Debug)]
pub struct FormatBatchResult {
    /// Successfully processed JSON documents (normalized output)
    pub documents: Vec<String>,
    /// Processing errors
    pub errors: Vec<LineError>,
    /// Batch statistics
    pub statistics: BatchStatistics,
}

impl FormatBatchResult {
    /// Create a new empty batch result
    #[must_use]
    pub fn new() -> Self {
        Self {
            documents: Vec::new(),
            errors: Vec::new(),
            statistics: BatchStatistics::default(),
        }
    }

    /// Create a batch result with pre-allocated capacity
    #[must_use]
    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            documents: Vec::with_capacity(capacity),
            errors: Vec::new(),
            statistics: BatchStatistics::default(),
        }
    }
}

impl Default for FormatBatchResult {
    fn default() -> Self {
        Self::new()
    }
}

// =============================================================================
// Tape-Based Batch Result Types (Unified Tape Architecture)
// =============================================================================

/// Result of processing a batch to unified tape
///
/// This is the preferred result type for new code, preserving format-specific
/// information for lossless round-trips and cross-format CRDT operations.
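///
/// # Example
///
/// A minimal sketch of segment access, assuming `result` was produced by some
/// `TapeBatchProcessor` implementation:
///
/// ```ignore
/// // Random access by segment index; out-of-range indices yield `None`.
/// if let Some(first) = result.segment(0) {
///     // `first` borrows the tape and spans one document/line's nodes.
/// }
/// assert!(result.segment(result.segment_count()).is_none());
/// ```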
#[derive(Debug)]
pub struct TapeBatchResult<'arena> {
    /// Unified tape containing all processed documents/lines
    pub tape: UnifiedTape<'arena>,
    /// Segment boundaries for individual documents/lines
    pub segments: Vec<SegmentBoundary>,
    /// Processing errors
    pub errors: Vec<LineError>,
    /// Batch statistics
    pub statistics: BatchStatistics,
}

/// Boundary markers for segments within the unified tape
#[derive(Debug, Clone)]
pub struct SegmentBoundary {
    /// Start node index
    pub start_idx: usize,
    /// End node index (exclusive)
    pub end_idx: usize,
    /// Line/document index in the source
    pub source_idx: usize,
}

impl<'arena> TapeBatchResult<'arena> {
    /// Create a new tape batch result
    #[must_use]
    pub fn new(tape: UnifiedTape<'arena>) -> Self {
        Self {
            tape,
            segments: Vec::new(),
            errors: Vec::new(),
            statistics: BatchStatistics::default(),
        }
    }

    /// Get a tape segment by index
    #[must_use]
    pub fn segment(&'arena self, idx: usize) -> Option<TapeSegment<'arena>> {
        self.segments
            .get(idx)
            .map(|boundary| TapeSegment::new(&self.tape, boundary.start_idx, boundary.end_idx))
    }

    /// Get the number of segments
    #[must_use]
    pub const fn segment_count(&self) -> usize {
        self.segments.len()
    }

    /// Iterate over all segments
    pub fn iter_segments(&'arena self) -> impl Iterator<Item = TapeSegment<'arena>> {
        self.segments
            .iter()
            .map(move |boundary| TapeSegment::new(&self.tape, boundary.start_idx, boundary.end_idx))
    }
}

// =============================================================================
// FormatBatchProcessor Trait
// =============================================================================

/// Trait for format-specific batch processors (legacy string-based API)
///
/// Implementations provide SIMD-accelerated batch processing for their format,
/// with schema-aware filtering during the parsing phase.
///
/// **Note**: For new code, prefer implementing `TapeBatchProcessor`, which
/// preserves format-specific information for lossless round-trips.
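///
/// # Example
///
/// A minimal implementation sketch; `MyFormatProcessor` and `FormatKind::MyFormat`
/// are hypothetical, mirroring the `TapeBatchProcessor` example below:
///
/// ```ignore
/// impl FormatBatchProcessor for MyFormatProcessor {
///     fn format_kind(&self) -> FormatKind {
///         FormatKind::MyFormat
///     }
///
///     fn process_batch(
///         &mut self,
///         data: &[u8],
///         schema: &CompiledSchema,
///     ) -> Result<FormatBatchResult> {
///         let mut result = FormatBatchResult::new();
///         // Parse `data`, keep schema-matching fields, and push each
///         // normalized JSON document into `result.documents`...
///         Ok(result)
///     }
///
///     fn process_batch_unfiltered(&mut self, data: &[u8]) -> Result<FormatBatchResult> {
///         // Same as `process_batch`, but every field is included...
///         Ok(FormatBatchResult::new())
///     }
///
///     fn reset(&mut self) {}
/// }
/// ```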
pub trait FormatBatchProcessor {
    /// Get the format kind this processor handles
    fn format_kind(&self) -> FormatKind;

    /// Process a batch of data with schema filtering
    ///
    /// # Arguments
    /// * `data` - Raw bytes in the format's encoding
    /// * `schema` - Compiled schema for filtering
    ///
    /// # Errors
    /// Returns an error if batch processing fundamentally fails
    fn process_batch(&mut self, data: &[u8], schema: &CompiledSchema) -> Result<FormatBatchResult>;

    /// Process a batch without schema filtering (all fields included)
    ///
    /// # Arguments
    /// * `data` - Raw bytes in the format's encoding
    ///
    /// # Errors
    /// Returns an error if batch processing fundamentally fails
    fn process_batch_unfiltered(&mut self, data: &[u8]) -> Result<FormatBatchResult>;

    /// Reset processor state for a new batch
    fn reset(&mut self);
}

// =============================================================================
// Tape-Based Batch Processor Trait (Unified Tape Architecture)
// =============================================================================

/// Trait for format-specific batch processors emitting unified tape
///
/// This is the preferred trait for new implementations, preserving format-specific
/// information for lossless round-trips and cross-format CRDT operations.
///
/// # Example
///
/// ```ignore
/// impl<'arena> TapeBatchProcessor<'arena> for MyFormatProcessor {
///     fn process_to_tape(
///         &mut self,
///         data: &[u8],
///         schema: &CompiledSchema,
///         arena: &'arena Bump,
///     ) -> Result<TapeBatchResult<'arena>> {
///         let mut tape = UnifiedTape::new(arena, FormatKind::MyFormat);
///         // Parse and emit to tape...
///         Ok(TapeBatchResult::new(tape))
///     }
/// }
/// ```
pub trait TapeBatchProcessor<'arena> {
    /// Get the format kind this processor handles
    fn format_kind(&self) -> FormatKind;

    /// Process a batch of data to unified tape with schema filtering
    ///
    /// # Arguments
    /// * `data` - Raw bytes in the format's encoding
    /// * `schema` - Compiled schema for filtering
    /// * `arena` - Bump allocator for tape strings
    ///
    /// # Errors
    /// Returns an error if batch processing fundamentally fails
    fn process_to_tape(
        &mut self,
        data: &[u8],
        schema: &CompiledSchema,
        arena: &'arena bumpalo::Bump,
    ) -> Result<TapeBatchResult<'arena>>;

    /// Process a batch to unified tape without schema filtering
    ///
    /// # Arguments
    /// * `data` - Raw bytes in the format's encoding
    /// * `arena` - Bump allocator for tape strings
    ///
    /// # Errors
    /// Returns an error if batch processing fundamentally fails
    fn process_to_tape_unfiltered(
        &mut self,
        data: &[u8],
        arena: &'arena bumpalo::Bump,
    ) -> Result<TapeBatchResult<'arena>>;

    /// Reset processor state for a new batch
    fn reset(&mut self);
}

// =============================================================================
// FormatDsonProcessor - Generic DSON Processor
// =============================================================================

/// Generic DSON processor for any format implementing `FormatBatchProcessor`
///
/// This wraps a format-specific batch processor and adds DSON operation support,
/// enabling schema filtering and document transformations across all formats.
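///
/// # Example
///
/// A minimal sketch, assuming some `FormatBatchProcessor` implementation
/// (`MyFormatProcessor` is hypothetical); the operation shape matches the
/// tests below:
///
/// ```ignore
/// let mut processor = FormatDsonProcessor::new(MyFormatProcessor::new());
/// let schema = CompiledSchema::compile(&["*".to_string()])?;
///
/// // Add a field to every document that survives schema filtering.
/// let operations = vec![DsonOperation::FieldAdd {
///     path: "added".to_string(),
///     value: OperationValue::StringRef("new_value".to_string()),
/// }];
///
/// let result = processor.process_with_operations(b"{}", &schema, &operations)?;
/// assert!(result.errors.is_empty());
/// ```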
pub struct FormatDsonProcessor<P: FormatBatchProcessor> {
    /// The underlying format-specific batch processor
    batch_processor: P,
}

impl<P: FormatBatchProcessor> FormatDsonProcessor<P> {
    /// Create a new format DSON processor
    #[must_use]
    pub const fn new(batch_processor: P) -> Self {
        Self { batch_processor }
    }

    /// Get the format kind
    #[must_use]
    pub fn format_kind(&self) -> FormatKind {
        self.batch_processor.format_kind()
    }

    /// Get a reference to the underlying batch processor
    #[must_use]
    pub const fn batch_processor(&self) -> &P {
        &self.batch_processor
    }

    /// Get a mutable reference to the underlying batch processor
    pub const fn batch_processor_mut(&mut self) -> &mut P {
        &mut self.batch_processor
    }

    /// Process data with schema filtering and DSON operations
    ///
    /// # Arguments
    /// * `data` - Raw bytes in the format's encoding
    /// * `schema` - Compiled schema for filtering
    /// * `operations` - DSON operations to apply to each document
    ///
    /// # Errors
    /// Returns an error if processing fails
    pub fn process_with_operations(
        &mut self,
        data: &[u8],
        schema: &CompiledSchema,
        operations: &[DsonOperation],
    ) -> Result<FormatBatchResult> {
        // First, process the batch with schema filtering
        let batch_result = self.batch_processor.process_batch(data, schema)?;

        // If there are no operations, return the result as-is
        if operations.is_empty() {
            return Ok(batch_result);
        }

        // Apply DSON operations to each filtered document
        let mut processed_documents = Vec::with_capacity(batch_result.documents.len());
        let mut operation_errors = batch_result.errors;

        for (line_index, doc_json) in batch_result.documents.iter().enumerate() {
            match Self::apply_operations_to_document(doc_json, operations) {
                Ok(transformed) => processed_documents.push(transformed),
                Err(e) => {
                    // On error, keep the original document and record the error
                    operation_errors.push(LineError {
                        line_index,
                        error_message: e.to_string(),
                        raw_line: doc_json.clone(),
                    });
                    processed_documents.push(doc_json.clone());
                }
            }
        }

        Ok(FormatBatchResult {
            documents: processed_documents,
            errors: operation_errors,
            statistics: batch_result.statistics,
        })
    }

    /// Process data with DSON operations only (no schema filtering)
    ///
    /// # Errors
    /// Returns an error if processing fails
    pub fn process_unfiltered_with_operations(
        &mut self,
        data: &[u8],
        operations: &[DsonOperation],
    ) -> Result<FormatBatchResult> {
        let batch_result = self.batch_processor.process_batch_unfiltered(data)?;

        if operations.is_empty() {
            return Ok(batch_result);
        }

        let mut processed_documents = Vec::with_capacity(batch_result.documents.len());
        let mut operation_errors = batch_result.errors;

        for (line_index, doc_json) in batch_result.documents.iter().enumerate() {
            match Self::apply_operations_to_document(doc_json, operations) {
                Ok(transformed) => processed_documents.push(transformed),
                Err(e) => {
                    operation_errors.push(LineError {
                        line_index,
                        error_message: e.to_string(),
                        raw_line: doc_json.clone(),
                    });
                    processed_documents.push(doc_json.clone());
                }
            }
        }

        Ok(FormatBatchResult {
            documents: processed_documents,
            errors: operation_errors,
            statistics: batch_result.statistics,
        })
    }

    /// Apply DSON operations to a single document
    fn apply_operations_to_document(
        doc_json: &str,
        operations: &[DsonOperation],
    ) -> Result<String> {
        use fionn_ops::processor::BlackBoxProcessor;

        // Create a processor for this document
        let mut processor = BlackBoxProcessor::new_unfiltered();

        // Process the document
        processor.process(doc_json)?;

        // Apply operations
        processor.apply_operations(operations)?;

        // Generate output
        processor.generate_output()
    }

    /// Reset the processor for a new batch
    pub fn reset(&mut self) {
        self.batch_processor.reset();
    }
}

// =============================================================================
// Tests
// =============================================================================

#[cfg(test)]
mod tests {
    use super::*;

    // Mock batch processor for testing
    struct MockBatchProcessor {
        format: FormatKind,
    }

    impl MockBatchProcessor {
        const fn new(format: FormatKind) -> Self {
            Self { format }
        }
    }

    impl FormatBatchProcessor for MockBatchProcessor {
        fn format_kind(&self) -> FormatKind {
            self.format
        }

        fn process_batch(
            &mut self,
            _data: &[u8],
            _schema: &CompiledSchema,
        ) -> Result<FormatBatchResult> {
            Ok(FormatBatchResult {
                documents: vec![r#"{"name":"test","value":42}"#.to_string()],
                errors: vec![],
                statistics: BatchStatistics {
                    total_lines: 1,
                    successful_lines: 1,
                    failed_lines: 0,
                    processing_time_ms: 0.1,
                    avg_memory_per_line: 50,
                    overall_schema_match_ratio: 1.0,
                },
            })
        }

        fn process_batch_unfiltered(&mut self, _data: &[u8]) -> Result<FormatBatchResult> {
            self.process_batch(&[], &CompiledSchema::compile(&[]).unwrap())
        }

        fn reset(&mut self) {}
    }

    #[test]
    fn test_format_dson_processor_creation() {
        let processor = FormatDsonProcessor::new(MockBatchProcessor::new(FormatKind::Json));
        assert_eq!(processor.format_kind(), FormatKind::Json);
    }

    #[test]
    fn test_batch_statistics_default() {
        let stats = BatchStatistics::default();
        assert_eq!(stats.total_lines, 0);
        assert_eq!(stats.successful_lines, 0);
        assert_eq!(stats.failed_lines, 0);
    }

    #[test]
    fn test_format_batch_result_new() {
        let result = FormatBatchResult::new();
        assert!(result.documents.is_empty());
        assert!(result.errors.is_empty());
    }

    #[test]
    fn test_format_batch_result_with_capacity() {
        let result = FormatBatchResult::with_capacity(100);
        assert!(result.documents.capacity() >= 100);
    }

    #[test]
    fn test_process_with_no_operations() {
        let mut processor = FormatDsonProcessor::new(MockBatchProcessor::new(FormatKind::Json));
        let schema = CompiledSchema::compile(&["*".to_string()]).unwrap();

        let result = processor
            .process_with_operations(b"{}", &schema, &[])
            .unwrap();
        assert_eq!(result.documents.len(), 1);
    }

    #[test]
    fn test_process_with_operations() {
        use fionn_ops::OperationValue;

        let mut processor = FormatDsonProcessor::new(MockBatchProcessor::new(FormatKind::Json));
        let schema = CompiledSchema::compile(&["*".to_string()]).unwrap();

        let operations = vec![DsonOperation::FieldAdd {
            path: "added".to_string(),
            value: OperationValue::StringRef("new_value".to_string()),
        }];

        let result = processor
            .process_with_operations(b"{}", &schema, &operations)
            .unwrap();
        assert_eq!(result.documents.len(), 1);
        // The transformed document should contain the added field
        assert!(result.documents[0].contains("added"));
    }

    #[test]
    fn test_line_error_debug() {
        let error = LineError {
            line_index: 0,
            error_message: "test error".to_string(),
            raw_line: "test".to_string(),
        };
        let debug = format!("{error:?}");
        assert!(debug.contains("LineError"));
    }

    #[test]
    fn test_batch_processor_reset() {
        let mut processor = FormatDsonProcessor::new(MockBatchProcessor::new(FormatKind::Json));
        processor.reset();
        // Reset should not panic
    }

    #[test]
    fn test_batch_processor_mut_access() {
        let mut processor = FormatDsonProcessor::new(MockBatchProcessor::new(FormatKind::Json));
        let _batch_proc = processor.batch_processor_mut();
        // Should be able to get mutable access
    }
}
551}