1use super::{NdjsonSource, RowError};
2use crate::data::value::Val;
3use crate::plan::physical::PlanningContext;
4use crate::util::is_truthy;
5use crate::{Jetro, JetroEngine, JetroEngineError, VM};
6use memchr::memchr;
7use serde_json::Value;
8use std::fs::File;
9use std::io::{BufRead, BufWriter, Write};
10use std::path::Path;
11use std::sync::MutexGuard;
12
13#[cfg(feature = "simd-json")]
14use super::ndjson_byte::{
15 eval_ndjson_byte_predicate_row, tape_plan_can_write_byte_row, write_ndjson_byte_plan_row,
16 write_ndjson_byte_tape_plan_row, BytePlanWrite,
17};
18#[cfg(test)]
19#[cfg(feature = "simd-json")]
20pub(super) use super::ndjson_direct::direct_byte_plan;
21#[cfg(feature = "simd-json")]
22pub(super) use super::ndjson_direct::{
23 direct_tape_plan, direct_tape_predicate, direct_writer_plans, NdjsonDirectBytePlan,
24 NdjsonDirectElement, NdjsonDirectItemPredicate, NdjsonDirectPredicate,
25 NdjsonDirectProjectionValue, NdjsonDirectStreamMap, NdjsonDirectStreamPlan,
26 NdjsonDirectStreamSink, NdjsonDirectTapePlan,
27};
28
29const DEFAULT_MAX_LINE_LEN: usize = 64 * 1024 * 1024;
30const DEFAULT_LINE_BUFFER_CAPACITY: usize = 8192;
31const DEFAULT_READER_BUFFER_CAPACITY: usize = 1024 * 1024;
32pub(super) const DEFAULT_REVERSE_CHUNK_SIZE: usize = 64 * 1024;
33
34#[derive(Clone, Copy, Debug, PartialEq, Eq)]
36pub struct NdjsonOptions {
37 pub max_line_len: usize,
38 pub initial_buffer_capacity: usize,
39 pub reader_buffer_capacity: usize,
40 pub reverse_chunk_size: usize,
41}
42
43impl Default for NdjsonOptions {
44 fn default() -> Self {
45 Self {
46 max_line_len: DEFAULT_MAX_LINE_LEN,
47 initial_buffer_capacity: DEFAULT_LINE_BUFFER_CAPACITY,
48 reader_buffer_capacity: DEFAULT_READER_BUFFER_CAPACITY,
49 reverse_chunk_size: DEFAULT_REVERSE_CHUNK_SIZE,
50 }
51 }
52}
53
54impl NdjsonOptions {
55 pub fn with_max_line_len(mut self, max_line_len: usize) -> Self {
56 self.max_line_len = max_line_len;
57 self
58 }
59
60 pub fn with_initial_buffer_capacity(mut self, capacity: usize) -> Self {
61 self.initial_buffer_capacity = capacity;
62 self
63 }
64
65 pub fn with_reader_buffer_capacity(mut self, capacity: usize) -> Self {
66 self.reader_buffer_capacity = capacity;
67 self
68 }
69
70 pub fn with_reverse_chunk_size(mut self, capacity: usize) -> Self {
71 self.reverse_chunk_size = capacity;
72 self
73 }
74}
75
76pub struct NdjsonPerRowDriver<R> {
78 reader: R,
79 line_no: u64,
80 max_line_len: usize,
81}
82
83impl<R: BufRead> NdjsonPerRowDriver<R> {
84 pub fn new(reader: R) -> Self {
85 Self {
86 reader,
87 line_no: 0,
88 max_line_len: DEFAULT_MAX_LINE_LEN,
89 }
90 }
91
92 pub fn with_max_line_len(mut self, max_line_len: usize) -> Self {
93 self.max_line_len = max_line_len;
94 self
95 }
96
97 pub fn line_no(&self) -> u64 {
98 self.line_no
99 }
100
101 pub fn read_next_nonempty<'a>(
104 &mut self,
105 buf: &'a mut Vec<u8>,
106 ) -> Result<Option<(u64, &'a [u8])>, RowError> {
107 loop {
108 buf.clear();
109 let read = self.read_physical_line(buf)?;
110 if read == 0 {
111 return Ok(None);
112 }
113 self.line_no += 1;
114
115 strip_initial_bom(self.line_no, buf);
116 trim_line_ending(buf);
117
118 let (start, end) = non_ws_range(buf);
119 if start == end {
120 continue;
121 }
122
123 let len = end - start;
124 if len > self.max_line_len {
125 return Err(RowError::LineTooLarge {
126 line_no: self.line_no,
127 len,
128 max: self.max_line_len,
129 });
130 }
131
132 return Ok(Some((self.line_no, &buf[start..end])));
133 }
134 }
135
136 pub fn read_next_owned(
140 &mut self,
141 buf: &mut Vec<u8>,
142 ) -> Result<Option<(u64, Vec<u8>)>, RowError> {
143 loop {
144 buf.clear();
145 let read = self.read_physical_line(buf)?;
146 if read == 0 {
147 return Ok(None);
148 }
149 self.line_no += 1;
150
151 strip_initial_bom(self.line_no, buf);
152 trim_line_ending(buf);
153
154 let (start, end) = non_ws_range(buf);
155 if start == end {
156 continue;
157 }
158
159 let len = end - start;
160 if len > self.max_line_len {
161 return Err(RowError::LineTooLarge {
162 line_no: self.line_no,
163 len,
164 max: self.max_line_len,
165 });
166 }
167
168 let capacity = buf.capacity();
169 return Ok(Some((
170 self.line_no,
171 std::mem::replace(buf, Vec::with_capacity(capacity)),
172 )));
173 }
174 }
175
176 fn read_physical_line(&mut self, buf: &mut Vec<u8>) -> Result<usize, RowError> {
177 loop {
178 let available = self.reader.fill_buf()?;
179 if available.is_empty() {
180 return Ok(buf.len());
181 }
182
183 if let Some(pos) = memchr(b'\n', available) {
184 buf.extend_from_slice(&available[..=pos]);
185 self.reader.consume(pos + 1);
186 self.check_physical_line_len(buf.len())?;
187 return Ok(buf.len());
188 }
189
190 let len = available.len();
191 buf.extend_from_slice(available);
192 self.reader.consume(len);
193 self.check_physical_line_len(buf.len())?;
194 }
195 }
196
197 fn check_physical_line_len(&self, len: usize) -> Result<(), RowError> {
198 let hard_max = self.max_line_len.saturating_add(2);
199 if len > hard_max {
200 return Err(RowError::LineTooLarge {
201 line_no: self.line_no + 1,
202 len,
203 max: self.max_line_len,
204 });
205 }
206 Ok(())
207 }
208}
209
/// Flow-control signal returned by per-row callbacks.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum NdjsonControl {
    /// Keep reading rows.
    Continue,
    /// Stop after the current row.
    Stop,
}
215
216pub fn for_each_ndjson<R, F>(
217 engine: &JetroEngine,
218 reader: R,
219 query: &str,
220 f: F,
221) -> Result<usize, JetroEngineError>
222where
223 R: BufRead,
224 F: FnMut(Value),
225{
226 for_each_ndjson_with_options(engine, reader, query, NdjsonOptions::default(), f)
227}
228
229pub fn for_each_ndjson_with_options<R, F>(
230 engine: &JetroEngine,
231 reader: R,
232 query: &str,
233 options: NdjsonOptions,
234 mut f: F,
235) -> Result<usize, JetroEngineError>
236where
237 R: BufRead,
238 F: FnMut(Value),
239{
240 drive_ndjson(engine, reader, query, options, |value| {
241 f(value);
242 Ok(NdjsonControl::Continue)
243 })
244}
245
246pub fn for_each_ndjson_until<R, F>(
247 engine: &JetroEngine,
248 reader: R,
249 query: &str,
250 f: F,
251) -> Result<usize, JetroEngineError>
252where
253 R: BufRead,
254 F: FnMut(Value) -> Result<NdjsonControl, JetroEngineError>,
255{
256 for_each_ndjson_until_with_options(engine, reader, query, NdjsonOptions::default(), f)
257}
258
259pub fn for_each_ndjson_until_with_options<R, F>(
260 engine: &JetroEngine,
261 reader: R,
262 query: &str,
263 options: NdjsonOptions,
264 f: F,
265) -> Result<usize, JetroEngineError>
266where
267 R: BufRead,
268 F: FnMut(Value) -> Result<NdjsonControl, JetroEngineError>,
269{
270 drive_ndjson(engine, reader, query, options, f)
271}
272
273pub fn for_each_ndjson_source<F>(
274 engine: &JetroEngine,
275 source: NdjsonSource,
276 query: &str,
277 f: F,
278) -> Result<usize, JetroEngineError>
279where
280 F: FnMut(Value),
281{
282 for_each_ndjson_source_with_options(engine, source, query, NdjsonOptions::default(), f)
283}
284
285pub fn for_each_ndjson_source_with_options<F>(
286 engine: &JetroEngine,
287 source: NdjsonSource,
288 query: &str,
289 options: NdjsonOptions,
290 f: F,
291) -> Result<usize, JetroEngineError>
292where
293 F: FnMut(Value),
294{
295 match source {
296 NdjsonSource::File(path) => {
297 let file = File::open(path)?;
298 for_each_ndjson_with_options(
299 engine,
300 std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
301 query,
302 options,
303 f,
304 )
305 }
306 NdjsonSource::Reader(reader) => {
307 for_each_ndjson_with_options(engine, reader, query, options, f)
308 }
309 }
310}
311
312pub fn for_each_ndjson_source_until<F>(
313 engine: &JetroEngine,
314 source: NdjsonSource,
315 query: &str,
316 f: F,
317) -> Result<usize, JetroEngineError>
318where
319 F: FnMut(Value) -> Result<NdjsonControl, JetroEngineError>,
320{
321 for_each_ndjson_source_until_with_options(engine, source, query, NdjsonOptions::default(), f)
322}
323
324pub fn for_each_ndjson_source_until_with_options<F>(
325 engine: &JetroEngine,
326 source: NdjsonSource,
327 query: &str,
328 options: NdjsonOptions,
329 f: F,
330) -> Result<usize, JetroEngineError>
331where
332 F: FnMut(Value) -> Result<NdjsonControl, JetroEngineError>,
333{
334 match source {
335 NdjsonSource::File(path) => {
336 let file = File::open(path)?;
337 for_each_ndjson_until_with_options(
338 engine,
339 std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
340 query,
341 options,
342 f,
343 )
344 }
345 NdjsonSource::Reader(reader) => {
346 for_each_ndjson_until_with_options(engine, reader, query, options, f)
347 }
348 }
349}
350
351pub fn collect_ndjson<R>(
352 engine: &JetroEngine,
353 reader: R,
354 query: &str,
355) -> Result<Vec<Value>, JetroEngineError>
356where
357 R: BufRead,
358{
359 collect_ndjson_with_options(engine, reader, query, NdjsonOptions::default())
360}
361
362pub fn collect_ndjson_with_options<R>(
363 engine: &JetroEngine,
364 reader: R,
365 query: &str,
366 options: NdjsonOptions,
367) -> Result<Vec<Value>, JetroEngineError>
368where
369 R: BufRead,
370{
371 let mut values = Vec::new();
372 for_each_ndjson_with_options(engine, reader, query, options, |value| values.push(value))?;
373 Ok(values)
374}
375
376pub fn collect_ndjson_file<P>(
377 engine: &JetroEngine,
378 path: P,
379 query: &str,
380) -> Result<Vec<Value>, JetroEngineError>
381where
382 P: AsRef<Path>,
383{
384 let file = File::open(path)?;
385 let options = NdjsonOptions::default();
386 collect_ndjson_with_options(
387 engine,
388 std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
389 query,
390 options,
391 )
392}
393
394pub fn collect_ndjson_file_with_options<P>(
395 engine: &JetroEngine,
396 path: P,
397 query: &str,
398 options: NdjsonOptions,
399) -> Result<Vec<Value>, JetroEngineError>
400where
401 P: AsRef<Path>,
402{
403 let file = File::open(path)?;
404 collect_ndjson_with_options(
405 engine,
406 std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
407 query,
408 options,
409 )
410}
411
412pub fn collect_ndjson_source(
413 engine: &JetroEngine,
414 source: NdjsonSource,
415 query: &str,
416) -> Result<Vec<Value>, JetroEngineError> {
417 collect_ndjson_source_with_options(engine, source, query, NdjsonOptions::default())
418}
419
420pub fn collect_ndjson_source_with_options(
421 engine: &JetroEngine,
422 source: NdjsonSource,
423 query: &str,
424 options: NdjsonOptions,
425) -> Result<Vec<Value>, JetroEngineError> {
426 match source {
427 NdjsonSource::File(path) => collect_ndjson_file_with_options(engine, path, query, options),
428 NdjsonSource::Reader(reader) => collect_ndjson_with_options(engine, reader, query, options),
429 }
430}
431
432pub fn collect_ndjson_matches<R>(
433 engine: &JetroEngine,
434 reader: R,
435 predicate: &str,
436 limit: usize,
437) -> Result<Vec<Value>, JetroEngineError>
438where
439 R: BufRead,
440{
441 collect_ndjson_matches_with_options(engine, reader, predicate, limit, NdjsonOptions::default())
442}
443
444pub fn collect_ndjson_matches_with_options<R>(
445 engine: &JetroEngine,
446 reader: R,
447 predicate: &str,
448 limit: usize,
449 options: NdjsonOptions,
450) -> Result<Vec<Value>, JetroEngineError>
451where
452 R: BufRead,
453{
454 let mut values = Vec::with_capacity(limit);
455 drive_ndjson_matches(engine, reader, predicate, limit, options, |value| {
456 values.push(Value::from(value));
457 Ok(NdjsonControl::Continue)
458 })?;
459 Ok(values)
460}
461
462pub fn collect_ndjson_matches_file<P>(
463 engine: &JetroEngine,
464 path: P,
465 predicate: &str,
466 limit: usize,
467) -> Result<Vec<Value>, JetroEngineError>
468where
469 P: AsRef<Path>,
470{
471 let file = File::open(path)?;
472 let options = NdjsonOptions::default();
473 collect_ndjson_matches_with_options(
474 engine,
475 std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
476 predicate,
477 limit,
478 options,
479 )
480}
481
482pub fn collect_ndjson_matches_file_with_options<P>(
483 engine: &JetroEngine,
484 path: P,
485 predicate: &str,
486 limit: usize,
487 options: NdjsonOptions,
488) -> Result<Vec<Value>, JetroEngineError>
489where
490 P: AsRef<Path>,
491{
492 let file = File::open(path)?;
493 collect_ndjson_matches_with_options(
494 engine,
495 std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
496 predicate,
497 limit,
498 options,
499 )
500}
501
502pub fn collect_ndjson_matches_source(
503 engine: &JetroEngine,
504 source: NdjsonSource,
505 predicate: &str,
506 limit: usize,
507) -> Result<Vec<Value>, JetroEngineError> {
508 collect_ndjson_matches_source_with_options(
509 engine,
510 source,
511 predicate,
512 limit,
513 NdjsonOptions::default(),
514 )
515}
516
517pub fn collect_ndjson_matches_source_with_options(
518 engine: &JetroEngine,
519 source: NdjsonSource,
520 predicate: &str,
521 limit: usize,
522 options: NdjsonOptions,
523) -> Result<Vec<Value>, JetroEngineError> {
524 match source {
525 NdjsonSource::File(path) => {
526 collect_ndjson_matches_file_with_options(engine, path, predicate, limit, options)
527 }
528 NdjsonSource::Reader(reader) => {
529 collect_ndjson_matches_with_options(engine, reader, predicate, limit, options)
530 }
531 }
532}
533
534pub fn run_ndjson<R, W>(
535 engine: &JetroEngine,
536 reader: R,
537 query: &str,
538 writer: W,
539) -> Result<usize, JetroEngineError>
540where
541 R: BufRead,
542 W: Write,
543{
544 run_ndjson_with_options(engine, reader, query, writer, NdjsonOptions::default())
545}
546
547pub fn run_ndjson_file<P, W>(
548 engine: &JetroEngine,
549 path: P,
550 query: &str,
551 writer: W,
552) -> Result<usize, JetroEngineError>
553where
554 P: AsRef<Path>,
555 W: Write,
556{
557 let file = File::open(path)?;
558 let options = NdjsonOptions::default();
559 run_ndjson_with_options(
560 engine,
561 std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
562 query,
563 writer,
564 options,
565 )
566}
567
568pub fn run_ndjson_file_with_options<P, W>(
569 engine: &JetroEngine,
570 path: P,
571 query: &str,
572 writer: W,
573 options: NdjsonOptions,
574) -> Result<usize, JetroEngineError>
575where
576 P: AsRef<Path>,
577 W: Write,
578{
579 let file = File::open(path)?;
580 run_ndjson_with_options(
581 engine,
582 std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
583 query,
584 writer,
585 options,
586 )
587}
588
589pub fn run_ndjson_with_options<R, W>(
590 engine: &JetroEngine,
591 reader: R,
592 query: &str,
593 writer: W,
594 options: NdjsonOptions,
595) -> Result<usize, JetroEngineError>
596where
597 R: BufRead,
598 W: Write,
599{
600 drive_ndjson_writer(engine, reader, query, None, options, writer)
601}
602
603pub fn run_ndjson_limit<R, W>(
604 engine: &JetroEngine,
605 reader: R,
606 query: &str,
607 limit: usize,
608 writer: W,
609) -> Result<usize, JetroEngineError>
610where
611 R: BufRead,
612 W: Write,
613{
614 run_ndjson_limit_with_options(
615 engine,
616 reader,
617 query,
618 limit,
619 writer,
620 NdjsonOptions::default(),
621 )
622}
623
624pub fn run_ndjson_limit_with_options<R, W>(
625 engine: &JetroEngine,
626 reader: R,
627 query: &str,
628 limit: usize,
629 writer: W,
630 options: NdjsonOptions,
631) -> Result<usize, JetroEngineError>
632where
633 R: BufRead,
634 W: Write,
635{
636 if limit == 0 {
637 return Ok(0);
638 }
639
640 drive_ndjson_writer(engine, reader, query, Some(limit), options, writer)
641}
642
643pub fn run_ndjson_file_limit<P, W>(
644 engine: &JetroEngine,
645 path: P,
646 query: &str,
647 limit: usize,
648 writer: W,
649) -> Result<usize, JetroEngineError>
650where
651 P: AsRef<Path>,
652 W: Write,
653{
654 let file = File::open(path)?;
655 let options = NdjsonOptions::default();
656 run_ndjson_limit_with_options(
657 engine,
658 std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
659 query,
660 limit,
661 writer,
662 options,
663 )
664}
665
666pub fn run_ndjson_file_limit_with_options<P, W>(
667 engine: &JetroEngine,
668 path: P,
669 query: &str,
670 limit: usize,
671 writer: W,
672 options: NdjsonOptions,
673) -> Result<usize, JetroEngineError>
674where
675 P: AsRef<Path>,
676 W: Write,
677{
678 let file = File::open(path)?;
679 run_ndjson_limit_with_options(
680 engine,
681 std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
682 query,
683 limit,
684 writer,
685 options,
686 )
687}
688
689pub fn run_ndjson_source<W>(
690 engine: &JetroEngine,
691 source: NdjsonSource,
692 query: &str,
693 writer: W,
694) -> Result<usize, JetroEngineError>
695where
696 W: Write,
697{
698 run_ndjson_source_with_options(engine, source, query, writer, NdjsonOptions::default())
699}
700
701pub fn run_ndjson_source_with_options<W>(
702 engine: &JetroEngine,
703 source: NdjsonSource,
704 query: &str,
705 writer: W,
706 options: NdjsonOptions,
707) -> Result<usize, JetroEngineError>
708where
709 W: Write,
710{
711 match source {
712 NdjsonSource::File(path) => {
713 run_ndjson_file_with_options(engine, path, query, writer, options)
714 }
715 NdjsonSource::Reader(reader) => {
716 run_ndjson_with_options(engine, reader, query, writer, options)
717 }
718 }
719}
720
721pub fn run_ndjson_source_limit<W>(
722 engine: &JetroEngine,
723 source: NdjsonSource,
724 query: &str,
725 limit: usize,
726 writer: W,
727) -> Result<usize, JetroEngineError>
728where
729 W: Write,
730{
731 run_ndjson_source_limit_with_options(
732 engine,
733 source,
734 query,
735 limit,
736 writer,
737 NdjsonOptions::default(),
738 )
739}
740
741pub fn run_ndjson_source_limit_with_options<W>(
742 engine: &JetroEngine,
743 source: NdjsonSource,
744 query: &str,
745 limit: usize,
746 writer: W,
747 options: NdjsonOptions,
748) -> Result<usize, JetroEngineError>
749where
750 W: Write,
751{
752 match source {
753 NdjsonSource::File(path) => {
754 run_ndjson_file_limit_with_options(engine, path, query, limit, writer, options)
755 }
756 NdjsonSource::Reader(reader) => {
757 run_ndjson_limit_with_options(engine, reader, query, limit, writer, options)
758 }
759 }
760}
761
762pub fn run_ndjson_matches<R, W>(
763 engine: &JetroEngine,
764 reader: R,
765 predicate: &str,
766 limit: usize,
767 writer: W,
768) -> Result<usize, JetroEngineError>
769where
770 R: BufRead,
771 W: Write,
772{
773 run_ndjson_matches_with_options(
774 engine,
775 reader,
776 predicate,
777 limit,
778 writer,
779 NdjsonOptions::default(),
780 )
781}
782
783pub fn run_ndjson_matches_with_options<R, W>(
784 engine: &JetroEngine,
785 reader: R,
786 predicate: &str,
787 limit: usize,
788 writer: W,
789 options: NdjsonOptions,
790) -> Result<usize, JetroEngineError>
791where
792 R: BufRead,
793 W: Write,
794{
795 drive_ndjson_matches_writer(engine, reader, predicate, limit, options, writer)
796}
797
798pub fn run_ndjson_matches_file<P, W>(
799 engine: &JetroEngine,
800 path: P,
801 predicate: &str,
802 limit: usize,
803 writer: W,
804) -> Result<usize, JetroEngineError>
805where
806 P: AsRef<Path>,
807 W: Write,
808{
809 let file = File::open(path)?;
810 let options = NdjsonOptions::default();
811 run_ndjson_matches_with_options(
812 engine,
813 std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
814 predicate,
815 limit,
816 writer,
817 options,
818 )
819}
820
821pub fn run_ndjson_matches_file_with_options<P, W>(
822 engine: &JetroEngine,
823 path: P,
824 predicate: &str,
825 limit: usize,
826 writer: W,
827 options: NdjsonOptions,
828) -> Result<usize, JetroEngineError>
829where
830 P: AsRef<Path>,
831 W: Write,
832{
833 let file = File::open(path)?;
834 run_ndjson_matches_with_options(
835 engine,
836 std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
837 predicate,
838 limit,
839 writer,
840 options,
841 )
842}
843
844pub fn run_ndjson_matches_source<W>(
845 engine: &JetroEngine,
846 source: NdjsonSource,
847 predicate: &str,
848 limit: usize,
849 writer: W,
850) -> Result<usize, JetroEngineError>
851where
852 W: Write,
853{
854 run_ndjson_matches_source_with_options(
855 engine,
856 source,
857 predicate,
858 limit,
859 writer,
860 NdjsonOptions::default(),
861 )
862}
863
864pub fn run_ndjson_matches_source_with_options<W>(
865 engine: &JetroEngine,
866 source: NdjsonSource,
867 predicate: &str,
868 limit: usize,
869 writer: W,
870 options: NdjsonOptions,
871) -> Result<usize, JetroEngineError>
872where
873 W: Write,
874{
875 match source {
876 NdjsonSource::File(path) => {
877 run_ndjson_matches_file_with_options(engine, path, predicate, limit, writer, options)
878 }
879 NdjsonSource::Reader(reader) => {
880 run_ndjson_matches_with_options(engine, reader, predicate, limit, writer, options)
881 }
882 }
883}
884
885fn drive_ndjson<R, F>(
886 engine: &JetroEngine,
887 reader: R,
888 query: &str,
889 options: NdjsonOptions,
890 mut emit: F,
891) -> Result<usize, JetroEngineError>
892where
893 R: BufRead,
894 F: FnMut(Value) -> Result<NdjsonControl, JetroEngineError>,
895{
896 let mut driver = NdjsonPerRowDriver::new(reader).with_max_line_len(options.max_line_len);
897 let plan = engine.cached_plan(query, PlanningContext::bytes());
898 let mut buf = Vec::with_capacity(options.initial_buffer_capacity);
899 let mut count = 0;
900
901 while let Some((line_no, row)) = driver.read_next_owned(&mut buf)? {
902 let document = parse_row(engine, line_no, row)?;
903 let out = collect_row_val(engine, &document, &plan, line_no)?;
904 count += 1;
905 if matches!(emit(Value::from(out))?, NdjsonControl::Stop) {
906 break;
907 }
908 }
909
910 Ok(count)
911}
912
913fn drive_ndjson_writer<R, W>(
914 engine: &JetroEngine,
915 reader: R,
916 query: &str,
917 limit: Option<usize>,
918 options: NdjsonOptions,
919 writer: W,
920) -> Result<usize, JetroEngineError>
921where
922 R: BufRead,
923 W: Write,
924{
925 #[cfg(feature = "simd-json")]
926 if let Some((byte_plan, tape_plan)) = direct_writer_plans(engine, query) {
927 if let Some(byte_plan) = byte_plan {
928 return drive_ndjson_byte_writer(
929 engine, reader, &byte_plan, &tape_plan, limit, options, writer,
930 );
931 }
932 if tape_plan_can_write_byte_row(&tape_plan) {
933 return drive_ndjson_tape_byte_writer(
934 engine, reader, &tape_plan, limit, options, writer,
935 );
936 }
937 return drive_ndjson_tape_writer(engine, reader, &tape_plan, limit, options, writer);
938 }
939
940 let mut driver = NdjsonPerRowDriver::new(reader).with_max_line_len(options.max_line_len);
941 let mut executor = NdjsonRowExecutor::new(engine, query);
942 let mut writer = ndjson_writer_with_options(writer, options);
943 let mut buf = Vec::with_capacity(options.initial_buffer_capacity);
944 let mut count = 0usize;
945
946 while let Some((line_no, row)) = driver.read_next_owned(&mut buf)? {
947 count += 1;
948 executor.write_owned_row(line_no, row, &mut writer)?;
949 if limit.is_some_and(|limit| count >= limit) {
950 break;
951 }
952 }
953
954 writer.flush()?;
955 Ok(count)
956}
957
/// Writes one output line per row with the direct byte-level plan.
///
/// When the byte plan reports `BytePlanWrite::Fallback` for a row, the row is parsed
/// onto a tape and evaluated with `tape_plan` instead. Rows are borrowed from the reader's
/// buffer where possible (`visit_ndjson_borrowed_rows`). Stops after `limit` rows when
/// set, flushes the writer, and returns the number of rows written.
#[cfg(feature = "simd-json")]
fn drive_ndjson_byte_writer<R, W>(
    engine: &JetroEngine,
    reader: R,
    byte_plan: &NdjsonDirectBytePlan,
    tape_plan: &NdjsonDirectTapePlan,
    limit: Option<usize>,
    options: NdjsonOptions,
    writer: W,
) -> Result<usize, JetroEngineError>
where
    R: BufRead,
    W: Write,
{
    let mut rows = NdjsonPerRowDriver::new(reader).with_max_line_len(options.max_line_len);
    let mut out = ndjson_writer_with_options(writer, options);
    let mut spill = Vec::with_capacity(options.initial_buffer_capacity);
    let mut tape =
        crate::data::tape::TapeScratch::with_capacity(options.initial_buffer_capacity);
    let mut fallback = NdjsonTapeWriterRunner::new(engine, tape_plan);
    let mut written = 0usize;

    visit_ndjson_borrowed_rows(&mut rows, &mut spill, |line_no, row| {
        let outcome = write_ndjson_byte_plan_row(&mut out, row, byte_plan)?;
        if let BytePlanWrite::Fallback = outcome {
            // The byte plan gave up on this row: parse it and run the tape plan instead.
            tape.parse_slice(row).map_err(|message| {
                row_parse_error(
                    line_no,
                    JetroEngineError::Eval(crate::EvalError(format!("Invalid JSON: {message}"))),
                )
            })?;
            fallback.write_row(&tape, &mut out)?;
        }
        out.write_all(b"\n")?;
        written += 1;
        let reached_limit = limit.is_some_and(|max| written >= max);
        Ok(!reached_limit)
    })?;

    out.flush()?;
    Ok(written)
}
1003
/// Writes one output line per row, first trying the byte-row writer for `tape_plan`.
///
/// When the byte-row writer reports `BytePlanWrite::Fallback`, the row is parsed onto a
/// tape and evaluated by [`NdjsonTapeWriterRunner`]. Stops after `limit` rows when set,
/// flushes the writer, and returns the number of rows written.
#[cfg(feature = "simd-json")]
fn drive_ndjson_tape_byte_writer<R, W>(
    engine: &JetroEngine,
    reader: R,
    tape_plan: &NdjsonDirectTapePlan,
    limit: Option<usize>,
    options: NdjsonOptions,
    writer: W,
) -> Result<usize, JetroEngineError>
where
    R: BufRead,
    W: Write,
{
    let mut rows = NdjsonPerRowDriver::new(reader).with_max_line_len(options.max_line_len);
    let mut out = ndjson_writer_with_options(writer, options);
    let mut spill = Vec::with_capacity(options.initial_buffer_capacity);
    let mut tape =
        crate::data::tape::TapeScratch::with_capacity(options.initial_buffer_capacity);
    let mut byte_scratch = Vec::with_capacity(options.initial_buffer_capacity);
    let mut fallback = NdjsonTapeWriterRunner::new(engine, tape_plan);
    let mut written = 0usize;

    visit_ndjson_borrowed_rows(&mut rows, &mut spill, |line_no, row| {
        let outcome = write_ndjson_byte_tape_plan_row(&mut out, row, tape_plan, &mut byte_scratch)?;
        if let BytePlanWrite::Fallback = outcome {
            // The byte-row writer could not handle this row: evaluate it from a parsed tape.
            tape.parse_slice(row).map_err(|message| {
                row_parse_error(
                    line_no,
                    JetroEngineError::Eval(crate::EvalError(format!("Invalid JSON: {message}"))),
                )
            })?;
            fallback.write_row(&tape, &mut out)?;
        }
        out.write_all(b"\n")?;
        written += 1;
        let reached_limit = limit.is_some_and(|max| written >= max);
        Ok(!reached_limit)
    })?;

    out.flush()?;
    Ok(written)
}
1049
/// Feeds each non-blank row of `driver`'s input to `visit`.
///
/// When a complete line is already in the reader's buffer, the row is borrowed in place
/// (no copy). Otherwise the line straddles the buffer boundary, or it is a final line with
/// no trailing `\n`. In that case it is gathered into `spill` via `read_physical_line`.
///
/// Both paths do the same normalisation:
/// - remove a BOM on line 1 and the line ending;
/// - trim surrounding whitespace;
/// - skip blank lines, which still advance `driver.line_no`;
/// - reject rows longer than `driver.max_line_len` with `RowError::LineTooLarge`.
///
/// `visit` receives the 1-based line number and the trimmed row. Returning `Ok(false)`
/// stops iteration. Returns `Ok(())` at end of input or when `visit` asks to stop.
#[cfg(feature = "simd-json")]
fn visit_ndjson_borrowed_rows<R, F>(
    driver: &mut NdjsonPerRowDriver<R>,
    spill: &mut Vec<u8>,
    mut visit: F,
) -> Result<(), JetroEngineError>
where
    R: BufRead,
    F: FnMut(u64, &[u8]) -> Result<bool, JetroEngineError>,
{
    loop {
        spill.clear();
        let available = driver.reader.fill_buf()?;
        if available.is_empty() {
            return Ok(());
        }
        if let Some(pos) = memchr(b'\n', available) {
            // Fast path: the whole line is buffered, so borrow it without copying.
            driver.line_no += 1;
            let line_no = driver.line_no;
            let mut row = &available[..pos];
            if row.last() == Some(&b'\r') {
                row = &row[..row.len() - 1];
            }
            // UTF-8 BOM, only meaningful at the very start of the input.
            if line_no == 1 && row.starts_with(&[0xef, 0xbb, 0xbf]) {
                row = &row[3..];
            }
            let (start, end) = non_ws_range(row);
            let keep_going = if start == end {
                true
            } else {
                let row = &row[start..end];
                if row.len() > driver.max_line_len {
                    return Err(RowError::LineTooLarge {
                        line_no,
                        len: row.len(),
                        max: driver.max_line_len,
                    }
                    .into());
                }
                visit(line_no, row)?
            };
            // Consume only after `visit` is done with the borrowed row. On an early
            // error return above the line is left unconsumed.
            driver.reader.consume(pos + 1);
            if !keep_going {
                return Ok(());
            }
        } else {
            // Slow path: the line extends past the buffered bytes (or has no final `\n`),
            // so read the full physical line into `spill`.
            let read = driver.read_physical_line(spill)?;
            // Defensive: `available` was non-empty, so this should not trigger.
            if read == 0 {
                return Ok(());
            }
            driver.line_no += 1;
            strip_initial_bom(driver.line_no, spill);
            trim_line_ending(spill);
            let (start, end) = non_ws_range(spill);
            if start == end {
                continue;
            }
            let len = end - start;
            if len > driver.max_line_len {
                return Err(RowError::LineTooLarge {
                    line_no: driver.line_no,
                    len,
                    max: driver.max_line_len,
                }
                .into());
            }
            if !visit(driver.line_no, &spill[start..end])? {
                return Ok(());
            }
        }
    }
}
1122
/// Writes one output line per row by parsing every row onto a tape and evaluating
/// `plan` with [`NdjsonTapeWriterRunner`].
///
/// Stops after `limit` rows when set, flushes the writer, and returns the number of rows
/// written.
#[cfg(feature = "simd-json")]
fn drive_ndjson_tape_writer<R, W>(
    engine: &JetroEngine,
    reader: R,
    plan: &NdjsonDirectTapePlan,
    limit: Option<usize>,
    options: NdjsonOptions,
    writer: W,
) -> Result<usize, JetroEngineError>
where
    R: BufRead,
    W: Write,
{
    let mut rows = NdjsonPerRowDriver::new(reader).with_max_line_len(options.max_line_len);
    let mut out = ndjson_writer_with_options(writer, options);
    let mut line = Vec::with_capacity(options.initial_buffer_capacity);
    let mut tape =
        crate::data::tape::TapeScratch::with_capacity(options.initial_buffer_capacity);
    let mut written = 0usize;
    let mut runner = NdjsonTapeWriterRunner::new(engine, plan);

    loop {
        let Some((line_no, row)) = rows.read_next_nonempty(&mut line)? else {
            break;
        };
        tape.parse_slice(row).map_err(|message| {
            row_parse_error(
                line_no,
                JetroEngineError::Eval(crate::EvalError(format!("Invalid JSON: {message}"))),
            )
        })?;
        runner.write_row(&tape, &mut out)?;
        out.write_all(b"\n")?;
        written += 1;
        let reached_limit = limit.is_some_and(|max| written >= max);
        if reached_limit {
            break;
        }
    }

    out.flush()?;
    Ok(written)
}
1162
/// Evaluates an [`NdjsonDirectTapePlan`] against one parsed row at a time and writes the
/// JSON result.
///
/// Path lookup caches live in the runner and are reused across rows, presumably to speed
/// up repeated lookups when rows share a shape. When the plan needs a VM, the engine's VM
/// lock is taken in `new` and held in `vm` for as long as the runner lives.
#[cfg(feature = "simd-json")]
pub(super) struct NdjsonTapeWriterRunner<'a, 'p> {
    plan: &'p NdjsonDirectTapePlan,
    // Present only when `plan.needs_vm()`; the guard keeps the engine VM locked.
    vm: Option<MutexGuard<'a, VM>>,
    // Present only when `plan.needs_vm()`; evaluation environment for view pipelines.
    env: Option<crate::data::context::Env>,
    root_path: NdjsonPathCache,
    source_path: NdjsonPathCache,
    suffix_path: NdjsonPathCache,
    predicate_path: NdjsonPathCache,
    object_paths: Vec<NdjsonPathCache>,
}

