
// SPDX-License-Identifier: MIT OR Apache-2.0
//! Format-Agnostic DSON Processor
//!
//! This module provides a generic DSON processor that works with any format
//! that implements the `FormatBatchProcessor` trait, enabling high-performance
//! document processing with schema filtering and CRDT semantics across formats.
//!
//! # Unified Tape Architecture
//!
//! The batch processors emit unified tape structures instead of JSON strings,
//! preserving format-specific information for lossless round-trips:
//!
//! - **ExtendedNodeType**: Format-specific markers (YamlAnchor, TomlTableStart, etc.)
//! - **OriginalSyntax**: Preserved syntax for exact reconstruction
//! - **TapeSegment**: Document/line boundaries within unified tape
//!
//! # Legacy String API
//!
//! For backward compatibility, the string-based `FormatBatchResult` is still
//! available. New code should prefer `TapeBatchResult` for full fidelity.
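//!
//! # Example
//!
//! A minimal sketch of the legacy string-based flow (`MyProcessor` is a
//! hypothetical `FormatBatchProcessor` implementation):
//!
//! ```ignore
//! let mut processor = FormatDsonProcessor::new(MyProcessor::default());
//! let schema = CompiledSchema::compile(&["*".to_string()])?;
//! let result = processor.process_with_operations(data, &schema, &[])?;
//! for doc in &result.documents {
//!     println!("{doc}");
//! }
//! ```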

use crate::skiptape::CompiledSchema;
use fionn_core::Result;
use fionn_core::format::FormatKind;
use fionn_ops::DsonOperation;

use crate::skiptape::unified_tape::{TapeSegment, UnifiedTape};

// =============================================================================
// Batch Result Types
// =============================================================================

/// Statistics for batch processing
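///
/// A sketch of deriving a success ratio from these counters (field names
/// as defined on this struct):
///
/// ```ignore
/// let ratio = stats.successful_lines as f64 / stats.total_lines.max(1) as f64;
/// ```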
#[derive(Debug, Clone, Default)]
pub struct BatchStatistics {
    /// Total lines/records processed
    pub total_lines: usize,
    /// Successfully processed lines
    pub successful_lines: usize,
    /// Failed lines
    pub failed_lines: usize,
    /// Total processing time in milliseconds
    pub processing_time_ms: f64,
    /// Average memory per line/record
    pub avg_memory_per_line: usize,
    /// Overall schema match ratio
    pub overall_schema_match_ratio: f64,
}

/// Error for a specific line during batch processing
#[derive(Debug, Clone)]
pub struct LineError {
    /// Index of the line in the original data
    pub line_index: usize,
    /// The error message
    pub error_message: String,
    /// Raw line content
    pub raw_line: String,
}

/// Result of processing a batch of documents
#[derive(Debug)]
pub struct FormatBatchResult {
    /// Successfully processed JSON documents (normalized output)
    pub documents: Vec<String>,
    /// Processing errors
    pub errors: Vec<LineError>,
    /// Batch statistics
    pub statistics: BatchStatistics,
}

impl FormatBatchResult {
    /// Create a new empty batch result
    #[must_use]
    pub fn new() -> Self {
        Self {
            documents: Vec::new(),
            errors: Vec::new(),
            statistics: BatchStatistics::default(),
        }
    }

    /// Create a batch result with pre-allocated capacity
    #[must_use]
    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            documents: Vec::with_capacity(capacity),
            errors: Vec::new(),
            statistics: BatchStatistics::default(),
        }
    }
}

impl Default for FormatBatchResult {
    fn default() -> Self {
        Self::new()
    }
}

// =============================================================================
// Tape-Based Batch Result Types (Unified Tape Architecture)
// =============================================================================

/// Result of processing a batch to unified tape
///
/// This is the preferred result type for new code, preserving format-specific
/// information for lossless round-trips and cross-format CRDT operations.
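///
/// A sketch of consuming a result, assuming a `TapeBatchProcessor` has
/// already populated it:
///
/// ```ignore
/// for segment in result.iter_segments() {
///     // Each segment spans one source document/line within the unified tape.
/// }
/// ```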
#[derive(Debug)]
pub struct TapeBatchResult<'arena> {
    /// Unified tape containing all processed documents/lines
    pub tape: UnifiedTape<'arena>,
    /// Segment boundaries for individual documents/lines
    pub segments: Vec<SegmentBoundary>,
    /// Processing errors
    pub errors: Vec<LineError>,
    /// Batch statistics
    pub statistics: BatchStatistics,
}

/// Boundary markers for segments within unified tape
#[derive(Debug, Clone)]
pub struct SegmentBoundary {
    /// Start node index
    pub start_idx: usize,
    /// End node index (exclusive)
    pub end_idx: usize,
    /// Line/document index in source
    pub source_idx: usize,
}

impl<'arena> TapeBatchResult<'arena> {
    /// Create a new tape batch result
    #[must_use]
    pub fn new(tape: UnifiedTape<'arena>) -> Self {
        Self {
            tape,
            segments: Vec::new(),
            errors: Vec::new(),
            statistics: BatchStatistics::default(),
        }
    }

    /// Get a tape segment by index
    #[must_use]
    pub fn segment(&'arena self, idx: usize) -> Option<TapeSegment<'arena>> {
        self.segments
            .get(idx)
            .map(|boundary| TapeSegment::new(&self.tape, boundary.start_idx, boundary.end_idx))
    }

    /// Get the number of segments
    #[must_use]
    pub const fn segment_count(&self) -> usize {
        self.segments.len()
    }

    /// Iterate over all segments
    pub fn iter_segments(&'arena self) -> impl Iterator<Item = TapeSegment<'arena>> {
        self.segments
            .iter()
            .map(move |boundary| TapeSegment::new(&self.tape, boundary.start_idx, boundary.end_idx))
    }
}

// =============================================================================
// FormatBatchProcessor Trait
// =============================================================================

/// Trait for format-specific batch processors (legacy string-based API)
///
/// Implementations provide SIMD-accelerated batch processing for their format,
/// with schema-aware filtering during the parsing phase.
///
/// **Note**: For new code, prefer implementing `TapeBatchProcessor` which
/// preserves format-specific information for lossless round-trips.
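///
/// # Example
///
/// A sketch of an implementation (`MyFormatProcessor` is hypothetical):
///
/// ```ignore
/// impl FormatBatchProcessor for MyFormatProcessor {
///     fn format_kind(&self) -> FormatKind {
///         FormatKind::Json
///     }
///
///     fn process_batch(
///         &mut self,
///         data: &[u8],
///         schema: &CompiledSchema,
///     ) -> Result<FormatBatchResult> {
///         let mut result = FormatBatchResult::with_capacity(16);
///         // Parse `data`, filter fields via `schema`, and push normalized
///         // JSON strings into `result.documents`...
///         Ok(result)
///     }
///
///     fn process_batch_unfiltered(&mut self, data: &[u8]) -> Result<FormatBatchResult> {
///         // As above, but include all fields.
///         todo!()
///     }
///
///     fn reset(&mut self) {}
/// }
/// ```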
pub trait FormatBatchProcessor {
    /// Get the format kind this processor handles
    fn format_kind(&self) -> FormatKind;

    /// Process a batch of data with schema filtering
    ///
    /// # Arguments
    /// * `data` - Raw bytes in the format's encoding
    /// * `schema` - Compiled schema for filtering
    ///
    /// # Errors
    /// Returns an error if batch processing fails as a whole; per-line
    /// failures are recorded in the result's `errors` instead
    fn process_batch(&mut self, data: &[u8], schema: &CompiledSchema) -> Result<FormatBatchResult>;

    /// Process a batch without schema filtering (all fields included)
    ///
    /// # Arguments
    /// * `data` - Raw bytes in the format's encoding
    ///
    /// # Errors
    /// Returns an error if batch processing fails as a whole; per-line
    /// failures are recorded in the result's `errors` instead
    fn process_batch_unfiltered(&mut self, data: &[u8]) -> Result<FormatBatchResult>;

    /// Reset processor state for a new batch
    fn reset(&mut self);
}

// =============================================================================
// Tape-Based Batch Processor Trait (Unified Tape Architecture)
// =============================================================================

/// Trait for format-specific batch processors emitting unified tape
///
/// This is the preferred trait for new implementations, preserving format-specific
/// information for lossless round-trips and cross-format CRDT operations.
///
/// # Example
///
/// ```ignore
/// impl<'arena> TapeBatchProcessor<'arena> for MyFormatProcessor {
///     fn process_to_tape(
///         &mut self,
///         data: &[u8],
///         schema: &CompiledSchema,
///         arena: &'arena Bump,
///     ) -> Result<TapeBatchResult<'arena>> {
///         let mut tape = UnifiedTape::new(arena, FormatKind::MyFormat);
///         // Parse and emit to tape...
///         Ok(TapeBatchResult::new(tape))
///     }
/// }
/// ```
pub trait TapeBatchProcessor<'arena> {
    /// Get the format kind this processor handles
    fn format_kind(&self) -> FormatKind;

    /// Process a batch of data to unified tape with schema filtering
    ///
    /// # Arguments
    /// * `data` - Raw bytes in the format's encoding
    /// * `schema` - Compiled schema for filtering
    /// * `arena` - Bump allocator for tape strings
    ///
    /// # Errors
    /// Returns an error if batch processing fails as a whole; per-line
    /// failures are recorded in the result's `errors` instead
    fn process_to_tape(
        &mut self,
        data: &[u8],
        schema: &CompiledSchema,
        arena: &'arena bumpalo::Bump,
    ) -> Result<TapeBatchResult<'arena>>;

    /// Process a batch to unified tape without schema filtering
    ///
    /// # Arguments
    /// * `data` - Raw bytes in the format's encoding
    /// * `arena` - Bump allocator for tape strings
    ///
    /// # Errors
    /// Returns an error if batch processing fails as a whole; per-line
    /// failures are recorded in the result's `errors` instead
    fn process_to_tape_unfiltered(
        &mut self,
        data: &[u8],
        arena: &'arena bumpalo::Bump,
    ) -> Result<TapeBatchResult<'arena>>;

    /// Reset processor state for a new batch
    fn reset(&mut self);
}

// =============================================================================
// FormatDsonProcessor - Generic DSON Processor
// =============================================================================

/// Generic DSON processor for any format implementing `FormatBatchProcessor`
///
/// This wraps a format-specific batch processor and adds DSON operation support,
/// enabling schema filtering and document transformations across all formats.
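///
/// # Example
///
/// A sketch of adding a field to every document, mirroring the tests below
/// (`MyProcessor` is a hypothetical `FormatBatchProcessor`):
///
/// ```ignore
/// use fionn_ops::OperationValue;
///
/// let mut processor = FormatDsonProcessor::new(MyProcessor::default());
/// let schema = CompiledSchema::compile(&["*".to_string()])?;
/// let operations = vec![DsonOperation::FieldAdd {
///     path: "added".to_string(),
///     value: OperationValue::StringRef("new_value".to_string()),
/// }];
/// let result = processor.process_with_operations(data, &schema, &operations)?;
/// assert!(result.documents[0].contains("added"));
/// ```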
pub struct FormatDsonProcessor<P: FormatBatchProcessor> {
    /// The underlying format-specific batch processor
    batch_processor: P,
}

impl<P: FormatBatchProcessor> FormatDsonProcessor<P> {
    /// Create a new format DSON processor
    #[must_use]
    pub const fn new(batch_processor: P) -> Self {
        Self { batch_processor }
    }

    /// Get the format kind
    #[must_use]
    pub fn format_kind(&self) -> FormatKind {
        self.batch_processor.format_kind()
    }

    /// Get a reference to the underlying batch processor
    #[must_use]
    pub const fn batch_processor(&self) -> &P {
        &self.batch_processor
    }

    /// Get a mutable reference to the underlying batch processor
    pub const fn batch_processor_mut(&mut self) -> &mut P {
        &mut self.batch_processor
    }

    /// Process data with schema filtering and DSON operations
    ///
    /// # Arguments
    /// * `data` - Raw bytes in the format's encoding
    /// * `schema` - Compiled schema for filtering
    /// * `operations` - DSON operations to apply to each document
    ///
    /// # Errors
    /// Returns an error if batch processing fails as a whole; per-document
    /// operation failures are recorded in `errors` and the original document
    /// is kept unchanged
    pub fn process_with_operations(
        &mut self,
        data: &[u8],
        schema: &CompiledSchema,
        operations: &[DsonOperation],
    ) -> Result<FormatBatchResult> {
        // First, process batch with schema filtering
        let batch_result = self.batch_processor.process_batch(data, schema)?;

        // If no operations, return as-is
        if operations.is_empty() {
            return Ok(batch_result);
        }

        // Apply DSON operations to each filtered document
        let mut processed_documents = Vec::with_capacity(batch_result.documents.len());
        let mut operation_errors = batch_result.errors;

        for (line_index, doc_json) in batch_result.documents.iter().enumerate() {
            match Self::apply_operations_to_document(doc_json, operations) {
                Ok(transformed) => processed_documents.push(transformed),
                Err(e) => {
                    // On error, keep original document and record error
                    operation_errors.push(LineError {
                        line_index,
                        error_message: e.to_string(),
                        raw_line: doc_json.clone(),
                    });
                    processed_documents.push(doc_json.clone());
                }
            }
        }

        Ok(FormatBatchResult {
            documents: processed_documents,
            errors: operation_errors,
            statistics: batch_result.statistics,
        })
    }

    /// Process data with DSON operations only (no schema filtering)
    ///
    /// # Errors
    /// Returns an error if batch processing fails as a whole; per-document
    /// operation failures are recorded in `errors` and the original document
    /// is kept unchanged
    pub fn process_unfiltered_with_operations(
        &mut self,
        data: &[u8],
        operations: &[DsonOperation],
    ) -> Result<FormatBatchResult> {
        let batch_result = self.batch_processor.process_batch_unfiltered(data)?;

        if operations.is_empty() {
            return Ok(batch_result);
        }

        let mut processed_documents = Vec::with_capacity(batch_result.documents.len());
        let mut operation_errors = batch_result.errors;

        for (line_index, doc_json) in batch_result.documents.iter().enumerate() {
            match Self::apply_operations_to_document(doc_json, operations) {
                Ok(transformed) => processed_documents.push(transformed),
                Err(e) => {
                    // On error, keep original document and record error
                    operation_errors.push(LineError {
                        line_index,
                        error_message: e.to_string(),
                        raw_line: doc_json.clone(),
                    });
                    processed_documents.push(doc_json.clone());
                }
            }
        }

        Ok(FormatBatchResult {
            documents: processed_documents,
            errors: operation_errors,
            statistics: batch_result.statistics,
        })
    }

    /// Apply DSON operations to a single document
    fn apply_operations_to_document(
        doc_json: &str,
        operations: &[DsonOperation],
    ) -> Result<String> {
        use fionn_ops::processor::BlackBoxProcessor;

        // Create a processor for this document
        let mut processor = BlackBoxProcessor::new_unfiltered();

        // Process the document
        processor.process(doc_json)?;

        // Apply operations
        processor.apply_operations(operations)?;

        // Generate output
        processor.generate_output()
    }

    /// Reset the processor for a new batch
    pub fn reset(&mut self) {
        self.batch_processor.reset();
    }
}

// =============================================================================
// Tests
// =============================================================================

#[cfg(test)]
mod tests {
    use super::*;

    // Mock batch processor for testing
    struct MockBatchProcessor {
        format: FormatKind,
    }

    impl MockBatchProcessor {
        const fn new(format: FormatKind) -> Self {
            Self { format }
        }
    }

    impl FormatBatchProcessor for MockBatchProcessor {
        fn format_kind(&self) -> FormatKind {
            self.format
        }

        fn process_batch(
            &mut self,
            _data: &[u8],
            _schema: &CompiledSchema,
        ) -> Result<FormatBatchResult> {
            Ok(FormatBatchResult {
                documents: vec![r#"{"name":"test","value":42}"#.to_string()],
                errors: vec![],
                statistics: BatchStatistics {
                    total_lines: 1,
                    successful_lines: 1,
                    failed_lines: 0,
                    processing_time_ms: 0.1,
                    avg_memory_per_line: 50,
                    overall_schema_match_ratio: 1.0,
                },
            })
        }

        fn process_batch_unfiltered(&mut self, _data: &[u8]) -> Result<FormatBatchResult> {
            self.process_batch(&[], &CompiledSchema::compile(&[]).unwrap())
        }

        fn reset(&mut self) {}
    }

    #[test]
    fn test_format_dson_processor_creation() {
        let processor = FormatDsonProcessor::new(MockBatchProcessor::new(FormatKind::Json));
        assert_eq!(processor.format_kind(), FormatKind::Json);
    }

    #[test]
    fn test_batch_statistics_default() {
        let stats = BatchStatistics::default();
        assert_eq!(stats.total_lines, 0);
        assert_eq!(stats.successful_lines, 0);
        assert_eq!(stats.failed_lines, 0);
    }

    #[test]
    fn test_format_batch_result_new() {
        let result = FormatBatchResult::new();
        assert!(result.documents.is_empty());
        assert!(result.errors.is_empty());
    }

    #[test]
    fn test_format_batch_result_with_capacity() {
        let result = FormatBatchResult::with_capacity(100);
        assert!(result.documents.capacity() >= 100);
    }

    #[test]
    fn test_process_with_no_operations() {
        let mut processor = FormatDsonProcessor::new(MockBatchProcessor::new(FormatKind::Json));
        let schema = CompiledSchema::compile(&["*".to_string()]).unwrap();

        let result = processor
            .process_with_operations(b"{}", &schema, &[])
            .unwrap();
        assert_eq!(result.documents.len(), 1);
    }

    #[test]
    fn test_process_with_operations() {
        use fionn_ops::OperationValue;

        let mut processor = FormatDsonProcessor::new(MockBatchProcessor::new(FormatKind::Json));
        let schema = CompiledSchema::compile(&["*".to_string()]).unwrap();

        let operations = vec![DsonOperation::FieldAdd {
            path: "added".to_string(),
            value: OperationValue::StringRef("new_value".to_string()),
        }];

        let result = processor
            .process_with_operations(b"{}", &schema, &operations)
            .unwrap();
        assert_eq!(result.documents.len(), 1);
        // The document should have the added field
        assert!(result.documents[0].contains("added"));
    }

    #[test]
    fn test_line_error_debug() {
        let error = LineError {
            line_index: 0,
            error_message: "test error".to_string(),
            raw_line: "test".to_string(),
        };
        let debug = format!("{error:?}");
        assert!(debug.contains("LineError"));
    }

    #[test]
    fn test_batch_processor_reset() {
        let mut processor = FormatDsonProcessor::new(MockBatchProcessor::new(FormatKind::Json));
        processor.reset();
        // Reset should not panic
    }

    #[test]
    fn test_batch_processor_mut_access() {
        let mut processor = FormatDsonProcessor::new(MockBatchProcessor::new(FormatKind::Json));
        let _batch_proc = processor.batch_processor_mut();
        // Should be able to get mutable access
    }
}