1use crate::skiptape::SkipTapeProcessor;
7use crate::skiptape::error::{Result, SkipTapeError};
8use crate::skiptape::schema::CompiledSchema;
9use crate::skiptape::simd_ops::SimdJsonStructuralDetector;
10use fionn_simd::SimdLineSeparator;
11use fionn_simd::SimdStructuralFilter;
12use rayon::prelude::*;
13
/// Default minimum input size, in bytes, recorded as the GPU threshold of a
/// freshly constructed `SimdJsonlBatchProcessor` (16 KiB).
const GPU_MIN_BYTES: usize = 16 << 10;
15
16type LineProcessResult = (usize, std::result::Result<String, (SkipTapeError, String)>);
18
/// Selects how a `SimdJsonlBatchProcessor` performs its pre-scan passes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PreScanMode {
    /// Never use the GPU; selecting it clears the processor's GPU flag.
    CpuOnly,
    /// Keep the current GPU flag without attempting to enable a GPU
    /// (the mode chosen by `SimdJsonlBatchProcessor::new`).
    Auto,
    /// Request the GPU; `set_prescan_mode` reports whether enabling it succeeded.
    Gpu,
}
29
/// Counters exposed by `SimdJsonlProcessor::stats`.
///
/// NOTE(review): `SimdJsonlProcessor` initialises these to zero and nothing in
/// this file updates them afterwards — confirm whether they are meant to be
/// populated by `extract_lines`.
#[derive(Debug, Clone)]
pub struct ProcessingStats {
    /// Number of lines seen.
    pub total_lines: usize,
    /// Number of lines handled successfully.
    pub successful_lines: usize,
    /// Number of lines that failed.
    pub failed_lines: usize,
    /// Processing duration in milliseconds.
    pub processing_time_ms: f64,
}
42
43pub struct SimdJsonlProcessor {
45 line_separator: SimdLineSeparator,
47 stats: ProcessingStats,
49}
50
51impl SimdJsonlProcessor {
52 #[must_use]
54 pub const fn new() -> Self {
55 Self {
56 line_separator: SimdLineSeparator::new(),
57 stats: ProcessingStats {
58 total_lines: 0,
59 successful_lines: 0,
60 failed_lines: 0,
61 processing_time_ms: 0.0,
62 },
63 }
64 }
65
66 pub fn extract_lines(&self, jsonl_data: &[u8]) -> Result<Vec<String>> {
71 let mut lines = Vec::new();
72
73 let line_boundaries = self.line_boundaries(jsonl_data);
75
76 let mut line_start = 0;
77 for &line_end in &line_boundaries {
78 if line_end > line_start {
79 let line_data = &jsonl_data[line_start..line_end];
80
81 if let Ok(line_str) = std::str::from_utf8(line_data) {
83 let trimmed = line_str.trim();
84 if !trimmed.is_empty() {
85 lines.push(trimmed.to_string());
86 }
87 }
88 }
89 line_start = line_end;
90 }
91
92 Ok(lines)
93 }
94
95 #[must_use]
97 pub const fn stats(&self) -> &ProcessingStats {
98 &self.stats
99 }
100
101 fn line_boundaries(&self, data: &[u8]) -> Vec<usize> {
103 self.line_separator.find_line_boundaries(data)
104 }
105}
106
107impl Default for SimdJsonlProcessor {
108 fn default() -> Self {
109 Self::new()
110 }
111}
112
/// Aggregate figures produced by one batch call of `SimdJsonlBatchProcessor`.
///
/// The exact accounting of each counter is defined by the method that filled it.
#[derive(Debug, Clone, Default)]
pub struct BatchStatistics {
    /// Number of input lines counted by the call.
    pub total_lines: usize,
    /// Lines (or documents) processed successfully.
    pub successful_lines: usize,
    /// Lines (or documents) that produced an error.
    pub failed_lines: usize,
    /// Wall-clock duration of the call, in milliseconds.
    pub processing_time_ms: f64,
    /// Average bytes per processed item (integer division), as measured by the call.
    pub avg_memory_per_line: usize,
    /// Mean schema-match ratio over successful lines; modes that apply no schema
    /// report 1.0.
    pub overall_schema_match_ratio: f64,
}
129
130#[derive(Debug)]
132pub struct BatchResult {
133 pub documents: Vec<String>,
135 pub errors: Vec<LineError>,
137 pub statistics: BatchStatistics,
139}
140
141#[derive(Debug, Clone)]
143pub struct LineError {
144 pub line_index: usize,
146 pub error: SkipTapeError,
148 pub raw_line: String,
150}
151
152pub struct SimdJsonlBatchProcessor {
154 line_separator: SimdLineSeparator,
156 structural_filter: SimdStructuralFilter,
158 structural_detector: SimdJsonStructuralDetector,
160 skip_processor: SkipTapeProcessor,
162 gpu_enabled: bool,
164 prescan_mode: PreScanMode,
166 gpu_min_bytes: usize,
168 stats: BatchStatistics,
172}
173
174impl Default for SimdJsonlBatchProcessor {
175 fn default() -> Self {
176 Self::new()
177 }
178}
179
180#[allow(clippy::unused_self, clippy::missing_const_for_fn)] impl SimdJsonlBatchProcessor {
182 #[must_use]
184 pub fn new() -> Self {
185 Self {
186 line_separator: SimdLineSeparator::new(),
187 structural_filter: SimdStructuralFilter::new(),
188 structural_detector: SimdJsonStructuralDetector::new(),
189 skip_processor: SkipTapeProcessor::new(),
190 gpu_enabled: false,
191 prescan_mode: PreScanMode::Auto,
192 gpu_min_bytes: GPU_MIN_BYTES,
193 stats: BatchStatistics::default(),
195 }
196 }
197
198 #[must_use]
200 pub fn new_with_gpu() -> Self {
201 let mut processor = Self::new();
202 let _ = processor.set_prescan_mode(PreScanMode::Gpu);
203 processor
204 }
205
206 pub fn process_batch_optimized(
215 &mut self,
216 jsonl_data: &[u8],
217 schema: &CompiledSchema,
218 ) -> Result<BatchResult> {
219 let start_time = std::time::Instant::now();
220
221 self.stats = BatchStatistics {
223 total_lines: 0,
224 successful_lines: 0,
225 failed_lines: 0,
226 processing_time_ms: 0.0,
227 avg_memory_per_line: 0,
228 overall_schema_match_ratio: 0.0,
229 };
230
231 let line_boundaries = self.line_boundaries(jsonl_data);
233 self.stats.total_lines = line_boundaries.len();
234
235 #[allow(unused_mut)] let mut documents = Vec::new();
237 let mut errors = Vec::new();
238 let mut total_schema_match_ratio = 0.0;
239 let mut total_memory = 0;
240
241 let mut valid_lines = Vec::new();
243 let mut line_start = 0;
244 for (line_index, &line_end) in line_boundaries.iter().enumerate() {
245 let line_data = &jsonl_data[line_start..line_end];
246 line_start = line_end;
247
248 if let Ok(s) = std::str::from_utf8(line_data) {
250 let trimmed = s.trim();
251 if !trimmed.is_empty() {
252 valid_lines.push((line_index, trimmed.to_string()));
253 }
254 } else {
255 errors.push(LineError {
256 line_index,
257 error: SkipTapeError::ParseError("Invalid UTF-8".to_string()),
258 raw_line: String::from_utf8_lossy(line_data).to_string(),
259 });
260 self.stats.failed_lines += 1;
261 }
262 }
263
264 for (line_index, line_str) in valid_lines {
266 match self.skip_processor.process_line(&line_str, schema) {
267 Ok(skip_tape) => {
268 total_schema_match_ratio += skip_tape.metadata().schema_match_ratio;
269 total_memory += skip_tape.memory_efficiency().bytes_used;
270
271 documents.push(line_str);
273 self.stats.successful_lines += 1;
274 }
275 Err(error) => {
276 errors.push(LineError {
277 line_index,
278 error,
279 raw_line: line_str,
280 });
281 self.stats.failed_lines += 1;
282 }
283 }
284 }
285
286 let processing_time = start_time.elapsed().as_secs_f64() * 1000.0;
288 self.stats.processing_time_ms = processing_time;
289
290 if self.stats.successful_lines > 0 {
291 self.stats.avg_memory_per_line = total_memory / self.stats.successful_lines;
292 let successful_f64 =
293 f64::from(u32::try_from(self.stats.successful_lines).unwrap_or(u32::MAX));
294 self.stats.overall_schema_match_ratio = total_schema_match_ratio / successful_f64;
295 }
296
297 Ok(BatchResult {
298 documents,
299 errors,
300 statistics: self.stats.clone(),
301 })
302 }
303
304 pub fn process_batch_simd_dsonl(
313 &mut self,
314 jsonl_data: &[u8],
315 dson_operations: &[fionn_ops::DsonOperation],
316 ) -> Result<BatchResult> {
317 let start_time = std::time::Instant::now();
318
319 self.stats = BatchStatistics {
321 total_lines: 0,
322 successful_lines: 0,
323 failed_lines: 0,
324 processing_time_ms: 0.0,
325 avg_memory_per_line: 0,
326 overall_schema_match_ratio: 1.0,
327 };
328
329 let parse_result = self.process_batch_raw_simd(jsonl_data)?;
331
332 let mut processed_documents = Vec::new();
334 let mut total_memory = 0;
335 let mut operation_errors = Vec::new();
336
337 for (line_index, doc_json) in parse_result.documents.iter().enumerate() {
338 match Self::apply_dson_operations_to_document(doc_json, dson_operations) {
339 Ok(transformed_doc) => {
340 total_memory += transformed_doc.len();
341 processed_documents.push(transformed_doc);
342 self.stats.successful_lines += 1;
343 }
344 Err(error) => {
345 operation_errors.push(LineError {
346 line_index,
347 error,
348 raw_line: doc_json.clone(),
349 });
350 self.stats.failed_lines += 1;
351 processed_documents.push(doc_json.clone());
353 total_memory += doc_json.len();
354 }
355 }
356 }
357
358 let mut all_errors = parse_result.errors;
360 all_errors.extend(operation_errors);
361
362 let processing_time = start_time.elapsed().as_secs_f64() * 1000.0;
364 self.stats.processing_time_ms = processing_time;
365 self.stats.total_lines = parse_result.documents.len();
366
367 if self.stats.successful_lines > 0 {
368 self.stats.avg_memory_per_line = total_memory / self.stats.successful_lines;
369 }
370
371 Ok(BatchResult {
372 documents: processed_documents,
373 errors: all_errors,
374 statistics: self.stats.clone(),
375 })
376 }
377
378 pub fn process_batch_raw_simd(&mut self, jsonl_data: &[u8]) -> Result<BatchResult> {
384 let start_time = std::time::Instant::now();
385
386 self.stats = BatchStatistics {
388 total_lines: 0,
389 successful_lines: 0,
390 failed_lines: 0,
391 processing_time_ms: 0.0,
392 avg_memory_per_line: 0,
393 overall_schema_match_ratio: 1.0, };
395
396 let line_boundaries = self.line_boundaries(jsonl_data);
398 self.stats.total_lines = line_boundaries.len();
399
400 #[allow(unused_mut)] let mut documents = Vec::new();
402 let mut errors = Vec::new();
403 let mut total_memory = 0;
404
405 let mut valid_lines = Vec::new();
407 let mut line_start = 0;
408 for (line_index, &line_end) in line_boundaries.iter().enumerate() {
409 let line_data = &jsonl_data[line_start..line_end];
410 line_start = line_end;
411
412 if let Ok(s) = std::str::from_utf8(line_data) {
414 let trimmed = s.trim();
415 if !trimmed.is_empty() {
416 valid_lines.push((line_index, trimmed.to_string()));
417 }
418 } else {
419 errors.push(LineError {
420 line_index,
421 error: SkipTapeError::ParseError("Invalid UTF-8".to_string()),
422 raw_line: String::from_utf8_lossy(line_data).to_string(),
423 });
424 self.stats.failed_lines += 1;
425 }
426 }
427
428 let results: Vec<LineProcessResult> = valid_lines
430 .into_par_iter()
431 .map(|(line_index, line_str)| {
432 let mut temp_processor = Self::new();
434 let result = match temp_processor.parse_json_raw(&line_str) {
435 Ok(json_str) => Ok(json_str),
436 Err(e) => Err((e, line_str)),
437 };
438 (line_index, result)
439 })
440 .collect();
441
442 for (line_index, result) in results {
444 match result {
445 Ok(json_str) => {
446 total_memory += json_str.len();
447 documents.push(json_str);
448 self.stats.successful_lines += 1;
449 }
450 Err((error, raw_line)) => {
451 errors.push(LineError {
452 line_index,
453 error,
454 raw_line,
455 });
456 self.stats.failed_lines += 1;
457 }
458 }
459 }
460
461 let processing_time = start_time.elapsed().as_secs_f64() * 1000.0;
463 self.stats.processing_time_ms = processing_time;
464
465 if self.stats.successful_lines > 0 {
466 self.stats.avg_memory_per_line = total_memory / self.stats.successful_lines;
467 }
468
469 Ok(BatchResult {
470 documents,
471 errors,
472 statistics: self.stats.clone(),
473 })
474 }
475
476 fn apply_dson_operations_to_document(
478 doc_json: &str,
479 operations: &[fionn_ops::DsonOperation],
480 ) -> Result<String> {
481 let mut processor = fionn_ops::processor::BlackBoxProcessor::new(vec![], vec![]);
483
484 processor.process(doc_json)?;
486
487 processor.apply_operations(operations)?;
489
490 Ok(processor.generate_output()?)
492 }
493
494 pub fn parse_json_raw(&mut self, json: &str) -> Result<String> {
500 let bytes = json.as_bytes();
501 if bytes.is_empty() {
502 return Err(SkipTapeError::ParseError("Empty JSON".to_string()));
503 }
504
505 let structural_positions = self.structural_positions(bytes);
508
509 Self::validate_json_with_structural_positions(bytes, &structural_positions)?;
511
512 Ok(json.to_string())
513 }
514
515 fn validate_json_with_structural_positions(
517 bytes: &[u8],
518 structural_positions: &[usize],
519 ) -> Result<()> {
520 let mut depth = 0;
521 let mut in_string = false;
522 let mut escaped = false;
523
524 for &pos in structural_positions {
525 if pos >= bytes.len() {
526 continue;
527 }
528
529 let byte = bytes[pos];
530
531 if in_string {
533 if escaped {
534 escaped = false;
535 } else if byte == b'\\' {
536 escaped = true;
537 } else if byte == b'"' {
538 in_string = false;
539 }
540 continue;
541 }
542
543 match byte {
545 b'{' | b'[' => {
546 depth += 1;
547 }
548 b'}' | b']' => {
549 depth -= 1;
550 if depth < 0 {
551 return Err(SkipTapeError::ParseError(
552 "Unmatched closing bracket".to_string(),
553 ));
554 }
555 }
556 b'"' => {
557 in_string = true;
558 }
559 b','
561 | b':'
562 | b' '
563 | b'\t'
564 | b'\n'
565 | b'\r'
566 | b't'
567 | b'f'
568 | b'n'
569 | b'0'..=b'9'
570 | b'-'
571 | b'.'
572 | b'+'
573 | b'e'
574 | b'E' => {}
575 _ => {
576 return Err(SkipTapeError::ParseError(format!(
577 "Invalid character: {}",
578 byte as char
579 )));
580 }
581 }
582 }
583
584 if depth != 0 {
585 return Err(SkipTapeError::ParseError("Unmatched brackets".to_string()));
586 }
587
588 Ok(())
589 }
590
591 #[allow(clippy::too_many_lines)] pub fn process_batch_structural_filtering(
599 &mut self,
600 jsonl_data: &[u8],
601 schema: &CompiledSchema,
602 ) -> Result<BatchResult> {
603 let start_time = std::time::Instant::now();
604
605 self.stats = BatchStatistics {
607 total_lines: 0,
608 successful_lines: 0,
609 failed_lines: 0,
610 processing_time_ms: 0.0,
611 avg_memory_per_line: 0,
612 overall_schema_match_ratio: 0.0,
613 };
614
615 let line_boundaries = self.line_boundaries(jsonl_data);
617 self.stats.total_lines = line_boundaries.len();
618
619 #[allow(unused_mut)] let mut documents = Vec::new();
621 let mut errors = Vec::new();
622 let mut total_schema_match_ratio = 0.0;
623 let mut total_memory = 0;
624 let mut prefiltered_count = 0;
625
626 let required_fields: Vec<String> = schema
628 .field_paths()
629 .iter()
630 .filter(|path| {
631 path.starts_with("user.") || path == &"age" || path == &"active" || path == &"score"
632 })
633 .cloned()
634 .collect();
635
636 let gpu_prefilter_flags =
637 self.gpu_prefilter_lines(jsonl_data, &line_boundaries, &required_fields);
638 if gpu_prefilter_flags.is_some() {
639 prefiltered_count = line_boundaries.len();
640 }
641
642 let mut valid_lines = Vec::new();
644 let mut line_start = 0;
645 for (line_index, &line_end) in line_boundaries.iter().enumerate() {
646 let line_data = &jsonl_data[line_start..line_end];
647 line_start = line_end;
648
649 if let Some(flags) = &gpu_prefilter_flags
650 && !flags.get(line_index).copied().unwrap_or(false)
651 {
652 continue;
653 }
654
655 if let Ok(s) = std::str::from_utf8(line_data) {
657 let trimmed = s.trim();
658 if !trimmed.is_empty() {
659 valid_lines.push((line_index, trimmed.to_string()));
660 if gpu_prefilter_flags.is_none() {
661 prefiltered_count += 1;
662 }
663 }
664 } else {
665 errors.push(LineError {
666 line_index,
667 error: SkipTapeError::ParseError("Invalid UTF-8".to_string()),
668 raw_line: String::from_utf8_lossy(line_data).to_string(),
669 });
670 self.stats.failed_lines += 1;
671 }
672 }
673
674 let mut filtered_lines = Vec::new();
675 for (line_index, line_str) in valid_lines {
676 if self
678 .structural_filter
679 .matches_schema(line_str.as_bytes(), &required_fields)
680 {
681 filtered_lines.push((line_index, line_str));
682 }
683 }
685
686 let filtered_count = filtered_lines.len();
687
688 for (line_index, line_str) in filtered_lines {
690 match self.skip_processor.process_line(&line_str, schema) {
691 Ok(skip_tape) => {
692 total_schema_match_ratio += skip_tape.metadata().schema_match_ratio;
693 total_memory += skip_tape.memory_efficiency().bytes_used;
694
695 self.stats.successful_lines += 1;
697 }
698 Err(error) => {
699 errors.push(LineError {
700 line_index,
701 error,
702 raw_line: line_str,
703 });
704 self.stats.failed_lines += 1;
705 }
706 }
707 }
708
709 let processing_time = start_time.elapsed().as_secs_f64() * 1000.0;
711 self.stats.processing_time_ms = processing_time;
712
713 if self.stats.successful_lines > 0 {
714 self.stats.avg_memory_per_line = total_memory / self.stats.successful_lines;
715 let successful_f64 =
716 f64::from(u32::try_from(self.stats.successful_lines).unwrap_or(u32::MAX));
717 self.stats.overall_schema_match_ratio = total_schema_match_ratio / successful_f64;
718 }
719
720 let filtered_f64 = f64::from(u32::try_from(filtered_count).unwrap_or(u32::MAX));
722 let prefiltered_f64 = f64::from(u32::try_from(prefiltered_count).unwrap_or(u32::MAX));
723 println!(
724 "Structural filtering: {filtered_count}/{prefiltered_count} documents passed pre-filter ({:.1}%)",
725 (filtered_f64 / prefiltered_f64) * 100.0
726 );
727
728 Ok(BatchResult {
729 documents,
730 errors,
731 statistics: self.stats.clone(),
732 })
733 }
734
735 #[must_use]
737 pub const fn statistics(&self) -> &BatchStatistics {
738 &self.stats
739 }
740
741 pub fn reset(&mut self) {
743 self.skip_processor.reset();
745 self.stats = BatchStatistics {
746 total_lines: 0,
747 successful_lines: 0,
748 failed_lines: 0,
749 processing_time_ms: 0.0,
750 avg_memory_per_line: 0,
751 overall_schema_match_ratio: 0.0,
752 };
753 }
754
755 pub fn enable_gpu(&mut self) -> Result<bool> {
762 self.set_prescan_mode(PreScanMode::Gpu)
763 }
764
765 pub fn disable_gpu(&mut self) {
767 let _ = self.set_prescan_mode(PreScanMode::CpuOnly);
768 }
769
770 pub fn set_prescan_mode(&mut self, mode: PreScanMode) -> Result<bool> {
775 self.prescan_mode = mode;
776 match mode {
777 PreScanMode::CpuOnly => {
778 self.gpu_enabled = false;
779 Ok(false)
780 }
781 PreScanMode::Auto => Ok(self.gpu_enabled),
782 PreScanMode::Gpu => Ok(self.try_enable_gpu()),
783 }
784 }
785
786 #[must_use]
788 pub const fn prescan_mode(&self) -> PreScanMode {
789 self.prescan_mode
790 }
791
792 pub const fn set_gpu_min_bytes(&mut self, min_bytes: usize) {
794 self.gpu_min_bytes = min_bytes;
795 }
796
797 #[allow(clippy::needless_pass_by_ref_mut)] fn line_boundaries(&mut self, data: &[u8]) -> Vec<usize> {
799 if let Some(boundaries) = self.gpu_line_boundaries(data) {
800 return boundaries;
801 }
802 self.line_separator.find_line_boundaries(data)
803 }
804
805 #[allow(clippy::needless_pass_by_ref_mut)] fn structural_positions(&mut self, data: &[u8]) -> Vec<usize> {
807 if let Some(positions) = self.gpu_structural_positions(data) {
808 return positions;
809 }
810 self.structural_detector.find_structural_characters(data)
811 }
812
813 #[allow(clippy::unnecessary_wraps)] fn try_enable_gpu(&self) -> bool {
815 false
816 }
817
818 #[allow(dead_code)] const fn gpu_allowed(&self, data: &[u8]) -> bool {
820 data.len() >= 64
821 }
822
823 fn gpu_line_boundaries(&self, _data: &[u8]) -> Option<Vec<usize>> {
824 None
825 }
826
827 fn gpu_structural_positions(&self, _data: &[u8]) -> Option<Vec<usize>> {
828 None
829 }
830
831 fn gpu_prefilter_lines(
832 &self,
833 _data: &[u8],
834 _line_boundaries: &[usize],
835 _required_fields: &[String],
836 ) -> Option<Vec<bool>> {
837 None
838 }
839}