#[cfg(feature = "simd-json")]
impl<'a, 'p> NdjsonTapeWriterRunner<'a, 'p> {
    /// Creates a runner for `plan`.
    ///
    /// The engine VM is locked, and an environment rooted at `Val::Null` is created, only
    /// when `plan.needs_vm()` is true.
    pub(super) fn new(engine: &'a JetroEngine, plan: &'p NdjsonDirectTapePlan) -> Self {
        let needs_vm = plan.needs_vm();
        Self {
            plan,
            vm: needs_vm.then(|| engine.lock_vm()),
            env: needs_vm.then(|| crate::data::context::Env::new(Val::Null)),
            root_path: NdjsonPathCache::default(),
            source_path: NdjsonPathCache::default(),
            suffix_path: NdjsonPathCache::default(),
            predicate_path: NdjsonPathCache::default(),
            object_paths: Vec::new(),
        }
    }

    /// Writes the plan's result for the row parsed into `scratch` (no trailing newline).
    ///
    /// Paths that do not resolve are written as `null`.
    ///
    /// # Errors
    /// Write errors, helper errors, or an `Eval` error when a `ViewPipeline` plan runs
    /// without VM state or its evaluation fails.
    pub(super) fn write_row<W: Write>(
        &mut self,
        scratch: &crate::data::tape::TapeScratch,
        writer: &mut W,
    ) -> Result<(), JetroEngineError> {
        match self.plan {
            // Plain path from the row root (tape index 0).
            NdjsonDirectTapePlan::RootPath(steps) => {
                if let Some(idx) = self.root_path.index(scratch, 0, steps) {
                    write_json_tape_at(writer, scratch, idx)?;
                } else {
                    writer.write_all(b"null")?;
                }
            }
            // Scalar method call on a path. If the call does not apply, the raw node is
            // echoed. An `optional` call on a missing/null value yields `null`.
            NdjsonDirectTapePlan::ViewScalarCall {
                steps,
                call,
                optional,
            } => {
                let idx = self.root_path.index(scratch, 0, steps);
                let value = idx
                    .map(|idx| json_tape_scalar(scratch, idx))
                    .unwrap_or(crate::util::JsonView::Null);
                if *optional && matches!(value, crate::util::JsonView::Null) {
                    writer.write_all(b"null")?;
                } else if let Some(value) = call.try_apply_json_view(value) {
                    write_val_json(writer, &value)?;
                } else if let Some(idx) = idx {
                    write_json_tape_at(writer, scratch, idx)?;
                } else {
                    writer.write_all(b"null")?;
                }
            }
            // source path -> array element -> suffix path, then a scalar call with the
            // same raw-node fallback as above.
            NdjsonDirectTapePlan::ArrayElementViewScalarCall {
                source_steps,
                element,
                suffix_steps,
                call,
            } => {
                let idx = self
                    .source_path
                    .index(scratch, 0, source_steps)
                    .and_then(|idx| json_tape_array_element(scratch, idx, *element))
                    .and_then(|idx| self.suffix_path.index(scratch, idx, suffix_steps));
                if let Some(value) = idx
                    .map(|idx| json_tape_scalar(scratch, idx))
                    .and_then(|value| call.try_apply_json_view(value))
                {
                    write_val_json(writer, &value)?;
                } else if let Some(idx) = idx {
                    write_json_tape_at(writer, scratch, idx)?;
                } else {
                    writer.write_all(b"null")?;
                }
            }
            // Object item method (e.g. keys/values); the helper receives a possibly-missing index.
            NdjsonDirectTapePlan::ObjectItems { steps, method } => {
                let idx = self.root_path.index(scratch, 0, steps);
                write_json_tape_object_items(writer, scratch, idx, *method)?;
            }
            // source path -> array element -> suffix path, written verbatim.
            NdjsonDirectTapePlan::ArrayElementPath {
                source_steps,
                element,
                suffix_steps,
            } => {
                let idx = self
                    .source_path
                    .index(scratch, 0, source_steps)
                    .and_then(|idx| json_tape_array_element(scratch, idx, *element))
                    .and_then(|idx| self.suffix_path.index(scratch, idx, suffix_steps));
                if let Some(idx) = idx {
                    write_json_tape_at(writer, scratch, idx)?;
                } else {
                    writer.write_all(b"null")?;
                }
            }
            NdjsonDirectTapePlan::Stream(plan) => {
                write_json_tape_stream(
                    writer,
                    scratch,
                    plan,
                    &mut self.source_path,
                    &mut self.suffix_path,
                    &mut self.predicate_path,
                    &mut self.object_paths,
                )?;
            }
            NdjsonDirectTapePlan::Object(fields) => {
                write_json_tape_object_projection(writer, scratch, fields, &mut self.object_paths)?;
            }
            NdjsonDirectTapePlan::Array(items) => {
                write_json_tape_array_projection(writer, scratch, items, &mut self.object_paths)?;
            }
            // General view pipeline evaluated by the VM. A `None` result is written as `null`.
            NdjsonDirectTapePlan::ViewPipeline { source_steps, body } => {
                let (Some(vm), Some(env)) = (self.vm.as_deref_mut(), self.env.as_ref()) else {
                    return Err(JetroEngineError::Eval(crate::EvalError(
                        "NDJSON view pipeline requires VM state".to_string(),
                    )));
                };
                let source = json_tape_path_index(scratch, source_steps)
                    .map(|idx| crate::data::view::TapeScratchView::Node { tape: scratch, idx })
                    .unwrap_or(crate::data::view::TapeScratchView::Missing);
                let Some(result) =
                    crate::exec::view::run_with_env_and_vm(source, body, None, &env, vm)
                else {
                    writer.write_all(b"null")?;
                    return Ok(());
                };
                write_val_json(writer, &result.map_err(JetroEngineError::Eval)?)?;
            }
        }
        Ok(())
    }
}
1303
/// Per-path memo of where object keys were found on earlier rows.
///
/// `fields[d]` records the key/value offsets of the object field matched by the step at
/// position `d` of the path; index steps advance the position without filling their slot.
/// Entries are only hints: lookups re-check the key text before trusting them and fall back
/// to a full key scan on a miss.
#[cfg(feature = "simd-json")]
#[derive(Default)]
pub(super) struct NdjsonPathCache {
    fields: Vec<Option<NdjsonFieldCache>>,
}
1312
/// Offsets of one object field relative to its object node, as found on a previous row.
#[cfg(feature = "simd-json")]
#[derive(Clone, Copy)]
struct NdjsonFieldCache {
    /// Distance from the object node to the field's key node.
    key_delta: usize,
    /// Distance from the object node to the field's value node (`key_delta + 1` when
    /// produced by `json_tape_object_field_index_and_cache`).
    value_delta: usize,
}
1319
/// Source, suffix and predicate path caches bundled into one argument for the numeric
/// stream reducer (`reduce_json_tape_numeric_path`).
#[cfg(feature = "simd-json")]
struct NdjsonPathCaches<'a> {
    /// Cache for the stream's source path.
    source: &'a mut NdjsonPathCache,
    /// Cache for the per-item suffix path.
    suffix: &'a mut NdjsonPathCache,
    /// Cache for paths referenced by the per-item predicate.
    predicate: &'a mut NdjsonPathCache,
}
1326
#[cfg(feature = "simd-json")]
impl NdjsonPathCache {
    /// Resolves `steps` starting at tape node `start`.
    ///
    /// Returns the index of the node the path lands on, or `None` when a step does not match.
    /// The single-field cached probe is tried first. If it yields nothing, the path is walked
    /// step by step, refreshing the per-depth field cache as keys are found.
    fn index<T: JsonTape>(
        &mut self,
        tape: &T,
        start: usize,
        steps: &[crate::ir::physical::PhysicalPathStep],
    ) -> Option<usize> {
        self.index_cached(tape, start, steps)
            .or_else(|| self.index_uncached(tape, start, steps))
    }

