Skip to main content

fionn_stream/skiptape/
jsonl.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2//! JSONL (JSON Lines) processing for SIMD-JSONL Skip Tape
3//!
4//! This module provides SIMD-accelerated JSON Lines processing with schema-aware filtering.
5
6use crate::skiptape::SkipTapeProcessor;
7use crate::skiptape::error::{Result, SkipTapeError};
8use crate::skiptape::schema::CompiledSchema;
9use crate::skiptape::simd_ops::SimdJsonStructuralDetector;
10use fionn_simd::SimdLineSeparator;
11use fionn_simd::SimdStructuralFilter;
12use rayon::prelude::*;
13
/// Default input-size cutoff (16 KiB) that `SimdJsonlBatchProcessor::new`
/// stores in its `gpu_min_bytes` field.
const GPU_MIN_BYTES: usize = 16 * 1024;
15
/// Result type for parallel line processing - `(line_index, result)`.
///
/// On success the inner result carries the accepted JSON text; on failure it
/// carries the error together with the raw line that produced it.
type LineProcessResult = (usize, std::result::Result<String, (SkipTapeError, String)>);
18
/// Control whether SIMD pre-scan phases use CPU or GPU.
///
/// The mode is stored on the batch processor through `set_prescan_mode`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PreScanMode {
    /// Always use CPU SIMD phases.
    CpuOnly,
    /// Use GPU pre-scan when available and input is large enough.
    Auto,
    /// Force GPU pre-scan (error if GPU setup fails).
    Gpu,
}
29
/// Processing statistics kept by [`SimdJsonlProcessor`].
#[derive(Debug, Clone)]
pub struct ProcessingStats {
    /// Total lines processed
    pub total_lines: usize,
    /// Successfully processed lines
    pub successful_lines: usize,
    /// Failed lines
    pub failed_lines: usize,
    /// Total processing time, in milliseconds
    pub processing_time_ms: f64,
}
42
/// SIMD-JSONL processor with skip parsing capabilities.
///
/// Splits a JSONL buffer into individual lines (see `extract_lines`).
pub struct SimdJsonlProcessor {
    /// SIMD line separator for fast line detection
    line_separator: SimdLineSeparator,
    /// Processing statistics (initialised to zero by `new`)
    stats: ProcessingStats,
}
50
51impl SimdJsonlProcessor {
    /// Create a new SIMD-JSONL processor.
    ///
    /// All statistics counters start at zero.
    #[must_use]
    pub const fn new() -> Self {
        Self {
            line_separator: SimdLineSeparator::new(),
            stats: ProcessingStats {
                total_lines: 0,
                successful_lines: 0,
                failed_lines: 0,
                processing_time_ms: 0.0,
            },
        }
    }
65
66    /// Process JSONL data and extract individual JSON lines
67    ///
68    /// # Errors
69    /// Returns an error if processing fails
70    pub fn extract_lines(&self, jsonl_data: &[u8]) -> Result<Vec<String>> {
71        let mut lines = Vec::new();
72
73        // SIMD-accelerated line separation
74        let line_boundaries = self.line_boundaries(jsonl_data);
75
76        let mut line_start = 0;
77        for &line_end in &line_boundaries {
78            if line_end > line_start {
79                let line_data = &jsonl_data[line_start..line_end];
80
81                // Convert to string and trim whitespace
82                if let Ok(line_str) = std::str::from_utf8(line_data) {
83                    let trimmed = line_str.trim();
84                    if !trimmed.is_empty() {
85                        lines.push(trimmed.to_string());
86                    }
87                }
88            }
89            line_start = line_end;
90        }
91
92        Ok(lines)
93    }
94
    /// Get processing statistics.
    ///
    /// Returns a shared reference to the processor's `ProcessingStats`.
    #[must_use]
    pub const fn stats(&self) -> &ProcessingStats {
        &self.stats
    }
100
    /// Find line boundaries using the SIMD-accelerated line separator.
    ///
    /// Thin wrapper that forwards `data` to `find_line_boundaries`.
    fn line_boundaries(&self, data: &[u8]) -> Vec<usize> {
        self.line_separator.find_line_boundaries(data)
    }
105}
106
/// `Default` is equivalent to [`SimdJsonlProcessor::new`].
impl Default for SimdJsonlProcessor {
    fn default() -> Self {
        Self::new()
    }
}
112
/// Batch processing statistics.
///
/// The derived `Default` yields all-zero counters and ratios.
#[derive(Debug, Clone, Default)]
pub struct BatchStatistics {
    /// Total lines processed
    pub total_lines: usize,
    /// Successfully processed lines
    pub successful_lines: usize,
    /// Failed lines
    pub failed_lines: usize,
    /// Total processing time, in milliseconds
    pub processing_time_ms: f64,
    /// Average memory per line
    pub avg_memory_per_line: usize,
    /// Overall schema match ratio
    pub overall_schema_match_ratio: f64,
}
129
/// Result of batch processing: accepted documents, per-line errors, and a
/// snapshot of the statistics for the batch.
#[derive(Debug)]
pub struct BatchResult {
    /// Successfully processed JSON documents
    pub documents: Vec<String>,
    /// Errors for failed lines
    pub errors: Vec<LineError>,
    /// Processing statistics
    pub statistics: BatchStatistics,
}
140
/// Error for a specific line, paired with the text that caused it.
#[derive(Debug, Clone)]
pub struct LineError {
    /// Index of the line in the original data
    pub line_index: usize,
    /// The error that occurred
    pub error: SkipTapeError,
    /// Raw line content (lossily decoded when the line was not valid UTF-8)
    pub raw_line: String,
}
151
/// SIMD-JSONL batch processor with schema filtering
pub struct SimdJsonlBatchProcessor {
    /// SIMD line separator for fast line detection
    line_separator: SimdLineSeparator,
    /// SIMD structural filter for fast schema pre-filtering
    structural_filter: SimdStructuralFilter,
    /// SIMD JSON structural detector for high-performance parsing
    structural_detector: SimdJsonStructuralDetector,
    /// Skip tape processor for individual lines
    skip_processor: SkipTapeProcessor,
    /// GPU pre-scan enabled flag
    gpu_enabled: bool,
    /// Pre-scan mode (CPU vs GPU)
    prescan_mode: PreScanMode,
    /// Minimum bytes for GPU pre-scan in Auto mode
    gpu_min_bytes: usize,
    // GPU scanner (optional) - field currently disabled. Kept as a plain
    // comment so it does not become part of the `stats` field's rustdoc.
    // gpu_scanner: Option<GpuSimdScanner>,
    /// Processing statistics
    stats: BatchStatistics,
}
173
/// `Default` is equivalent to [`SimdJsonlBatchProcessor::new`].
impl Default for SimdJsonlBatchProcessor {
    fn default() -> Self {
        Self::new()
    }
}
179
180#[allow(clippy::unused_self, clippy::missing_const_for_fn)] // Methods may use self for future GPU state
181impl SimdJsonlBatchProcessor {
    /// Create a new SIMD-JSONL batch processor.
    ///
    /// The processor starts with GPU pre-scan disabled, `PreScanMode::Auto`,
    /// the default `GPU_MIN_BYTES` cutoff and zeroed statistics.
    #[must_use]
    pub fn new() -> Self {
        Self {
            line_separator: SimdLineSeparator::new(),
            structural_filter: SimdStructuralFilter::new(),
            structural_detector: SimdJsonStructuralDetector::new(),
            skip_processor: SkipTapeProcessor::new(),
            gpu_enabled: false,
            prescan_mode: PreScanMode::Auto,
            gpu_min_bytes: GPU_MIN_BYTES,
            // gpu_scanner: None,
            stats: BatchStatistics::default(),
        }
    }
197
    /// Create a new batch processor and attempt to enable GPU pre-scan.
    ///
    /// The outcome of switching to `PreScanMode::Gpu` is deliberately
    /// discarded, so this constructor always returns a processor; query
    /// `prescan_mode` / `enable_gpu` to learn whether the GPU is active.
    #[must_use]
    pub fn new_with_gpu() -> Self {
        let mut processor = Self::new();
        // Best effort: the result is ignored on purpose.
        let _ = processor.set_prescan_mode(PreScanMode::Gpu);
        processor
    }
205
206    /// Process a batch of JSONL data with schema filtering using optimized approach
207    ///
208    /// Key optimizations:
209    /// - Reuse a single processor instance instead of creating new ones per line
210    /// - Skip serialization in benchmarks for raw performance measurement
211    ///
212    /// # Errors
213    /// Returns an error if batch processing fails
214    pub fn process_batch_optimized(
215        &mut self,
216        jsonl_data: &[u8],
217        schema: &CompiledSchema,
218    ) -> Result<BatchResult> {
219        let start_time = std::time::Instant::now();
220
221        // Reset statistics
222        self.stats = BatchStatistics {
223            total_lines: 0,
224            successful_lines: 0,
225            failed_lines: 0,
226            processing_time_ms: 0.0,
227            avg_memory_per_line: 0,
228            overall_schema_match_ratio: 0.0,
229        };
230
231        // SIMD-accelerated line separation
232        let line_boundaries = self.line_boundaries(jsonl_data);
233        self.stats.total_lines = line_boundaries.len();
234
235        #[allow(unused_mut)] // Mutated conditionally in GPU path
236        let mut documents = Vec::new();
237        let mut errors = Vec::new();
238        let mut total_schema_match_ratio = 0.0;
239        let mut total_memory = 0;
240
241        // Collect valid lines first
242        let mut valid_lines = Vec::new();
243        let mut line_start = 0;
244        for (line_index, &line_end) in line_boundaries.iter().enumerate() {
245            let line_data = &jsonl_data[line_start..line_end];
246            line_start = line_end;
247
248            // Convert bytes to string (assuming valid UTF-8 for JSON)
249            if let Ok(s) = std::str::from_utf8(line_data) {
250                let trimmed = s.trim();
251                if !trimmed.is_empty() {
252                    valid_lines.push((line_index, trimmed.to_string()));
253                }
254            } else {
255                errors.push(LineError {
256                    line_index,
257                    error: SkipTapeError::ParseError("Invalid UTF-8".to_string()),
258                    raw_line: String::from_utf8_lossy(line_data).to_string(),
259                });
260                self.stats.failed_lines += 1;
261            }
262        }
263
264        // Process valid lines with reused processor (optimization: single processor instance)
265        for (line_index, line_str) in valid_lines {
266            match self.skip_processor.process_line(&line_str, schema) {
267                Ok(skip_tape) => {
268                    total_schema_match_ratio += skip_tape.metadata().schema_match_ratio;
269                    total_memory += skip_tape.memory_efficiency().bytes_used;
270
271                    // Add the original document to results
272                    documents.push(line_str);
273                    self.stats.successful_lines += 1;
274                }
275                Err(error) => {
276                    errors.push(LineError {
277                        line_index,
278                        error,
279                        raw_line: line_str,
280                    });
281                    self.stats.failed_lines += 1;
282                }
283            }
284        }
285
286        // Calculate final statistics
287        let processing_time = start_time.elapsed().as_secs_f64() * 1000.0;
288        self.stats.processing_time_ms = processing_time;
289
290        if self.stats.successful_lines > 0 {
291            self.stats.avg_memory_per_line = total_memory / self.stats.successful_lines;
292            let successful_f64 =
293                f64::from(u32::try_from(self.stats.successful_lines).unwrap_or(u32::MAX));
294            self.stats.overall_schema_match_ratio = total_schema_match_ratio / successful_f64;
295        }
296
297        Ok(BatchResult {
298            documents,
299            errors,
300            statistics: self.stats.clone(),
301        })
302    }
303
304    /// Process a batch with SIMD-DSONL: SIMD-JSONL parsing + SIMD-DSON operations
305    /// This combines high-performance JSONL parsing with DSON operation processing.
306    ///
307    /// # Errors
308    /// Returns an error if batch processing fails
309    ///
310    /// # Panics
311    /// Panics if the document vector becomes empty unexpectedly (should not happen in normal use)
312    pub fn process_batch_simd_dsonl(
313        &mut self,
314        jsonl_data: &[u8],
315        dson_operations: &[fionn_ops::DsonOperation],
316    ) -> Result<BatchResult> {
317        let start_time = std::time::Instant::now();
318
319        // Reset statistics
320        self.stats = BatchStatistics {
321            total_lines: 0,
322            successful_lines: 0,
323            failed_lines: 0,
324            processing_time_ms: 0.0,
325            avg_memory_per_line: 0,
326            overall_schema_match_ratio: 1.0,
327        };
328
329        // Step 1: SIMD-JSONL parsing (fast batch processing)
330        let parse_result = self.process_batch_raw_simd(jsonl_data)?;
331
332        // Step 2: Apply SIMD-DSON operations to each parsed document
333        let mut processed_documents = Vec::new();
334        let mut total_memory = 0;
335        let mut operation_errors = Vec::new();
336
337        for (line_index, doc_json) in parse_result.documents.iter().enumerate() {
338            match Self::apply_dson_operations_to_document(doc_json, dson_operations) {
339                Ok(transformed_doc) => {
340                    total_memory += transformed_doc.len();
341                    processed_documents.push(transformed_doc);
342                    self.stats.successful_lines += 1;
343                }
344                Err(error) => {
345                    operation_errors.push(LineError {
346                        line_index,
347                        error,
348                        raw_line: doc_json.clone(),
349                    });
350                    self.stats.failed_lines += 1;
351                    // Still include the original document on operation failure
352                    processed_documents.push(doc_json.clone());
353                    total_memory += doc_json.len();
354                }
355            }
356        }
357
358        // Combine parsing errors with operation errors
359        let mut all_errors = parse_result.errors;
360        all_errors.extend(operation_errors);
361
362        // Update statistics
363        let processing_time = start_time.elapsed().as_secs_f64() * 1000.0;
364        self.stats.processing_time_ms = processing_time;
365        self.stats.total_lines = parse_result.documents.len();
366
367        if self.stats.successful_lines > 0 {
368            self.stats.avg_memory_per_line = total_memory / self.stats.successful_lines;
369        }
370
371        Ok(BatchResult {
372            documents: processed_documents,
373            errors: all_errors,
374            statistics: self.stats.clone(),
375        })
376    }
377
378    /// Process a batch with raw SIMD parsing (no schema filtering)
379    /// This eliminates all schema overhead for maximum performance.
380    ///
381    /// # Errors
382    /// Returns an error if batch processing fails
383    pub fn process_batch_raw_simd(&mut self, jsonl_data: &[u8]) -> Result<BatchResult> {
384        let start_time = std::time::Instant::now();
385
386        // Reset statistics
387        self.stats = BatchStatistics {
388            total_lines: 0,
389            successful_lines: 0,
390            failed_lines: 0,
391            processing_time_ms: 0.0,
392            avg_memory_per_line: 0,
393            overall_schema_match_ratio: 1.0, // All data included
394        };
395
396        // SIMD-accelerated line separation
397        let line_boundaries = self.line_boundaries(jsonl_data);
398        self.stats.total_lines = line_boundaries.len();
399
400        #[allow(unused_mut)] // Mutated conditionally in GPU path
401        let mut documents = Vec::new();
402        let mut errors = Vec::new();
403        let mut total_memory = 0;
404
405        // Collect all valid lines first
406        let mut valid_lines = Vec::new();
407        let mut line_start = 0;
408        for (line_index, &line_end) in line_boundaries.iter().enumerate() {
409            let line_data = &jsonl_data[line_start..line_end];
410            line_start = line_end;
411
412            // Convert bytes to string (assuming valid UTF-8 for JSON)
413            if let Ok(s) = std::str::from_utf8(line_data) {
414                let trimmed = s.trim();
415                if !trimmed.is_empty() {
416                    valid_lines.push((line_index, trimmed.to_string()));
417                }
418            } else {
419                errors.push(LineError {
420                    line_index,
421                    error: SkipTapeError::ParseError("Invalid UTF-8".to_string()),
422                    raw_line: String::from_utf8_lossy(line_data).to_string(),
423                });
424                self.stats.failed_lines += 1;
425            }
426        }
427
428        // Process all lines in parallel using Rayon
429        let results: Vec<LineProcessResult> = valid_lines
430            .into_par_iter()
431            .map(|(line_index, line_str)| {
432                // Create a temporary processor for each thread
433                let mut temp_processor = Self::new();
434                let result = match temp_processor.parse_json_raw(&line_str) {
435                    Ok(json_str) => Ok(json_str),
436                    Err(e) => Err((e, line_str)),
437                };
438                (line_index, result)
439            })
440            .collect();
441
442        // Collect results sequentially
443        for (line_index, result) in results {
444            match result {
445                Ok(json_str) => {
446                    total_memory += json_str.len();
447                    documents.push(json_str);
448                    self.stats.successful_lines += 1;
449                }
450                Err((error, raw_line)) => {
451                    errors.push(LineError {
452                        line_index,
453                        error,
454                        raw_line,
455                    });
456                    self.stats.failed_lines += 1;
457                }
458            }
459        }
460
461        // Calculate final statistics
462        let processing_time = start_time.elapsed().as_secs_f64() * 1000.0;
463        self.stats.processing_time_ms = processing_time;
464
465        if self.stats.successful_lines > 0 {
466            self.stats.avg_memory_per_line = total_memory / self.stats.successful_lines;
467        }
468
469        Ok(BatchResult {
470            documents,
471            errors,
472            statistics: self.stats.clone(),
473        })
474    }
475
476    /// Apply DSON operations to a single JSON document
477    fn apply_dson_operations_to_document(
478        doc_json: &str,
479        operations: &[fionn_ops::DsonOperation],
480    ) -> Result<String> {
481        // Create a processor for this document
482        let mut processor = fionn_ops::processor::BlackBoxProcessor::new(vec![], vec![]);
483
484        // Process the document through the DSON pipeline
485        processor.process(doc_json)?;
486
487        // Apply operations to the processed document
488        processor.apply_operations(operations)?;
489
490        // Generate the final output
491        Ok(processor.generate_output()?)
492    }
493
494    /// Raw JSON parsing without schema filtering - simplified for performance
495    /// Raw JSON parsing with SIMD validation
496    ///
497    /// # Errors
498    /// Returns an error if JSON parsing or validation fails
499    pub fn parse_json_raw(&mut self, json: &str) -> Result<String> {
500        let bytes = json.as_bytes();
501        if bytes.is_empty() {
502            return Err(SkipTapeError::ParseError("Empty JSON".to_string()));
503        }
504
505        // Use SIMD structural detector for high-performance validation
506        // This validates the structure (braces, brackets, etc.) using AVX2
507        let structural_positions = self.structural_positions(bytes);
508
509        // Check validity using the structural positions
510        Self::validate_json_with_structural_positions(bytes, &structural_positions)?;
511
512        Ok(json.to_string())
513    }
514
515    /// Fast SIMD-based JSON structure validation using pre-computed structural positions
516    fn validate_json_with_structural_positions(
517        bytes: &[u8],
518        structural_positions: &[usize],
519    ) -> Result<()> {
520        let mut depth = 0;
521        let mut in_string = false;
522        let mut escaped = false;
523
524        for &pos in structural_positions {
525            if pos >= bytes.len() {
526                continue;
527            }
528
529            let byte = bytes[pos];
530
531            // Handle string state
532            if in_string {
533                if escaped {
534                    escaped = false;
535                } else if byte == b'\\' {
536                    escaped = true;
537                } else if byte == b'"' {
538                    in_string = false;
539                }
540                continue;
541            }
542
543            // Handle structural characters
544            match byte {
545                b'{' | b'[' => {
546                    depth += 1;
547                }
548                b'}' | b']' => {
549                    depth -= 1;
550                    if depth < 0 {
551                        return Err(SkipTapeError::ParseError(
552                            "Unmatched closing bracket".to_string(),
553                        ));
554                    }
555                }
556                b'"' => {
557                    in_string = true;
558                }
559                // Valid structural/whitespace and literal/number characters
560                b','
561                | b':'
562                | b' '
563                | b'\t'
564                | b'\n'
565                | b'\r'
566                | b't'
567                | b'f'
568                | b'n'
569                | b'0'..=b'9'
570                | b'-'
571                | b'.'
572                | b'+'
573                | b'e'
574                | b'E' => {}
575                _ => {
576                    return Err(SkipTapeError::ParseError(format!(
577                        "Invalid character: {}",
578                        byte as char
579                    )));
580                }
581            }
582        }
583
584        if depth != 0 {
585            return Err(SkipTapeError::ParseError("Unmatched brackets".to_string()));
586        }
587
588        Ok(())
589    }
590
591    /// Process a batch with structural filtering - ultra-fast pre-filtering
592    /// This approach uses SIMD to quickly identify documents that match the schema
593    /// before doing any parsing, potentially skipping 80-90% of documents.
594    ///
595    /// # Errors
596    /// Returns an error if batch processing fails
597    #[allow(clippy::too_many_lines)] // Complex batch processing with filtering and statistics
598    pub fn process_batch_structural_filtering(
599        &mut self,
600        jsonl_data: &[u8],
601        schema: &CompiledSchema,
602    ) -> Result<BatchResult> {
603        let start_time = std::time::Instant::now();
604
605        // Reset statistics
606        self.stats = BatchStatistics {
607            total_lines: 0,
608            successful_lines: 0,
609            failed_lines: 0,
610            processing_time_ms: 0.0,
611            avg_memory_per_line: 0,
612            overall_schema_match_ratio: 0.0,
613        };
614
615        // SIMD-accelerated line separation
616        let line_boundaries = self.line_boundaries(jsonl_data);
617        self.stats.total_lines = line_boundaries.len();
618
619        #[allow(unused_mut)] // Mutated conditionally in GPU path
620        let mut documents = Vec::new();
621        let mut errors = Vec::new();
622        let mut total_schema_match_ratio = 0.0;
623        let mut total_memory = 0;
624        let mut prefiltered_count = 0;
625
626        // Apply structural filtering - this is the key optimization!
627        let required_fields: Vec<String> = schema
628            .field_paths()
629            .iter()
630            .filter(|path| {
631                path.starts_with("user.") || path == &"age" || path == &"active" || path == &"score"
632            })
633            .cloned()
634            .collect();
635
636        let gpu_prefilter_flags =
637            self.gpu_prefilter_lines(jsonl_data, &line_boundaries, &required_fields);
638        if gpu_prefilter_flags.is_some() {
639            prefiltered_count = line_boundaries.len();
640        }
641
642        // Collect valid lines first
643        let mut valid_lines = Vec::new();
644        let mut line_start = 0;
645        for (line_index, &line_end) in line_boundaries.iter().enumerate() {
646            let line_data = &jsonl_data[line_start..line_end];
647            line_start = line_end;
648
649            if let Some(flags) = &gpu_prefilter_flags
650                && !flags.get(line_index).copied().unwrap_or(false)
651            {
652                continue;
653            }
654
655            // Convert bytes to string (assuming valid UTF-8 for JSON)
656            if let Ok(s) = std::str::from_utf8(line_data) {
657                let trimmed = s.trim();
658                if !trimmed.is_empty() {
659                    valid_lines.push((line_index, trimmed.to_string()));
660                    if gpu_prefilter_flags.is_none() {
661                        prefiltered_count += 1;
662                    }
663                }
664            } else {
665                errors.push(LineError {
666                    line_index,
667                    error: SkipTapeError::ParseError("Invalid UTF-8".to_string()),
668                    raw_line: String::from_utf8_lossy(line_data).to_string(),
669                });
670                self.stats.failed_lines += 1;
671            }
672        }
673
674        let mut filtered_lines = Vec::new();
675        for (line_index, line_str) in valid_lines {
676            // Ultra-fast structural pre-filtering using SIMD
677            if self
678                .structural_filter
679                .matches_schema(line_str.as_bytes(), &required_fields)
680            {
681                filtered_lines.push((line_index, line_str));
682            }
683            // Documents that don't match are completely skipped - no parsing overhead!
684        }
685
686        let filtered_count = filtered_lines.len();
687
688        // Only process documents that passed structural filtering
689        for (line_index, line_str) in filtered_lines {
690            match self.skip_processor.process_line(&line_str, schema) {
691                Ok(skip_tape) => {
692                    total_schema_match_ratio += skip_tape.metadata().schema_match_ratio;
693                    total_memory += skip_tape.memory_efficiency().bytes_used;
694
695                    // Serialize the skip tape to JSON
696                    self.stats.successful_lines += 1;
697                }
698                Err(error) => {
699                    errors.push(LineError {
700                        line_index,
701                        error,
702                        raw_line: line_str,
703                    });
704                    self.stats.failed_lines += 1;
705                }
706            }
707        }
708
709        // Calculate final statistics
710        let processing_time = start_time.elapsed().as_secs_f64() * 1000.0;
711        self.stats.processing_time_ms = processing_time;
712
713        if self.stats.successful_lines > 0 {
714            self.stats.avg_memory_per_line = total_memory / self.stats.successful_lines;
715            let successful_f64 =
716                f64::from(u32::try_from(self.stats.successful_lines).unwrap_or(u32::MAX));
717            self.stats.overall_schema_match_ratio = total_schema_match_ratio / successful_f64;
718        }
719
720        // Add structural filtering stats
721        let filtered_f64 = f64::from(u32::try_from(filtered_count).unwrap_or(u32::MAX));
722        let prefiltered_f64 = f64::from(u32::try_from(prefiltered_count).unwrap_or(u32::MAX));
723        println!(
724            "Structural filtering: {filtered_count}/{prefiltered_count} documents passed pre-filter ({:.1}%)",
725            (filtered_f64 / prefiltered_f64) * 100.0
726        );
727
728        Ok(BatchResult {
729            documents,
730            errors,
731            statistics: self.stats.clone(),
732        })
733    }
734
    /// Get current processing statistics.
    ///
    /// Returns a shared reference to the statistics written by the most
    /// recent batch call (or zeroed by `reset`).
    #[must_use]
    pub const fn statistics(&self) -> &BatchStatistics {
        &self.stats
    }
740
741    /// Reset the processor for a new batch
742    pub fn reset(&mut self) {
743        // Reset the skip processor's arena for the next batch
744        self.skip_processor.reset();
745        self.stats = BatchStatistics {
746            total_lines: 0,
747            successful_lines: 0,
748            failed_lines: 0,
749            processing_time_ms: 0.0,
750            avg_memory_per_line: 0,
751            overall_schema_match_ratio: 0.0,
752        };
753    }
754
    /// Enable GPU pre-scan for SIMD phases (line boundary + structural detection).
    ///
    /// Shorthand for `set_prescan_mode(PreScanMode::Gpu)`.
    ///
    /// Returns true when the GPU scanner is available and active.
    ///
    /// # Errors
    /// Returns an error if GPU initialization fails
    pub fn enable_gpu(&mut self) -> Result<bool> {
        self.set_prescan_mode(PreScanMode::Gpu)
    }
764
    /// Disable GPU pre-scan and fall back to CPU SIMD.
    ///
    /// Shorthand for `set_prescan_mode(PreScanMode::CpuOnly)`; the returned
    /// value is discarded.
    pub fn disable_gpu(&mut self) {
        let _ = self.set_prescan_mode(PreScanMode::CpuOnly);
    }
769
770    /// Set the pre-scan mode (CPU vs GPU).
771    ///
772    /// # Errors
773    /// Returns an error if GPU mode is requested but initialization fails
774    pub fn set_prescan_mode(&mut self, mode: PreScanMode) -> Result<bool> {
775        self.prescan_mode = mode;
776        match mode {
777            PreScanMode::CpuOnly => {
778                self.gpu_enabled = false;
779                Ok(false)
780            }
781            PreScanMode::Auto => Ok(self.gpu_enabled),
782            PreScanMode::Gpu => Ok(self.try_enable_gpu()),
783        }
784    }
785
    /// Current pre-scan mode, as last set by `set_prescan_mode` (or the
    /// `PreScanMode::Auto` default from `new`).
    #[must_use]
    pub const fn prescan_mode(&self) -> PreScanMode {
        self.prescan_mode
    }
791
    /// Override the GPU size cutoff for Auto mode.
    ///
    /// Stores `min_bytes` in the `gpu_min_bytes` field.
    pub const fn set_gpu_min_bytes(&mut self, min_bytes: usize) {
        self.gpu_min_bytes = min_bytes;
    }
796
797    #[allow(clippy::needless_pass_by_ref_mut)] // Will mutate GPU state in future
798    fn line_boundaries(&mut self, data: &[u8]) -> Vec<usize> {
799        if let Some(boundaries) = self.gpu_line_boundaries(data) {
800            return boundaries;
801        }
802        self.line_separator.find_line_boundaries(data)
803    }
804
805    #[allow(clippy::needless_pass_by_ref_mut)] // Will mutate GPU state in future
806    fn structural_positions(&mut self, data: &[u8]) -> Vec<usize> {
807        if let Some(positions) = self.gpu_structural_positions(data) {
808            return positions;
809        }
810        self.structural_detector.find_structural_characters(data)
811    }
812
    /// Attempt to enable the GPU scanner.
    ///
    /// Stub in this build: always returns `false`.
    #[allow(clippy::unnecessary_wraps)] // Returns Result in GPU-enabled builds
    fn try_enable_gpu(&self) -> bool {
        false
    }
817
    /// Whether `data` is large enough for the GPU path: true when it holds
    /// at least 64 bytes.
    ///
    /// NOTE(review): the 64-byte threshold is hard-coded and does not consult
    /// `gpu_min_bytes` - confirm which cutoff is intended.
    #[allow(dead_code)] // Reserved for future GPU acceleration threshold
    const fn gpu_allowed(&self, data: &[u8]) -> bool {
        data.len() >= 64
    }
822
    /// GPU line-boundary detection.
    ///
    /// Stub in this build: always returns `None`, so callers fall back to the
    /// CPU line separator.
    fn gpu_line_boundaries(&self, _data: &[u8]) -> Option<Vec<usize>> {
        None
    }
826
    /// GPU structural-character detection.
    ///
    /// Stub in this build: always returns `None`, so callers fall back to the
    /// CPU structural detector.
    fn gpu_structural_positions(&self, _data: &[u8]) -> Option<Vec<usize>> {
        None
    }
830
    /// GPU per-line schema pre-filter.
    ///
    /// When implemented, returns one flag per line boundary; the caller keeps
    /// only lines whose flag is `true`. Stub in this build: always returns
    /// `None`, which the caller treats as "no GPU pre-filter applied".
    fn gpu_prefilter_lines(
        &self,
        _data: &[u8],
        _line_boundaries: &[usize],
        _required_fields: &[String],
    ) -> Option<Vec<bool>> {
        None
    }
839}