    /// Fast probe for paths shaped `Field(key)` followed only by index steps.
    ///
    /// It uses the depth-0 cache entry, and only when that entry is not the object's first
    /// key (`key_delta > 1`). Any other shape, or a cache miss, yields `None`.
    fn index_cached<T: JsonTape>(
        &self,
        tape: &T,
        start: usize,
        steps: &[crate::ir::physical::PhysicalPathStep],
    ) -> Option<usize> {
        use crate::ir::physical::PhysicalPathStep;

        let (first, rest) = steps.split_first()?;
        let PhysicalPathStep::Field(key) = first else {
            return None;
        };
        let has_nested_field = rest
            .iter()
            .any(|step| matches!(step, PhysicalPathStep::Field(_)));
        if has_nested_field {
            return None;
        }

        let field = (*self.fields.first()?)?;
        if field.key_delta <= 1 {
            return None;
        }

        let found = json_tape_object_cached_field(tape, start, field, key.as_ref())?;
        rest.iter()
            .try_fold(found, |cursor, step| json_tape_step_index(tape, cursor, step))
    }

    /// Walks `steps` from `start`, consulting and updating the cache at every field step.
    fn index_uncached<T: JsonTape>(
        &mut self,
        tape: &T,
        start: usize,
        steps: &[crate::ir::physical::PhysicalPathStep],
    ) -> Option<usize> {
        self.index_from_depth(tape, start, steps, 0)
    }

    /// Resolves `steps` from `start`, treating the first step as path position `depth`.
    ///
    /// For a field step, a usable cached offset is tried first. Otherwise the object's keys
    /// are scanned and the offsets found are stored for the next row. Index steps are
    /// resolved directly but still advance the position.
    fn index_from_depth<T: JsonTape>(
        &mut self,
        tape: &T,
        start: usize,
        steps: &[crate::ir::physical::PhysicalPathStep],
        depth: usize,
    ) -> Option<usize> {
        use crate::ir::physical::PhysicalPathStep;

        let mut cursor = start;
        for (offset, step) in steps.iter().enumerate() {
            let level = depth + offset;
            cursor = match step {
                PhysicalPathStep::Field(key) => {
                    if level >= self.fields.len() {
                        self.fields.resize(level + 1, None);
                    }
                    let hit = self.fields[level]
                        .filter(|field| field.key_delta > 1)
                        .and_then(|field| {
                            json_tape_object_cached_field(tape, cursor, field, key.as_ref())
                        });
                    match hit {
                        Some(found) => found,
                        None => {
                            let (found, field) = json_tape_object_field_index_and_cache(
                                tape,
                                cursor,
                                key.as_ref(),
                            )?;
                            self.fields[level] = Some(field);
                            found
                        }
                    }
                }
                other => json_tape_step_index(tape, cursor, other)?,
            };
        }
        Some(cursor)
    }
}
1420
/// Copies every row of `reader` that satisfies the direct `predicate` verbatim to `writer`,
/// each followed by `\n`, stopping once `limit` rows have been written.
///
/// Each row is first offered to the byte-level predicate evaluator. Only when that evaluator
/// cannot decide (`None`) is the row parsed into a reusable tape scratch and evaluated there.
/// The engine VM is locked up front only if the predicate contains a view pipeline
/// (`predicate_needs_vm`).
///
/// Returns the number of rows written. A `limit` of 0 writes nothing.
///
/// # Errors
/// Propagates read/write I/O errors, invalid-JSON rows (tagged with their line number), and
/// predicate evaluation errors.
#[cfg(feature = "simd-json")]
fn drive_ndjson_tape_matches_writer<R, W>(
    engine: &JetroEngine,
    reader: R,
    predicate: &NdjsonDirectPredicate,
    limit: usize,
    options: NdjsonOptions,
    writer: W,
) -> Result<usize, JetroEngineError>
where
    R: BufRead,
    W: Write,
{
    // Without this guard the first match is written before the `emitted >= limit` check
    // runs, so `limit == 0` would still emit one row. The sibling drivers guard the same way.
    if limit == 0 {
        return Ok(0);
    }

    let mut driver = NdjsonPerRowDriver::new(reader).with_max_line_len(options.max_line_len);
    let mut writer = ndjson_writer_with_options(writer, options);
    let mut line = Vec::with_capacity(options.initial_buffer_capacity);
    let mut scratch =
        crate::data::tape::TapeScratch::with_capacity(options.initial_buffer_capacity);
    let mut emitted = 0usize;
    let needs_vm = predicate_needs_vm(predicate);
    let mut vm = needs_vm.then(|| engine.lock_vm());
    let env = needs_vm.then(|| crate::data::context::Env::new(Val::Null));
    let mut predicate_path = NdjsonPathCache::default();

    while let Some((line_no, row)) = driver.read_next_owned(&mut line)? {
        let matched = match eval_ndjson_byte_predicate_row(&row, predicate)? {
            // The byte evaluator decided without parsing the row.
            Some(decided) => decided,
            // Undecidable on raw bytes: parse into the shared scratch tape and evaluate there.
            None => {
                scratch.parse_slice(&row).map_err(|message| {
                    row_parse_error(
                        line_no,
                        JetroEngineError::Eval(crate::EvalError(format!(
                            "Invalid JSON: {message}"
                        ))),
                    )
                })?;
                eval_tape_predicate(
                    &scratch,
                    predicate,
                    env.as_ref(),
                    &mut vm,
                    &mut predicate_path,
                )
                .map_err(JetroEngineError::Eval)?
            }
        };
        if !matched {
            continue;
        }

        writer.write_all(&row)?;
        writer.write_all(b"\n")?;
        emitted += 1;
        if emitted >= limit {
            break;
        }
    }

    writer.flush()?;
    Ok(emitted)
}
1487
1488fn drive_ndjson_matches<R, F>(
1489 engine: &JetroEngine,
1490 reader: R,
1491 predicate: &str,
1492 limit: usize,
1493 options: NdjsonOptions,
1494 mut emit: F,
1495) -> Result<usize, JetroEngineError>
1496where
1497 R: BufRead,
1498 F: FnMut(Val) -> Result<NdjsonControl, JetroEngineError>,
1499{
1500 if limit == 0 {
1501 return Ok(0);
1502 }
1503
1504 let mut driver = NdjsonPerRowDriver::new(reader).with_max_line_len(options.max_line_len);
1505 #[cfg(feature = "simd-json")]
1506 let direct_predicate = direct_tape_predicate(engine, predicate);
1507 let mut executor = NdjsonRowExecutor::new(engine, predicate);
1508 let mut buf = Vec::with_capacity(options.initial_buffer_capacity);
1509 let mut emitted = 0usize;
1510
1511 while let Some((line_no, row)) = driver.read_next_owned(&mut buf)? {
1512 #[cfg(feature = "simd-json")]
1513 if let Some(predicate) = direct_predicate.as_ref() {
1514 if let Some(false) = eval_ndjson_byte_predicate_row(&row, predicate)? {
1515 continue;
1516 }
1517 }
1518
1519 let document = executor.parse_owned_row(line_no, row)?;
1520 let matched = executor.eval_document(line_no, &document)?;
1521 if !is_truthy(&matched) {
1522 continue;
1523 }
1524
1525 let root = document
1526 .root_val_with(engine.keys())
1527 .map_err(|err| row_eval_error(line_no, err))?;
1528 emitted += 1;
1529 if matches!(emit(root)?, NdjsonControl::Stop) || emitted >= limit {
1530 break;
1531 }
1532 }
1533
1534 Ok(emitted)
1535}
1536
1537fn drive_ndjson_matches_writer<R, W>(
1538 engine: &JetroEngine,
1539 reader: R,
1540 predicate: &str,
1541 limit: usize,
1542 options: NdjsonOptions,
1543 writer: W,
1544) -> Result<usize, JetroEngineError>
1545where
1546 R: BufRead,
1547 W: Write,
1548{
1549 if limit == 0 {
1550 return Ok(0);
1551 }
1552
1553 #[cfg(feature = "simd-json")]
1554 if let Some(predicate) = direct_tape_predicate(engine, predicate) {
1555 return drive_ndjson_tape_matches_writer(
1556 engine, reader, &predicate, limit, options, writer,
1557 );
1558 }
1559
1560 let mut driver = NdjsonPerRowDriver::new(reader).with_max_line_len(options.max_line_len);
1561 let mut executor = NdjsonRowExecutor::new(engine, predicate);
1562 let mut writer = ndjson_writer_with_options(writer, options);
1563 let mut buf = Vec::with_capacity(options.initial_buffer_capacity);
1564 let mut emitted = 0usize;
1565
1566 while let Some((line_no, row)) = driver.read_next_owned(&mut buf)? {
1567 let document = executor.parse_owned_row(line_no, row)?;
1568 let matched = executor.eval_document(line_no, &document)?;
1569 if !is_truthy(&matched) {
1570 continue;
1571 }
1572
1573 write_document_line(&mut writer, &document, line_no, executor.engine())?;
1574 emitted += 1;
1575 if emitted >= limit {
1576 break;
1577 }
1578 }
1579
1580 writer.flush()?;
1581 Ok(emitted)
1582}
1583
/// Evaluates one compiled query against individual NDJSON rows.
///
/// The engine's VM lock (`vm`) is held for as long as the executor is alive.
pub(super) struct NdjsonRowExecutor<'a> {
    /// Engine the plan was obtained from; also used to parse rows (`parse_row`).
    engine: &'a JetroEngine,
    /// Physical plan for the query, taken from the engine's plan cache at construction.
    plan: crate::ir::physical::QueryPlan,
    /// Guard on the engine's VM, used for every row evaluation.
    vm: MutexGuard<'a, VM>,
}
1589
1590impl<'a> NdjsonRowExecutor<'a> {
1591 pub(super) fn new(engine: &'a JetroEngine, query: &str) -> Self {
1592 Self {
1593 engine,
1594 plan: engine.cached_plan(query, PlanningContext::bytes()),
1595 vm: engine.lock_vm(),
1596 }
1597 }
1598
1599 pub(super) fn eval_owned_row(
1600 &mut self,
1601 line_no: u64,
1602 row: Vec<u8>,
1603 ) -> Result<Val, JetroEngineError> {
1604 let document = self.parse_owned_row(line_no, row)?;
1605 self.eval_document(line_no, &document)
1606 }
1607
1608 pub(super) fn write_owned_row<W: Write>(
1609 &mut self,
1610 line_no: u64,
1611 row: Vec<u8>,
1612 writer: &mut W,
1613 ) -> Result<(), JetroEngineError> {
1614 let document = self.parse_owned_result_row(line_no, row)?;
1615 self.write_document_result(line_no, &document, writer)
1616 }
1617
1618 fn parse_owned_result_row(
1619 &self,
1620 line_no: u64,
1621 row: Vec<u8>,
1622 ) -> Result<Jetro, JetroEngineError> {
1623 #[cfg(feature = "simd-json")]
1624 {
1625 crate::data::tape::TapeData::parse(row)
1626 .map(Jetro::from_tape_data)
1627 .map_err(|message| {
1628 row_parse_error(
1629 line_no,
1630 JetroEngineError::Eval(crate::EvalError(format!(
1631 "Invalid JSON: {message}"
1632 ))),
1633 )
1634 })
1635 }
1636 #[cfg(not(feature = "simd-json"))]
1637 {
1638 self.parse_owned_row(line_no, row)
1639 }
1640 }
1641
1642 pub(super) fn parse_owned_row(
1643 &self,
1644 line_no: u64,
1645 row: Vec<u8>,
1646 ) -> Result<Jetro, JetroEngineError> {
1647 parse_row(self.engine, line_no, row)
1648 }
1649
1650 pub(super) fn eval_document(
1651 &mut self,
1652 line_no: u64,
1653 document: &Jetro,
1654 ) -> Result<Val, JetroEngineError> {
1655 crate::exec::router::collect_plan_val_with_vm(document, &self.plan, &mut self.vm)
1656 .map_err(|err| row_eval_error(line_no, err))
1657 }
1658
1659 pub(super) fn write_document_result<W: Write>(
1660 &mut self,
1661 line_no: u64,
1662 document: &Jetro,
1663 writer: &mut W,
1664 ) -> Result<(), JetroEngineError> {
1665 if self.try_write_tape_result(line_no, document, writer)? {
1666 return Ok(());
1667 }
1668 let value = self.eval_document(line_no, document)?;
1669 write_val_line(writer, &value)
1670 }
1671
1672 fn try_write_tape_result<W: Write>(
1673 &self,
1674 line_no: u64,
1675 document: &Jetro,
1676 writer: &mut W,
1677 ) -> Result<bool, JetroEngineError> {
1678 #[cfg(feature = "simd-json")]
1679 {
1680 use crate::ir::physical::{PlanNode, QueryRoot};
1681
1682 let QueryRoot::Node(root) = self.plan.root() else {
1683 return Ok(false);
1684 };
1685 let PlanNode::RootPath(steps) = self.plan.node(*root) else {
1686 return Ok(false);
1687 };
1688 let Some(tape) = document
1689 .lazy_tape()
1690 .map_err(|err| row_eval_error(line_no, err))?
1691 else {
1692 return Ok(false);
1693 };
1694 if let Some(idx) = json_tape_path_index(tape.as_ref(), steps) {
1695 write_json_tape_at(writer, tape.as_ref(), idx)?;
1696 } else {
1697 writer.write_all(b"null")?;
1698 }
1699 writer.write_all(b"\n")?;
1700 Ok(true)
1701 }
1702 #[cfg(not(feature = "simd-json"))]
1703 {
1704 let _ = (line_no, document, writer);
1705 Ok(false)
1706 }
1707 }
1708
1709 pub(super) fn engine(&self) -> &'a JetroEngine {
1710 self.engine
1711 }
1712}
1713
/// Read-only view over a parsed JSON tape, shared by `TapeData` and `TapeScratch` so the
/// path/serialisation helpers in this module work on either.
#[cfg(feature = "simd-json")]
trait JsonTape {
    /// All tape nodes in document order.
    fn nodes(&self) -> &[crate::data::tape::TapeNode];
    /// Text of the string node at `idx` (used for both object keys and string values).
    fn str_at(&self, idx: usize) -> &str;
    /// Number of tape nodes occupied by the node at `idx`, itself included. Callers add it
    /// to `idx` to step past a whole value subtree.
    fn span(&self, idx: usize) -> usize;
}
1720
/// `JsonTape` for owned tape documents; every method forwards to `TapeData`'s own field or
/// inherent method of the same name.
#[cfg(feature = "simd-json")]
impl JsonTape for crate::data::tape::TapeData {
    #[inline]
    fn nodes(&self) -> &[crate::data::tape::TapeNode] {
        &self.nodes
    }

    #[inline]
    fn str_at(&self, idx: usize) -> &str {
        // Resolves to the inherent `TapeData::str_at` (inherent methods take precedence).
        self.str_at(idx)
    }

    #[inline]
    fn span(&self, idx: usize) -> usize {
        // Resolves to the inherent `TapeData::span`.
        self.span(idx)
    }
}
1738
/// `JsonTape` for the reusable per-row scratch tape; every method forwards to
/// `TapeScratch`'s own field or inherent method of the same name.
#[cfg(feature = "simd-json")]
impl JsonTape for crate::data::tape::TapeScratch {
    #[inline]
    fn nodes(&self) -> &[crate::data::tape::TapeNode] {
        &self.nodes
    }

    #[inline]
    fn str_at(&self, idx: usize) -> &str {
        // Resolves to the inherent `TapeScratch::str_at` (inherent methods take precedence).
        self.str_at(idx)
    }

    #[inline]
    fn span(&self, idx: usize) -> usize {
        // Resolves to the inherent `TapeScratch::span`.
        self.span(idx)
    }
}
1756
/// Resolves `steps` from the tape root (node 0). Returns the index of the node the path
/// lands on, or `None` if the tape is empty or any step does not match.
#[cfg(feature = "simd-json")]
fn json_tape_path_index<T: JsonTape>(
    tape: &T,
    steps: &[crate::ir::physical::PhysicalPathStep],
) -> Option<usize> {
    json_tape_path_index_from(tape, 0, steps)
}
1764
/// Resolves `steps` starting from tape node `start`.
///
/// Returns the index of the node the path lands on, or `None` if the tape is empty or any
/// step does not match. Paths of zero to two steps are resolved inline. Longer paths go
/// through the loop in `json_tape_path_index_slow`.
#[cfg(feature = "simd-json")]
fn json_tape_path_index_from<T: JsonTape>(
    tape: &T,
    start: usize,
    steps: &[crate::ir::physical::PhysicalPathStep],
) -> Option<usize> {
    if tape.nodes().is_empty() {
        return None;
    }

    match steps {
        [] => Some(start),
        [step] => json_tape_step_index(tape, start, step),
        [first, second] => json_tape_step_index(tape, start, first)
            .and_then(|idx| json_tape_step_index(tape, idx, second)),
        _ => json_tape_path_index_slow(tape, start, steps),
    }
}
1783
/// Resolves an arbitrary-length path from `start` by applying each step in turn. The first
/// step that fails to match short-circuits to `None`.
#[cfg(feature = "simd-json")]
fn json_tape_path_index_slow<T: JsonTape>(
    tape: &T,
    start: usize,
    steps: &[crate::ir::physical::PhysicalPathStep],
) -> Option<usize> {
    steps
        .iter()
        .try_fold(start, |node, step| json_tape_step_index(tape, node, step))
}
1796
/// Applies a single path step to the node at `start`, returning the index of the selected
/// child node.
///
/// * `Field(key)`: the node must be an object; its keys are scanned in order and the value
///   node following the first equal key is returned.
/// * `Index(i)`: the node must be an array. A negative `i` counts from the end (`-1` is the
///   last element). Out-of-range indices yield `None`.
///
/// Returns `None` when `start` is outside the tape, the node has the wrong kind, or no child
/// matches.
#[cfg(feature = "simd-json")]
fn json_tape_step_index<T: JsonTape>(
    tape: &T,
    start: usize,
    step: &crate::ir::physical::PhysicalPathStep,
) -> Option<usize> {
    use crate::data::tape::TapeNode;
    use crate::ir::physical::PhysicalPathStep;

    // `get` instead of indexing, so an out-of-range start (e.g. an empty tape) is a miss
    // rather than a panic. This matches `json_tape_array_element` and the cached-field probe.
    let node = tape.nodes().get(start).copied()?;

    match step {
        PhysicalPathStep::Field(key) => {
            let TapeNode::Object { len, .. } = node else {
                return None;
            };
            let mut cur = start + 1;
            for _ in 0..len {
                if tape.str_at(cur) == key.as_ref() {
                    return Some(cur + 1);
                }
                cur += 1;
                cur += tape.span(cur);
            }
            None
        }
        PhysicalPathStep::Index(wanted) => {
            let TapeNode::Array { len, .. } = node else {
                return None;
            };
            // Checked conversions: an index that does not fit in `usize` is necessarily out of
            // range. A plain `as usize` could truncate it on 32-bit targets into a bogus
            // in-range position.
            let wanted = if *wanted < 0 {
                len.checked_sub(usize::try_from(wanted.unsigned_abs()).ok()?)?
            } else {
                usize::try_from(*wanted).ok()?
            };
            if wanted >= len {
                return None;
            }
            let mut cur = start + 1;
            for _ in 0..wanted {
                cur += tape.span(cur);
            }
            Some(cur)
        }
    }
}
1841
/// Looks up `key` in the object at `obj_idx` using field offsets remembered from an earlier
/// row.
///
/// The cache is only a hint, because NDJSON rows need not share a layout, so a hit is
/// validated before it is trusted. Returns the value node index on success. Returns `None`,
/// and the caller falls back to a full key scan, when any of these hold:
/// * `obj_idx` is not an object;
/// * the cached offsets fall outside the tape;
/// * the node at the cached key offset is not a string equal to `key`;
/// * that string is not one of this object's own keys.
#[cfg(feature = "simd-json")]
fn json_tape_object_cached_field<T: JsonTape>(
    tape: &T,
    obj_idx: usize,
    cache: NdjsonFieldCache,
    key: &str,
) -> Option<usize> {
    use crate::data::tape::TapeNode;

    let TapeNode::Object { len, .. } = tape.nodes().get(obj_idx).copied()? else {
        return None;
    };
    let key_idx = obj_idx.checked_add(cache.key_delta)?;
    let value_idx = obj_idx.checked_add(cache.value_delta)?;
    if value_idx >= tape.nodes().len() {
        return None;
    }
    if !matches!(tape.nodes().get(key_idx), Some(TapeNode::String(_))) {
        return None;
    }
    if tape.str_at(key_idx) != key {
        return None;
    }

    // A string equal to `key` at the cached offset may belong to a nested value rather than
    // to this object. Example: the cache came from `{"x":1,"name":..}`, giving key offset 3,
    // and this row is `{"a":{"name":1},"name":2}`. Offset 3 is then the nested key, and
    // trusting it would return the nested value 1 instead of 2. So step over this object's
    // own fields by subtree span to confirm `key_idx` is one of its key positions. This needs
    // no string comparisons, so it stays cheaper than the full scan.
    let mut field_key = obj_idx + 1;
    for _ in 0..len {
        if field_key == key_idx {
            return Some(value_idx);
        }
        if field_key > key_idx {
            return None;
        }
        field_key += 1; // the value node follows its key
        field_key += tape.span(field_key); // skip the whole value subtree
    }
    None
}
1865
/// Scans the object at `obj_idx` for `key`.
///
/// Returns the value node index together with the key/value offsets relative to `obj_idx`,
/// so later rows can try a cached probe. Returns `None` if `obj_idx` is outside the tape, is
/// not an object, or has no such key.
#[cfg(feature = "simd-json")]
fn json_tape_object_field_index_and_cache<T: JsonTape>(
    tape: &T,
    obj_idx: usize,
    key: &str,
) -> Option<(usize, NdjsonFieldCache)> {
    // `get` rather than indexing: an out-of-range start (e.g. an empty tape) is a miss, like
    // in `json_tape_object_cached_field`, instead of a panic.
    let crate::data::tape::TapeNode::Object { len, .. } = tape.nodes().get(obj_idx).copied()?
    else {
        return None;
    };
    let mut cur = obj_idx + 1;
    for _ in 0..len {
        if tape.str_at(cur) == key {
            return Some((
                cur + 1,
                NdjsonFieldCache {
                    key_delta: cur - obj_idx,
                    value_delta: cur + 1 - obj_idx,
                },
            ));
        }
        // Step over the key node, then the whole value subtree.
        cur += 1;
        cur += tape.span(cur);
    }
    None
}
1891
/// Selects the first, last or n-th element of the array node at `idx`.
///
/// Returns the element's node index, or `None` when `idx` is not an array or the wanted
/// element does not exist.
#[cfg(feature = "simd-json")]
fn json_tape_array_element<T: JsonTape>(
    tape: &T,
    idx: usize,
    element: NdjsonDirectElement,
) -> Option<usize> {
    use crate::data::tape::TapeNode;

    let len = match tape.nodes().get(idx).copied() {
        Some(TapeNode::Array { len, .. }) => len,
        _ => return None,
    };
    let position = match element {
        NdjsonDirectElement::First => 0,
        NdjsonDirectElement::Last => len.checked_sub(1)?,
        NdjsonDirectElement::Nth(n) => n,
    };
    if position >= len {
        return None;
    }
    // Hop over the `position` preceding elements by their subtree spans.
    Some((0..position).fold(idx + 1, |cursor, _| cursor + tape.span(cursor)))
}
1915
/// Evaluates a direct predicate against a parsed row tape.
///
/// * `Path` / `Literal`: truthiness of the addressed scalar or literal. A missing path is
///   false.
/// * `Not`, `And`, `Or`: logical operators; `And`/`Or` short-circuit.
/// * Any other binary operator compares two scalar operands with
///   `crate::util::json_cmp_binop`. An operand that is missing or not scalar-valued makes the
///   comparison false.
/// * `ViewScalarCall` / `ArrayElementViewScalarCall`: apply the call to the addressed scalar
///   and test the result's truthiness. A missing input, or a call that yields nothing, is false.
/// * `ViewPipeline`: runs the view body on the VM. Requires both `env` and `vm`.
///
/// `cache` is shared across all `Path` lookups made during this evaluation.
///
/// # Errors
/// Returns an error if a view pipeline is reached without VM state, or if the pipeline
/// itself fails.
#[cfg(feature = "simd-json")]
pub(super) fn eval_tape_predicate(
    tape: &crate::data::tape::TapeScratch,
    predicate: &NdjsonDirectPredicate,
    env: Option<&crate::data::context::Env>,
    vm: &mut Option<std::sync::MutexGuard<'_, crate::vm::exec::VM>>,
    cache: &mut NdjsonPathCache,
) -> Result<bool, crate::EvalError> {
    use crate::parse::ast::BinOp;

    let verdict = match predicate {
        NdjsonDirectPredicate::Literal(value) => crate::util::is_truthy(value),
        NdjsonDirectPredicate::Path(steps) => match cache.index(tape, 0, steps) {
            Some(idx) => json_view_truthy(json_tape_scalar(tape, idx)),
            None => false,
        },
        NdjsonDirectPredicate::Not(inner) => !eval_tape_predicate(tape, inner, env, vm, cache)?,
        NdjsonDirectPredicate::Binary { lhs, op, rhs } => match *op {
            BinOp::And => {
                eval_tape_predicate(tape, lhs, env, vm, cache)?
                    && eval_tape_predicate(tape, rhs, env, vm, cache)?
            }
            BinOp::Or => {
                eval_tape_predicate(tape, lhs, env, vm, cache)?
                    || eval_tape_predicate(tape, rhs, env, vm, cache)?
            }
            comparison => {
                let Some(left) = eval_tape_scalar(tape, lhs, cache) else {
                    return Ok(false);
                };
                let Some(right) = eval_tape_scalar(tape, rhs, cache) else {
                    return Ok(false);
                };
                crate::util::json_cmp_binop(left, comparison, right)
            }
        },
        NdjsonDirectPredicate::ViewScalarCall { steps, call } => cache
            .index(tape, 0, steps)
            .and_then(|leaf| call.try_apply_json_view(json_tape_scalar(tape, leaf)))
            .is_some_and(|value| crate::util::is_truthy(&value)),
        NdjsonDirectPredicate::ArrayElementViewScalarCall {
            source_steps,
            element,
            suffix_steps,
            call,
        } => json_tape_path_index(tape, source_steps)
            .and_then(|array_idx| json_tape_array_element(tape, array_idx, *element))
            .and_then(|item_idx| json_tape_path_index_from(tape, item_idx, suffix_steps))
            .and_then(|leaf| call.try_apply_json_view(json_tape_scalar(tape, leaf)))
            .is_some_and(|value| crate::util::is_truthy(&value)),
        NdjsonDirectPredicate::ViewPipeline { source_steps, body } => {
            let (Some(vm), Some(env)) = (vm.as_deref_mut(), env) else {
                return Err(crate::EvalError(
                    "view pipeline predicate requires VM state".to_string(),
                ));
            };
            let source = json_tape_path_index(tape, source_steps)
                .map(|idx| crate::data::view::TapeScratchView::Node { tape, idx })
                .unwrap_or(crate::data::view::TapeScratchView::Missing);
            crate::exec::view::run_with_env_and_vm(source, body, None, env, vm)
                .transpose()?
                .is_some_and(|value| crate::util::is_truthy(&value))
        }
    };
    Ok(verdict)
}
1981
/// Reports whether evaluating `predicate` with `eval_tape_predicate` needs VM state.
///
/// This is true only if a `ViewPipeline` appears somewhere in the tree.
#[cfg(feature = "simd-json")]
pub(super) fn predicate_needs_vm(predicate: &NdjsonDirectPredicate) -> bool {
    match predicate {
        NdjsonDirectPredicate::ViewPipeline { .. } => true,
        // Leaves evaluated directly on the tape.
        NdjsonDirectPredicate::Path(_)
        | NdjsonDirectPredicate::Literal(_)
        | NdjsonDirectPredicate::ViewScalarCall { .. }
        | NdjsonDirectPredicate::ArrayElementViewScalarCall { .. } => false,
        NdjsonDirectPredicate::Not(inner) => predicate_needs_vm(inner),
        NdjsonDirectPredicate::Binary { lhs, rhs, .. } => {
            [lhs, rhs].into_iter().any(|side| predicate_needs_vm(side))
        }
    }
}
1996
/// Produces the scalar view of a comparison operand.
///
/// A `Path` operand yields the addressed tape node, or `None` if the path is missing. A
/// `Literal` operand yields the literal. Every other predicate kind has no scalar value and
/// yields `None`.
#[cfg(feature = "simd-json")]
fn eval_tape_scalar<'a>(
    tape: &'a crate::data::tape::TapeScratch,
    predicate: &'a NdjsonDirectPredicate,
    cache: &mut NdjsonPathCache,
) -> Option<crate::util::JsonView<'a>> {
    if let NdjsonDirectPredicate::Literal(value) = predicate {
        return Some(crate::util::JsonView::from_val(value));
    }
    let NdjsonDirectPredicate::Path(steps) = predicate else {
        return None;
    };
    let node = cache.index(tape, 0, steps)?;
    Some(json_tape_scalar(tape, node))
}
2011
/// Truthiness of a scalar view.
///
/// `null`, `false`, numeric zero, the empty string and empty arrays/objects are false, and
/// everything else is true. NaN counts as true because `NaN != 0.0`.
#[cfg(feature = "simd-json")]
fn json_view_truthy(value: crate::util::JsonView<'_>) -> bool {
    use crate::util::JsonView as View;

    match value {
        View::Bool(flag) => flag,
        View::Int(number) => number != 0,
        View::UInt(number) => number != 0,
        View::Float(number) => number != 0.0,
        View::Str(text) => !text.is_empty(),
        View::ArrayLen(count) | View::ObjectLen(count) => count > 0,
        View::Null => false,
    }
}
2024
/// Borrows the node at `idx` as a `JsonView`.
///
/// Containers are reduced to their element/field count, and an out-of-range index reads as
/// `Null`.
#[cfg(feature = "simd-json")]
fn json_tape_scalar<T: JsonTape>(tape: &T, idx: usize) -> crate::util::JsonView<'_> {
    use crate::data::tape::TapeNode;
    use crate::util::JsonView;
    use simd_json::StaticNode as SN;

    match tape.nodes().get(idx).copied() {
        None | Some(TapeNode::Static(SN::Null)) => JsonView::Null,
        Some(TapeNode::Static(SN::Bool(flag))) => JsonView::Bool(flag),
        Some(TapeNode::Static(SN::I64(number))) => JsonView::Int(number),
        Some(TapeNode::Static(SN::U64(number))) => JsonView::UInt(number),
        Some(TapeNode::Static(SN::F64(number))) => JsonView::Float(number),
        Some(TapeNode::String(_)) => JsonView::Str(tape.str_at(idx)),
        Some(TapeNode::Array { len, .. }) => JsonView::ArrayLen(len),
        Some(TapeNode::Object { len, .. }) => JsonView::ObjectLen(len),
    }
}
2044
2045pub(super) fn write_val_line<W: Write>(
2046 writer: &mut W,
2047 value: &Val,
2048) -> Result<(), JetroEngineError> {
2049 write_val_json(writer, value)?;
2050 writer.write_all(b"\n")?;
2051 Ok(())
2052}
2053
2054pub(super) fn write_document_line<W: Write>(
2055 writer: &mut W,
2056 document: &Jetro,
2057 line_no: u64,
2058 engine: &JetroEngine,
2059) -> Result<(), JetroEngineError> {
2060 if let Some(bytes) = document.raw_bytes() {
2061 writer.write_all(bytes)?;
2062 writer.write_all(b"\n")?;
2063 return Ok(());
2064 }
2065
2066 let root = document
2067 .root_val_with(engine.keys())
2068 .map_err(|err| row_eval_error(line_no, err))?;
2069 write_val_line(writer, &root)
2070}
2071
2072pub(super) fn ndjson_writer_with_options<W: Write>(
2073 writer: W,
2074 options: NdjsonOptions,
2075) -> BufWriter<W> {
2076 let capacity = options
2077 .reader_buffer_capacity
2078 .max(DEFAULT_READER_BUFFER_CAPACITY);
2079 BufWriter::with_capacity(capacity, writer)
2080}
2081
/// Writes `value` as JSON text to `writer`, without a trailing newline.
///
/// Every `Val` representation, including the specialised vector forms (`IntVec`,
/// `FloatVec`, `StrVec`, `StrSliceVec`, `ObjVec`), is dispatched to a dedicated
/// serialisation helper.
///
/// # Errors
/// Propagates any error from `writer` or from the helpers.
pub(super) fn write_val_json<W: Write>(
    writer: &mut W,
    value: &Val,
) -> Result<(), JetroEngineError> {
    match value {
        Val::Null => writer.write_all(b"null")?,
        Val::Bool(true) => writer.write_all(b"true")?,
        Val::Bool(false) => writer.write_all(b"false")?,
        Val::Int(n) => write_i64(writer, *n)?,
        Val::Float(n) => write_f64(writer, *n)?,
        Val::Str(s) => write_json_str(writer, s.as_ref())?,
        Val::StrSlice(s) => write_json_str(writer, s.as_str())?,
        Val::Arr(items) => write_json_array(writer, items.iter())?,
        Val::IntVec(items) => write_json_int_array(writer, items.iter().copied())?,
        Val::FloatVec(items) => write_json_float_array(writer, items.iter().copied())?,
        Val::StrVec(items) => write_json_str_array(writer, items.iter().map(|s| s.as_ref()))?,
        Val::StrSliceVec(items) => write_json_str_array(writer, items.iter().map(|s| s.as_str()))?,
        // Both object representations are written through the same (key, value) iterator.
        Val::Obj(entries) => write_json_object(
            writer,
            entries.iter().map(|(key, value)| (key.as_ref(), value)),
        )?,
        Val::ObjSmall(entries) => write_json_object(
            writer,
            entries.iter().map(|(key, value)| (key.as_ref(), value)),
        )?,
        Val::ObjVec(data) => write_json_objvec(writer, data)?,
    }
    Ok(())
}
2111
/// Serialises the tape node at `idx`, including its whole subtree, as JSON.
///
/// Returns the index of the first node after that subtree, so callers can write siblings in
/// sequence. An out-of-range `idx` writes `null` and returns `idx` unchanged.
#[cfg(feature = "simd-json")]
fn write_json_tape_at<W: Write, T: JsonTape>(
    writer: &mut W,
    tape: &T,
    idx: usize,
) -> Result<usize, JetroEngineError> {
    use crate::data::tape::TapeNode;
    use simd_json::StaticNode as SN;

    let node = match tape.nodes().get(idx).copied() {
        Some(node) => node,
        None => {
            writer.write_all(b"null")?;
            return Ok(idx);
        }
    };

    let next = match node {
        TapeNode::Array { len, .. } => {
            writer.write_all(b"[")?;
            let mut cursor = idx + 1;
            for position in 0..len {
                if position != 0 {
                    writer.write_all(b",")?;
                }
                cursor = write_json_tape_at(writer, tape, cursor)?;
            }
            writer.write_all(b"]")?;
            cursor
        }
        TapeNode::Object { len, .. } => {
            writer.write_all(b"{")?;
            let mut cursor = idx + 1;
            for position in 0..len {
                if position != 0 {
                    writer.write_all(b",")?;
                }
                // Key node first, then the value subtree right after it.
                write_json_str(writer, tape.str_at(cursor))?;
                writer.write_all(b":")?;
                cursor = write_json_tape_at(writer, tape, cursor + 1)?;
            }
            writer.write_all(b"}")?;
            cursor
        }
        TapeNode::String(_) => {
            write_json_str(writer, tape.str_at(idx))?;
            idx + 1
        }
        TapeNode::Static(scalar) => {
            match scalar {
                SN::Null => writer.write_all(b"null")?,
                SN::Bool(true) => writer.write_all(b"true")?,
                SN::Bool(false) => writer.write_all(b"false")?,
                SN::I64(number) => {
                    write_i64(writer, number)?;
                }
                SN::U64(number) => {
                    write_u64(writer, number)?;
                }
                SN::F64(number) => {
                    write_f64(writer, number)?;
                }
            }
            idx + 1
        }
    };
    Ok(next)
}
2183
/// Calls `visit` for each item of a stream source.
///
/// If the node at `source_idx` is an array, every element is visited in order. Any other
/// node is visited once as a single item. An out-of-range index visits nothing. The first
/// error returned by `visit` stops the walk and is propagated.
#[cfg(feature = "simd-json")]
fn visit_json_tape_source_items<T, E, F>(tape: &T, source_idx: usize, mut visit: F) -> Result<(), E>
where
    T: JsonTape,
    F: FnMut(usize) -> Result<(), E>,
{
    use crate::data::tape::TapeNode;

    match tape.nodes().get(source_idx).copied() {
        None => Ok(()),
        Some(TapeNode::Array { len, .. }) => {
            let mut element = source_idx + 1;
            for _ in 0..len {
                visit(element)?;
                element += tape.span(element);
            }
            Ok(())
        }
        Some(_) => visit(source_idx),
    }
}
2205
/// Evaluates a direct stream plan against a row tape and writes its JSON result.
///
/// The source is resolved from the tape root via `plan.source_steps`. If it is missing, the
/// sink's empty result is written (`write_json_tape_empty_stream_result`). Otherwise the
/// source items (array elements, or the node itself when not an array; see
/// `visit_json_tape_source_items`) are filtered by `plan.predicate` (absent means "keep
/// all") and fed to the sink:
/// * `Collect(map)`: a JSON array of each kept item after `map`;
/// * `Count`: the number of kept items;
/// * `Numeric`: the value from `reduce_json_tape_numeric_path`. It receives the source steps
///   again, together with the caches bundled as `NdjsonPathCaches`.
#[cfg(feature = "simd-json")]
fn write_json_tape_stream<W: Write, T: JsonTape>(
    writer: &mut W,
    tape: &T,
    plan: &NdjsonDirectStreamPlan,
    source_cache: &mut NdjsonPathCache,
    suffix_cache: &mut NdjsonPathCache,
    predicate_cache: &mut NdjsonPathCache,
    projection_caches: &mut Vec<NdjsonPathCache>,
) -> Result<(), JetroEngineError> {
    let Some(source_idx) = source_cache.index(tape, 0, &plan.source_steps) else {
        write_json_tape_empty_stream_result(writer, &plan.sink)?;
        return Ok(());
    };

    match &plan.sink {
        NdjsonDirectStreamSink::Collect(map) => {
            writer.write_all(b"[")?;
            // Tracks whether a separator is needed before the next kept item.
            let mut wrote_row = false;
            visit_json_tape_source_items(tape, source_idx, |item_idx| {
                if !plan.predicate.as_ref().is_none_or(|predicate| {
                    eval_json_tape_item_predicate_cached(tape, item_idx, predicate, predicate_cache)
                }) {
                    return Ok::<(), JetroEngineError>(());
                }
                if wrote_row {
                    writer.write_all(b",")?;
                }
                write_json_tape_stream_map(
                    writer,
                    tape,
                    item_idx,
                    map,
                    suffix_cache,
                    projection_caches,
                )?;
                wrote_row = true;
                Ok(())
            })?;
            writer.write_all(b"]")?;
        }
        NdjsonDirectStreamSink::Count => {
            let mut count = 0usize;
            // The visitor never fails, so the `Result<(), ()>` is discarded.
            let _: Result<(), ()> = visit_json_tape_source_items(tape, source_idx, |item_idx| {
                if plan.predicate.as_ref().is_none_or(|predicate| {
                    eval_json_tape_item_predicate_cached(tape, item_idx, predicate, predicate_cache)
                }) {
                    count += 1;
                }
                Ok(())
            });
            write_i64(writer, count as i64)?;
        }
        NdjsonDirectStreamSink::Numeric { suffix_steps, op } => {
            let caches = NdjsonPathCaches {
                source: source_cache,
                suffix: suffix_cache,
                predicate: predicate_cache,
            };
            let value = reduce_json_tape_numeric_path(
                tape,
                &plan.source_steps,
                plan.predicate.as_ref(),
                suffix_steps,
                *op,
                caches,
            );
            write_val_json(writer, &value)?;
        }
    }

    Ok(())
}
2279
/// Writes what a stream sink produces when its source is missing.
///
/// `Collect` writes `[]` and `Count` writes `0`. `Numeric` writes the reducer's result for
/// zero inputs, as computed by `num_finalise` with empty accumulators.
#[cfg(feature = "simd-json")]
fn write_json_tape_empty_stream_result<W: Write>(
    writer: &mut W,
    sink: &NdjsonDirectStreamSink,
) -> Result<(), JetroEngineError> {
    let literal: &[u8] = match sink {
        NdjsonDirectStreamSink::Collect(_) => b"[]",
        NdjsonDirectStreamSink::Count => b"0",
        NdjsonDirectStreamSink::Numeric { op, .. } => {
            let empty = crate::exec::pipeline::num_finalise(
                *op,
                0,
                0.0,
                false,
                f64::INFINITY,
                f64::NEG_INFINITY,
                0,
            );
            return write_val_json(writer, &empty);
        }
    };
    writer.write_all(literal)?;
    Ok(())
}
2303
/// Writes the mapped form of one stream item at `item_idx`.
///
/// * `Value` is a single projected value. Its path, if any, is resolved relative to the item
///   through `suffix_cache`.
/// * `Array` and `Object` are projections built with one cache per projected slot.
#[cfg(feature = "simd-json")]
fn write_json_tape_stream_map<W: Write, T: JsonTape>(
    writer: &mut W,
    tape: &T,
    item_idx: usize,
    map: &NdjsonDirectStreamMap,
    suffix_cache: &mut NdjsonPathCache,
    projection_caches: &mut Vec<NdjsonPathCache>,
) -> Result<(), JetroEngineError> {
    match map {
        NdjsonDirectStreamMap::Object(fields) => write_json_tape_object_projection_from(
            writer,
            tape,
            item_idx,
            fields,
            projection_caches,
        ),
        NdjsonDirectStreamMap::Array(items) => write_json_tape_array_projection_from(
            writer,
            tape,
            item_idx,
            items,
            projection_caches,
        ),
        NdjsonDirectStreamMap::Value(value) => {
            let resolved = match value {
                NdjsonDirectProjectionValue::Literal(_) => None,
                NdjsonDirectProjectionValue::Path(steps)
                | NdjsonDirectProjectionValue::ViewScalarCall { steps, .. } => {
                    suffix_cache.index(tape, item_idx, steps)
                }
            };
            write_json_tape_direct_value(writer, tape, value, resolved)?;
            Ok(())
        }
    }
}
2345
/// Writes an object projection whose field paths are resolved from the tape root (node 0).
#[cfg(feature = "simd-json")]
fn write_json_tape_object_projection<W: Write, T: JsonTape>(
    writer: &mut W,
    tape: &T,
    fields: &[super::ndjson_direct::NdjsonDirectObjectField],
    path_caches: &mut Vec<NdjsonPathCache>,
) -> Result<(), JetroEngineError> {
    write_json_tape_object_projection_from(writer, tape, 0, fields, path_caches)
}
2355
/// Writes a JSON object built from `fields`, resolving each field's path relative to the node
/// at `start`.
///
/// `path_caches` is grown to one cache per field, indexed by field position. A field marked
/// `optional` is omitted when any of these hold:
/// * its path is missing or resolves to `null`;
/// * for `ViewScalarCall`, the call result is `Val::Null`;
/// * its literal is `null`.
///
/// A `ViewScalarCall` whose own `optional` flag is set is also omitted when its input is
/// missing or `null`, even if the field itself is not optional.
/// NOTE(review): the top-level `ViewScalarCall` tape plan writes `null` in that situation
/// instead of omitting anything; confirm that omission is the intended semantics here.
#[cfg(feature = "simd-json")]
fn write_json_tape_object_projection_from<W: Write, T: JsonTape>(
    writer: &mut W,
    tape: &T,
    start: usize,
    fields: &[super::ndjson_direct::NdjsonDirectObjectField],
    path_caches: &mut Vec<NdjsonPathCache>,
) -> Result<(), JetroEngineError> {
    if path_caches.len() < fields.len() {
        path_caches.resize_with(fields.len(), NdjsonPathCache::default);
    }
    writer.write_all(b"{")?;
    // Tracks whether a separator is needed before the next emitted field.
    let mut wrote = false;
    for (field_idx, field) in fields.iter().enumerate() {
        let path_cache = &mut path_caches[field_idx];
        // Resolved path node (if any), handed to `write_json_tape_direct_value` below.
        let mut path_idx = None;
        match &field.value {
            NdjsonDirectProjectionValue::Path(steps) => {
                let idx = path_cache.index(tape, start, steps);
                path_idx = idx;
                if field.optional
                    && idx
                        .map(|idx| {
                            matches!(json_tape_scalar(tape, idx), crate::util::JsonView::Null)
                        })
                        .unwrap_or(true)
                {
                    continue;
                }
            }
            NdjsonDirectProjectionValue::ViewScalarCall {
                steps,
                call,
                optional,
            } => {
                let idx = path_cache.index(tape, start, steps);
                path_idx = idx;
                if (*optional || field.optional)
                    && idx
                        .map(|idx| {
                            matches!(json_tape_scalar(tape, idx), crate::util::JsonView::Null)
                        })
                        .unwrap_or(true)
                {
                    continue;
                }
                // An optional field is also dropped when the call itself yields null.
                if field.optional
                    && idx
                        .map(|idx| json_tape_scalar(tape, idx))
                        .and_then(|value| call.try_apply_json_view(value))
                        .is_some_and(|value| matches!(value, Val::Null))
                {
                    continue;
                }
            }
            NdjsonDirectProjectionValue::Literal(Val::Null) if field.optional => {
                continue;
            }
            NdjsonDirectProjectionValue::Literal(_) => {}
        }
        if wrote {
            writer.write_all(b",")?;
        }
        write_json_str(writer, field.key.as_ref())?;
        writer.write_all(b":")?;
        write_json_tape_direct_value(writer, tape, &field.value, path_idx)?;
        wrote = true;
    }
    writer.write_all(b"}")?;
    Ok(())
}
2427
/// Writes an array projection whose item paths are resolved from the tape root (node 0).
#[cfg(feature = "simd-json")]
fn write_json_tape_array_projection<W: Write, T: JsonTape>(
    writer: &mut W,
    tape: &T,
    items: &[NdjsonDirectProjectionValue],
    path_caches: &mut Vec<NdjsonPathCache>,
) -> Result<(), JetroEngineError> {
    const ROOT_IDX: usize = 0;
    write_json_tape_array_projection_from(writer, tape, ROOT_IDX, items, path_caches)
}
2437
/// Writes `[item, ...]` for `items`, with each path resolved relative to tape node `start`.
///
/// Unlike object projections, no item is ever omitted; unresolved paths render as `null`.
/// `path_caches` is grown so there is one cache per item, indexed by item position.
#[cfg(feature = "simd-json")]
fn write_json_tape_array_projection_from<W: Write, T: JsonTape>(
    writer: &mut W,
    tape: &T,
    start: usize,
    items: &[NdjsonDirectProjectionValue],
    path_caches: &mut Vec<NdjsonPathCache>,
) -> Result<(), JetroEngineError> {
    // One cache slot per item; the zip below relies on at least `items.len()` slots.
    if path_caches.len() < items.len() {
        path_caches.resize_with(items.len(), NdjsonPathCache::default);
    }

    writer.write_all(b"[")?;
    for (position, (item, cache)) in items.iter().zip(path_caches.iter_mut()).enumerate() {
        if position != 0 {
            writer.write_all(b",")?;
        }
        let resolved = match item {
            NdjsonDirectProjectionValue::Literal(_) => None,
            NdjsonDirectProjectionValue::Path(steps)
            | NdjsonDirectProjectionValue::ViewScalarCall { steps, .. } => {
                cache.index(tape, start, steps)
            }
        };
        write_json_tape_direct_value(writer, tape, item, resolved)?;
    }
    writer.write_all(b"]")?;
    Ok(())
}
2466
/// Writes one projected value.
///
/// * `Literal` values are serialized as-is; `path_idx` is ignored.
/// * Path-backed values with no resolved node (`path_idx == None`) render as `null`.
/// * `Path` copies the JSON subtree at `path_idx` from the tape.
/// * `ViewScalarCall` applies the call to the scalar at `path_idx`; if the call does not
///   apply, the raw tape value is written instead.
#[cfg(feature = "simd-json")]
fn write_json_tape_direct_value<W: Write, T: JsonTape>(
    writer: &mut W,
    tape: &T,
    value: &NdjsonDirectProjectionValue,
    path_idx: Option<usize>,
) -> Result<(), JetroEngineError> {
    match (value, path_idx) {
        (NdjsonDirectProjectionValue::Literal(literal), _) => write_val_json(writer, literal)?,
        (_, None) => writer.write_all(b"null")?,
        (NdjsonDirectProjectionValue::Path(_), Some(idx)) => {
            write_json_tape_at(writer, tape, idx)?;
        }
        (NdjsonDirectProjectionValue::ViewScalarCall { call, .. }, Some(idx)) => {
            match call.try_apply_json_view(json_tape_scalar(tape, idx)) {
                Some(applied) => write_val_json(writer, &applied)?,
                None => {
                    write_json_tape_at(writer, tape, idx)?;
                }
            }
        }
    }
    Ok(())
}
2498
/// Writes the result of `keys()`, `values()` or `entries()` for the object at tape node `obj_idx`.
///
/// A missing node, or a node that is not `TapeNode::Object`, produces `[]`. The walk assumes
/// each field is laid out as a key node immediately followed by its value subtree; that is
/// the layout the cursor arithmetic below relies on. Any other `method` panics via
/// `unreachable!`.
#[cfg(feature = "simd-json")]
fn write_json_tape_object_items<W: Write, T: JsonTape>(
    writer: &mut W,
    tape: &T,
    obj_idx: Option<usize>,
    method: crate::builtins::BuiltinMethod,
) -> Result<(), JetroEngineError> {
    let object = obj_idx.and_then(|idx| tape.nodes().get(idx).copied().map(|node| (idx, node)));
    let Some((obj_idx, crate::data::tape::TapeNode::Object { len, .. })) = object else {
        writer.write_all(b"[]")?;
        return Ok(());
    };

    writer.write_all(b"[")?;
    // `key_idx` always points at the key node of the field currently being emitted.
    let mut key_idx = obj_idx + 1;
    for position in 0..len {
        if position != 0 {
            writer.write_all(b",")?;
        }
        let value_idx = key_idx + 1;
        key_idx = match method {
            crate::builtins::BuiltinMethod::Keys => {
                write_json_str(writer, tape.str_at(key_idx))?;
                // Skip over the value subtree without serializing it.
                value_idx + tape.span(value_idx)
            }
            crate::builtins::BuiltinMethod::Values => {
                write_json_tape_at(writer, tape, value_idx)?
            }
            crate::builtins::BuiltinMethod::Entries => {
                writer.write_all(b"[")?;
                write_json_str(writer, tape.str_at(key_idx))?;
                writer.write_all(b",")?;
                let next_key = write_json_tape_at(writer, tape, value_idx)?;
                writer.write_all(b"]")?;
                next_key
            }
            _ => unreachable!("non-object-items builtin"),
        };
    }
    writer.write_all(b"]")?;
    Ok(())
}
2544
/// Folds the numeric values found at `suffix_steps` under each item of the array at
/// `source_steps`, optionally keeping only the items that satisfy `predicate`.
///
/// Non-numeric or missing suffix values are skipped. If the source path does not resolve,
/// the result is `num_finalise` over an empty fold.
#[cfg(feature = "simd-json")]
fn reduce_json_tape_numeric_path<T: JsonTape>(
    tape: &T,
    source_steps: &[crate::ir::physical::PhysicalPathStep],
    predicate: Option<&NdjsonDirectItemPredicate>,
    suffix_steps: &[crate::ir::physical::PhysicalPathStep],
    op: crate::exec::pipeline::NumOp,
    caches: NdjsonPathCaches<'_>,
) -> Val {
    let NdjsonPathCaches {
        source: source_cache,
        suffix: suffix_cache,
        predicate: predicate_cache,
        ..
    } = caches;

    // Running fold state, in the argument order `num_fold_*` / `num_finalise` expect.
    let (mut acc_i, mut acc_f, mut floated) = (0i64, 0.0f64, false);
    let (mut min_f, mut max_f) = (f64::INFINITY, f64::NEG_INFINITY);
    let mut n_obs = 0usize;

    if let Some(source_idx) = source_cache.index(tape, 0, source_steps) {
        let _: Result<(), ()> = visit_json_tape_source_items(tape, source_idx, |item_idx| {
            let keep = match predicate {
                None => true,
                Some(predicate) => eval_json_tape_item_predicate_cached(
                    tape,
                    item_idx,
                    predicate,
                    predicate_cache,
                ),
            };
            if keep {
                if let Some(idx) = suffix_cache.index(tape, item_idx, suffix_steps) {
                    fold_json_tape_numeric(
                        json_tape_scalar(tape, idx),
                        op,
                        &mut acc_i,
                        &mut acc_f,
                        &mut floated,
                        &mut min_f,
                        &mut max_f,
                        &mut n_obs,
                    );
                }
            }
            Ok(())
        });
    }

    crate::exec::pipeline::num_finalise(op, acc_i, acc_f, floated, min_f, max_f, n_obs)
}
2590
/// Folds one JSON scalar into the numeric accumulators.
///
/// Signed integers, and unsigned integers up to `i64::MAX`, take the exact integer path.
/// Larger unsigned integers and floats take the `f64` path. Any other value is ignored.
#[cfg(feature = "simd-json")]
#[allow(clippy::too_many_arguments)]
fn fold_json_tape_numeric(
    value: crate::util::JsonView<'_>,
    op: crate::exec::pipeline::NumOp,
    acc_i: &mut i64,
    acc_f: &mut f64,
    floated: &mut bool,
    min_f: &mut f64,
    max_f: &mut f64,
    n_obs: &mut usize,
) {
    enum Number {
        Exact(i64),
        Approx(f64),
    }

    let number = match value {
        crate::util::JsonView::Int(int) => Number::Exact(int),
        crate::util::JsonView::UInt(uint) if uint <= i64::MAX as u64 => Number::Exact(uint as i64),
        crate::util::JsonView::UInt(uint) => Number::Approx(uint as f64),
        crate::util::JsonView::Float(float) => Number::Approx(float),
        _ => return,
    };

    match number {
        Number::Exact(int) => crate::exec::pipeline::num_fold_i64(
            acc_i, acc_f, floated, min_f, max_f, n_obs, op, int,
        ),
        Number::Approx(float) => crate::exec::pipeline::num_fold_f64(
            acc_i, acc_f, floated, min_f, max_f, n_obs, op, float,
        ),
    }
}
2635
/// Evaluates a direct item predicate against the array element rooted at `item_idx`.
///
/// Unresolved paths evaluate to `false`. `&&` / `||` short-circuit. Other binary operators
/// compare two scalar operands and yield `false` if either cannot be produced; the lhs is
/// evaluated before the rhs. Every path lookup goes through the single shared `cache`.
#[cfg(feature = "simd-json")]
fn eval_json_tape_item_predicate_cached<T: JsonTape>(
    tape: &T,
    item_idx: usize,
    predicate: &NdjsonDirectItemPredicate,
    cache: &mut NdjsonPathCache,
) -> bool {
    use crate::parse::ast::BinOp;

    match predicate {
        NdjsonDirectItemPredicate::Literal(value) => crate::util::is_truthy(value),
        NdjsonDirectItemPredicate::Path(steps) => match cache.index(tape, item_idx, steps) {
            Some(idx) => json_view_truthy(json_tape_scalar(tape, idx)),
            None => false,
        },
        NdjsonDirectItemPredicate::CmpLit { lhs, op, lit } => {
            let Some(idx) = cache.index(tape, item_idx, lhs) else {
                return false;
            };
            crate::util::json_cmp_binop(
                json_tape_scalar(tape, idx),
                *op,
                crate::util::JsonView::from_val(lit),
            )
        }
        NdjsonDirectItemPredicate::ViewScalarCall { suffix_steps, call } => {
            let Some(idx) = cache.index(tape, item_idx, suffix_steps) else {
                return false;
            };
            call.try_apply_json_view(json_tape_scalar(tape, idx))
                .is_some_and(|value| crate::util::is_truthy(&value))
        }
        NdjsonDirectItemPredicate::Binary { lhs, op, rhs } => match op {
            BinOp::And => {
                eval_json_tape_item_predicate_cached(tape, item_idx, lhs, cache)
                    && eval_json_tape_item_predicate_cached(tape, item_idx, rhs, cache)
            }
            BinOp::Or => {
                eval_json_tape_item_predicate_cached(tape, item_idx, lhs, cache)
                    || eval_json_tape_item_predicate_cached(tape, item_idx, rhs, cache)
            }
            _ => {
                let Some(left) = eval_json_tape_item_scalar_cached(tape, item_idx, lhs, cache)
                else {
                    return false;
                };
                let Some(right) = eval_json_tape_item_scalar_cached(tape, item_idx, rhs, cache)
                else {
                    return false;
                };
                crate::util::json_cmp_binop(left, *op, right)
            }
        },
    }
}
2681
/// Produces a comparable scalar operand from a predicate node.
///
/// Only `Literal` and `Path` nodes yield a value; an unresolved path or any other node kind
/// yields `None`.
#[cfg(feature = "simd-json")]
fn eval_json_tape_item_scalar_cached<'a, T: JsonTape>(
    tape: &'a T,
    item_idx: usize,
    predicate: &'a NdjsonDirectItemPredicate,
    cache: &mut NdjsonPathCache,
) -> Option<crate::util::JsonView<'a>> {
    if let NdjsonDirectItemPredicate::Literal(value) = predicate {
        return Some(crate::util::JsonView::from_val(value));
    }
    let NdjsonDirectItemPredicate::Path(steps) = predicate else {
        return None;
    };
    let idx = cache.index(tape, item_idx, steps)?;
    Some(json_tape_scalar(tape, idx))
}
2697
2698fn write_json_array<'a, W, I>(writer: &mut W, items: I) -> Result<(), JetroEngineError>
2699where
2700 W: Write,
2701 I: IntoIterator<Item = &'a Val>,
2702{
2703 writer.write_all(b"[")?;
2704 let mut first = true;
2705 for item in items {
2706 if first {
2707 first = false;
2708 } else {
2709 writer.write_all(b",")?;
2710 }
2711 write_val_json(writer, item)?;
2712 }
2713 writer.write_all(b"]")?;
2714 Ok(())
2715}
2716
2717fn write_json_int_array<W, I>(writer: &mut W, items: I) -> Result<(), JetroEngineError>
2718where
2719 W: Write,
2720 I: IntoIterator<Item = i64>,
2721{
2722 writer.write_all(b"[")?;
2723 let mut first = true;
2724 let mut buf = itoa::Buffer::new();
2725 for item in items {
2726 if first {
2727 first = false;
2728 } else {
2729 writer.write_all(b",")?;
2730 }
2731 writer.write_all(buf.format(item).as_bytes())?;
2732 }
2733 writer.write_all(b"]")?;
2734 Ok(())
2735}
2736
2737fn write_json_float_array<W, I>(writer: &mut W, items: I) -> Result<(), JetroEngineError>
2738where
2739 W: Write,
2740 I: IntoIterator<Item = f64>,
2741{
2742 writer.write_all(b"[")?;
2743 let mut first = true;
2744 let mut buf = ryu::Buffer::new();
2745 for item in items {
2746 if first {
2747 first = false;
2748 } else {
2749 writer.write_all(b",")?;
2750 }
2751 if item.is_finite() {
2752 writer.write_all(buf.format(item).as_bytes())?;
2753 } else {
2754 writer.write_all(b"0")?;
2755 }
2756 }
2757 writer.write_all(b"]")?;
2758 Ok(())
2759}
2760
2761fn write_json_str_array<'a, W, I>(writer: &mut W, items: I) -> Result<(), JetroEngineError>
2762where
2763 W: Write,
2764 I: IntoIterator<Item = &'a str>,
2765{
2766 writer.write_all(b"[")?;
2767 let mut first = true;
2768 for item in items {
2769 if first {
2770 first = false;
2771 } else {
2772 writer.write_all(b",")?;
2773 }
2774 write_json_str(writer, item)?;
2775 }
2776 writer.write_all(b"]")?;
2777 Ok(())
2778}
2779
2780fn write_json_object<'a, W, I>(writer: &mut W, entries: I) -> Result<(), JetroEngineError>
2781where
2782 W: Write,
2783 I: IntoIterator<Item = (&'a str, &'a Val)>,
2784{
2785 writer.write_all(b"{")?;
2786 let mut first = true;
2787 for (key, value) in entries {
2788 if first {
2789 first = false;
2790 } else {
2791 writer.write_all(b",")?;
2792 }
2793 write_json_str(writer, key)?;
2794 writer.write_all(b":")?;
2795 write_val_json(writer, value)?;
2796 }
2797 writer.write_all(b"}")?;
2798 Ok(())
2799}
2800
2801fn write_json_objvec<W: Write>(
2802 writer: &mut W,
2803 data: &crate::data::value::ObjVecData,
2804) -> Result<(), JetroEngineError> {
2805 writer.write_all(b"[")?;
2806 for row in 0..data.nrows() {
2807 if row > 0 {
2808 writer.write_all(b",")?;
2809 }
2810 writer.write_all(b"{")?;
2811 for slot in 0..data.stride() {
2812 if slot > 0 {
2813 writer.write_all(b",")?;
2814 }
2815 write_json_str(writer, data.keys[slot].as_ref())?;
2816 writer.write_all(b":")?;
2817 write_val_json(writer, data.cell(row, slot))?;
2818 }
2819 writer.write_all(b"}")?;
2820 }
2821 writer.write_all(b"]")?;
2822 Ok(())
2823}
2824
2825fn write_json_str<W: Write>(writer: &mut W, value: &str) -> Result<(), JetroEngineError> {
2826 writer.write_all(b"\"")?;
2827 let bytes = value.as_bytes();
2828 if !needs_json_escape(bytes) {
2829 writer.write_all(bytes)?;
2830 writer.write_all(b"\"")?;
2831 return Ok(());
2832 }
2833
2834 let mut start = 0usize;
2835
2836 for (idx, &byte) in bytes.iter().enumerate() {
2837 let escaped = match byte {
2838 b'"' => Some(br#"\""#.as_slice()),
2839 b'\\' => Some(br#"\\"#.as_slice()),
2840 b'\n' => Some(br#"\n"#.as_slice()),
2841 b'\r' => Some(br#"\r"#.as_slice()),
2842 b'\t' => Some(br#"\t"#.as_slice()),
2843 0x08 => Some(br#"\b"#.as_slice()),
2844 0x0c => Some(br#"\f"#.as_slice()),
2845 0x00..=0x1f => None,
2846 _ => continue,
2847 };
2848
2849 if start < idx {
2850 writer.write_all(&bytes[start..idx])?;
2851 }
2852 match escaped {
2853 Some(seq) => writer.write_all(seq)?,
2854 None => write_control_escape(writer, byte)?,
2855 }
2856 start = idx + 1;
2857 }
2858
2859 if start < bytes.len() {
2860 writer.write_all(&bytes[start..])?;
2861 }
2862 writer.write_all(b"\"")?;
2863 Ok(())
2864}
2865
2866#[inline]
2867pub(super) fn write_i64<W: Write>(writer: &mut W, value: i64) -> Result<(), JetroEngineError> {
2868 let mut buf = itoa::Buffer::new();
2869 writer.write_all(buf.format(value).as_bytes())?;
2870 Ok(())
2871}
2872
2873#[inline]
2874fn write_u64<W: Write>(writer: &mut W, value: u64) -> Result<(), JetroEngineError> {
2875 let mut buf = itoa::Buffer::new();
2876 writer.write_all(buf.format(value).as_bytes())?;
2877 Ok(())
2878}
2879
2880#[inline]
2881fn write_f64<W: Write>(writer: &mut W, value: f64) -> Result<(), JetroEngineError> {
2882 if value.is_finite() {
2883 let mut buf = ryu::Buffer::new();
2884 writer.write_all(buf.format(value).as_bytes())?;
2885 } else {
2886 writer.write_all(b"0")?;
2887 }
2888 Ok(())
2889}
2890
/// Returns `true` if any byte must be escaped inside a JSON string: `"`, `\`, or a C0 control
/// character (`0x00..=0x1F`).
#[inline]
fn needs_json_escape(bytes: &[u8]) -> bool {
    bytes
        .iter()
        .copied()
        .any(|byte| byte < 0x20 || byte == b'"' || byte == b'\\')
}
2897
2898fn write_control_escape<W: Write>(writer: &mut W, byte: u8) -> Result<(), JetroEngineError> {
2899 const HEX: &[u8; 16] = b"0123456789abcdef";
2900 writer.write_all(&[
2901 b'\\',
2902 b'u',
2903 b'0',
2904 b'0',
2905 HEX[(byte >> 4) as usize],
2906 HEX[(byte & 0x0f) as usize],
2907 ])?;
2908 Ok(())
2909}
2910
2911pub(super) fn collect_row_val(
2912 engine: &JetroEngine,
2913 document: &Jetro,
2914 plan: &crate::ir::physical::QueryPlan,
2915 line_no: u64,
2916) -> Result<Val, JetroEngineError> {
2917 engine
2918 .collect_prepared_val(document, plan)
2919 .map_err(|err| row_eval_error(line_no, err))
2920}
2921
2922pub(super) fn parse_row(
2923 engine: &JetroEngine,
2924 line_no: u64,
2925 row: Vec<u8>,
2926) -> Result<Jetro, JetroEngineError> {
2927 engine
2928 .parse_bytes_lazy(row)
2929 .map_err(|err| row_parse_error(line_no, err))
2930}
2931
2932pub(super) fn row_parse_error(line_no: u64, err: JetroEngineError) -> JetroEngineError {
2933 match err {
2934 JetroEngineError::Json(source) => RowError::InvalidJson { line_no, source }.into(),
2935 JetroEngineError::Eval(eval) => RowError::InvalidJsonMessage {
2936 line_no,
2937 message: eval.to_string(),
2938 }
2939 .into(),
2940 other => other,
2941 }
2942}
2943
2944pub(super) fn row_eval_error(line_no: u64, err: crate::EvalError) -> JetroEngineError {
2945 let message = err.0;
2946 if message.starts_with("Invalid JSON:") {
2947 RowError::InvalidJsonMessage { line_no, message }.into()
2948 } else {
2949 crate::EvalError(message).into()
2950 }
2951}
2952
/// Removes every trailing `\n` / `\r` byte, handling both LF and CRLF (and stray CRs).
fn trim_line_ending(buf: &mut Vec<u8>) {
    let kept = buf
        .iter()
        .rposition(|&byte| byte != b'\n' && byte != b'\r')
        .map_or(0, |last| last + 1);
    buf.truncate(kept);
}
2958
/// Drops a leading UTF-8 byte-order mark, but only on the first line of the stream.
fn strip_initial_bom(line_no: u64, buf: &mut Vec<u8>) {
    const UTF8_BOM: [u8; 3] = [0xEF, 0xBB, 0xBF];
    if line_no != 1 {
        return;
    }
    if buf.starts_with(&UTF8_BOM) {
        buf.drain(..UTF8_BOM.len());
    }
}
2964
/// Returns the half-open `(start, end)` range of `buf` with ASCII whitespace trimmed off
/// both ends.
///
/// An empty or all-whitespace buffer yields `(buf.len(), buf.len())`.
fn non_ws_range(buf: &[u8]) -> (usize, usize) {
    let is_content = |byte: &u8| !byte.is_ascii_whitespace();
    match buf.iter().position(is_content) {
        None => (buf.len(), buf.len()),
        Some(start) => {
            let end = buf
                .iter()
                .rposition(is_content)
                .map_or(start, |last| last + 1);
            (start, end)
        }
    }
}
2977
// Unit tests for the NDJSON row driver and the direct (tape / byte) fast paths.
#[cfg(test)]
mod tests {
    // `parse_row` must not force materialization: the parsed document should still have
    // neither a root `Val` nor a built tape.
    #[test]
    #[cfg(feature = "simd-json")]
    fn parse_row_keeps_simd_document_lazy() {
        let engine = crate::JetroEngine::new();
        let row = br#"{"name":"Ada","age":30}"#.to_vec();

        let document = super::parse_row(&engine, 1, row).expect("row parses lazily");

        assert!(!document.root_val_is_materialized());
        assert!(!document.tape_is_built());
    }

    // Reading rows into a caller-supplied buffer must return the row bytes without the line
    // ending, and must leave the buffer's capacity unchanged across reads.
    #[test]
    fn owned_row_read_preserves_reusable_buffer_capacity() {
        let input = std::io::Cursor::new(b"{\"n\":1}\n{\"n\":2}\n");
        let mut driver = super::NdjsonPerRowDriver::new(input);
        let mut buf = Vec::with_capacity(128);

        let first = driver
            .read_next_owned(&mut buf)
            .expect("row read succeeds")
            .expect("first row exists");
        assert_eq!(first.1, br#"{"n":1}"#);
        assert_eq!(buf.capacity(), 128);

        let second = driver
            .read_next_owned(&mut buf)
            .expect("row read succeeds")
            .expect("second row exists");
        assert_eq!(second.1, br#"{"n":2}"#);
        assert_eq!(buf.capacity(), 128);
    }

    // `first()`, `last()` and `nth(n)` followed by a field path should all lower to the
    // `ArrayElementPath` tape plan, despite the test name mentioning only `first`.
    #[test]
    #[cfg(feature = "simd-json")]
    fn direct_tape_plan_accepts_first_suffix() {
        let engine = crate::JetroEngine::new();
        for query in [
            "attributes.first().value",
            "attributes.last().value",
            "attributes.nth(1).value",
        ] {
            let plan =
                super::direct_tape_plan(&engine, query).expect("array suffix should be direct");
            assert!(matches!(
                plan,
                super::NdjsonDirectTapePlan::ArrayElementPath { .. }
            ));
        }
    }

    // Every `$`-rooted query shape in this list must get some direct tape plan; which plan
    // variant it gets is not checked here.
    #[test]
    #[cfg(feature = "simd-json")]
    fn direct_tape_plan_accepts_rooted_bench_shapes() {
        let engine = crate::JetroEngine::new();
        for query in [
            "$.id",
            "$.a.b.c",
            "$.meta.id",
            "$.name",
            "$.attributes.len()",
            "$.store.attributes.len()",
            "$.attributes.map(@.key)",
            "$.attributes.first().value",
            "$.store.attributes.first().value",
            "$.attributes.last().value",
            "$.name.upper()",
            "$.store.name.upper()",
            "$.attributes.map([@.key, @.value])",
            r#"$.attributes.filter(@.value.contains("_3")).len()"#,
            "$.keys()",
        ] {
            super::direct_tape_plan(&engine, query)
                .unwrap_or_else(|| panic!("{query} should have a direct NDJSON tape plan"));
        }
    }

    // The map / filter / len / sum combinations listed here must lower to the generic
    // `NdjsonDirectTapePlan::Stream` variant.
    #[test]
    #[cfg(feature = "simd-json")]
    fn direct_tape_plan_lowers_stream_shapes_generically() {
        let engine = crate::JetroEngine::new();
        for query in [
            "$.attributes.map(@.key)",
            "$.attributes.map(@.key.upper())",
            r#"$.attributes.filter(@.value.contains("_3")).map(@.key)"#,
            r#"$.attributes.filter(@.value.contains("_3")).map(@.key.upper())"#,
            r#"$.attributes.filter(@.value.contains("_3")).len()"#,
            "$.attributes.map(@.weight).sum()",
            r#"$.attributes.filter(@.value.contains("_3")).map(@.weight).sum()"#,
        ] {
            let plan =
                super::direct_tape_plan(&engine, query).expect("query should be direct NDJSON");
            assert!(
                matches!(plan, super::NdjsonDirectTapePlan::Stream(_)),
                "{query} should lower to a generic NDJSON stream plan"
            );
        }
    }

    // Each shape listed here must compile to a direct byte plan.
    #[test]
    #[cfg(feature = "simd-json")]
    fn direct_byte_plan_accepts_fast_root_shapes() {
        let engine = crate::JetroEngine::new();
        for query in [
            "$.id",
            "$.name",
            "$.name.upper()",
            "$.name.lower()",
            "$.keys()",
            "$.meta.keys()",
            "$.values()",
            "$.entries()",
            "$.attributes.first().value",
            "$.store.attributes.first().value",
            "$.attributes.first().key.upper()",
            "$.attributes.last().value",
            "$.attributes.nth(1).value",
        ] {
            super::direct_byte_plan(&engine, query)
                .unwrap_or_else(|| panic!("{query} should have a direct NDJSON byte plan"));
        }
    }

    // Truthy-path, comparison and array-suffix `contains` predicates must be evaluated
    // straight from the row bytes (`Some(..)`, no tape fallback), with the expected result.
    #[test]
    #[cfg(feature = "simd-json")]
    fn direct_byte_predicates_cover_match_shapes() {
        let engine = crate::JetroEngine::new();
        let row = br#"{"active":true,"score":9910,"attributes":[{"key":"k1","value":"v_1"}]}"#;
        for predicate in [
            ("active", true),
            ("score > 9900", true),
            ("score < 100", false),
            (r#"attributes.first().value.contains("_1")"#, true),
        ] {
            let plan = super::direct_tape_predicate(&engine, predicate.0)
                .unwrap_or_else(|| panic!("{} should have a direct predicate", predicate.0));
            let matched = super::eval_ndjson_byte_predicate_row(row, &plan)
                .expect("byte predicate should evaluate")
                .unwrap_or_else(|| panic!("{} should not need tape fallback", predicate.0));
            assert_eq!(matched, predicate.1, "{}", predicate.0);
        }
    }

    // A filter-then-len tape plan must be writable straight from the row bytes. Here two of
    // the three elements match, so the output is `2`.
    #[test]
    #[cfg(feature = "simd-json")]
    fn direct_byte_tape_plan_counts_filtered_rows() {
        let engine = crate::JetroEngine::new();
        let query = r#"attributes.filter(@.value.contains("_3")).len()"#;
        let plan = super::direct_tape_plan(&engine, query).expect("filter count should be direct");
        assert!(super::tape_plan_can_write_byte_row(&plan));

        let row = br#"{"attributes":[{"value":"a_3"},{"value":"b"},{"value":"c_3"}]}"#;
        let mut out = Vec::new();
        let mut scratch = Vec::new();
        let wrote = super::write_ndjson_byte_tape_plan_row(&mut out, row, &plan, &mut scratch)
            .expect("byte count should write");
        assert!(matches!(wrote, super::BytePlanWrite::Done));
        assert_eq!(out, b"2");
    }

    // Stream maps with scalar, array, object and upper-cased projections, plus one filtered
    // map, must be byte-writable and produce exactly the listed JSON.
    #[test]
    #[cfg(feature = "simd-json")]
    fn direct_byte_tape_plan_collects_stream_maps() {
        let engine = crate::JetroEngine::new();
        let row = br#"{"attributes":[{"key":"k1","value":"v1"},{"key":"k2","value":"v2"}]}"#;
        for (query, expected) in [
            ("attributes.map(@.key)", r#"["k1","k2"]"#),
            (
                "attributes.map([@.key, @.value])",
                r#"[["k1","v1"],["k2","v2"]]"#,
            ),
            (
                "attributes.map({key: @.key, value: @.value})",
                r#"[{"key":"k1","value":"v1"},{"key":"k2","value":"v2"}]"#,
            ),
            ("attributes.map(@.key.upper())", r#"["K1","K2"]"#),
            (
                r#"attributes.filter(@.value.contains("2")).map(@.key)"#,
                r#"["k2"]"#,
            ),
        ] {
            let plan = super::direct_tape_plan(&engine, query)
                .unwrap_or_else(|| panic!("{query} should be direct"));
            assert!(
                super::tape_plan_can_write_byte_row(&plan),
                "{query} should be byte-writable"
            );
            let mut out = Vec::new();
            let mut scratch = Vec::new();
            let wrote = super::write_ndjson_byte_tape_plan_row(&mut out, row, &plan, &mut scratch)
                .expect("byte stream should write");
            assert!(matches!(wrote, super::BytePlanWrite::Done), "{query}");
            assert_eq!(std::str::from_utf8(&out).unwrap(), expected, "{query}");
        }
    }

    // Plain paths and static object/array projections of root paths must be byte-writable.
    // A nested object (`$.a.b`) is copied through verbatim.
    #[test]
    #[cfg(feature = "simd-json")]
    fn direct_byte_tape_plan_writes_static_projections() {
        let engine = crate::JetroEngine::new();
        let row = br#"{"id":7,"a":{"b":{"c":1}}}"#;
        for (query, expected) in [
            ("$.a.b.c", "1"),
            (r#"{test: $.a.b.c, b: $.a.b}"#, r#"{"test":1,"b":{"c":1}}"#),
            (r#"[$.a.b.c, $.id]"#, r#"[1,7]"#),
        ] {
            let plan = super::direct_tape_plan(&engine, query)
                .unwrap_or_else(|| panic!("{query} should be direct"));
            assert!(
                super::tape_plan_can_write_byte_row(&plan),
                "{query} should be byte-writable"
            );
            let mut out = Vec::new();
            let mut scratch = Vec::new();
            let wrote = super::write_ndjson_byte_tape_plan_row(&mut out, row, &plan, &mut scratch)
                .expect("byte projection should write");
            assert!(matches!(wrote, super::BytePlanWrite::Done), "{query}");
            assert_eq!(std::str::from_utf8(&out).unwrap(), expected, "{query}");
        }
    }

    // End-to-end `run_ndjson` over root paths, nested paths, and nested `keys()`. Each result
    // is written newline-terminated; the last input lacks a trailing newline but its output
    // still ends in `\n`.
    #[test]
    #[cfg(feature = "simd-json")]
    fn run_ndjson_uses_byte_paths_for_nested_object_items() {
        let engine = crate::JetroEngine::new();
        let rows = std::io::Cursor::new(
            br#"{"id":1}
{"id":2}
"#,
        );
        let mut out = Vec::new();
        engine
            .run_ndjson(rows, "$.id", &mut out)
            .expect("rooted byte path should run");
        assert_eq!(std::str::from_utf8(&out).unwrap(), "1\n2\n");

        let rows = std::io::Cursor::new(
            br#"{"meta":{"id":1,"kind":"a"}}
{"meta":{"id":2,"kind":"b"}}
"#,
        );

        let mut out = Vec::new();
        engine
            .run_ndjson(rows, "$.meta.id", &mut out)
            .expect("nested byte path should run");
        assert_eq!(std::str::from_utf8(&out).unwrap(), "1\n2\n");

        let rows = std::io::Cursor::new(br#"{"meta":{"id":1,"kind":"a"}}"#);
        let mut out = Vec::new();
        engine
            .run_ndjson(rows, "$.meta.keys()", &mut out)
            .expect("nested byte object items should run");
        assert_eq!(std::str::from_utf8(&out).unwrap(), "[\"id\",\"kind\"]\n");
    }

    // End-to-end `run_ndjson` with an array-element demand (`first()`) under a nested object.
    #[test]
    #[cfg(feature = "simd-json")]
    fn run_ndjson_uses_byte_paths_for_nested_array_demands() {
        let engine = crate::JetroEngine::new();
        let rows = std::io::Cursor::new(
            br#"{"store":{"attributes":[{"value":"a"},{"value":"b"}]}}
{"store":{"attributes":[{"value":"c"},{"value":"d"}]}}
"#,
        );

        let mut out = Vec::new();
        engine
            .run_ndjson(rows, "$.store.attributes.first().value", &mut out)
            .expect("nested byte array demand should run");
        assert_eq!(std::str::from_utf8(&out).unwrap(), "\"a\"\n\"c\"\n");
    }
}