1use super::{NdjsonSource, RowError};
2use crate::data::value::Val;
3use crate::plan::physical::PlanningContext;
4use crate::util::is_truthy;
5use crate::{Jetro, JetroEngine, JetroEngineError, VM};
6use memchr::memchr;
7use serde_json::Value;
8use std::fs::File;
9use std::io::{BufRead, BufWriter, Write};
10use std::path::Path;
11use std::sync::MutexGuard;
12
13const DEFAULT_MAX_LINE_LEN: usize = 64 * 1024 * 1024;
14const DEFAULT_LINE_BUFFER_CAPACITY: usize = 8192;
15const DEFAULT_READER_BUFFER_CAPACITY: usize = 64 * 1024;
16pub(super) const DEFAULT_REVERSE_CHUNK_SIZE: usize = 64 * 1024;
17
18#[derive(Clone, Copy, Debug, PartialEq, Eq)]
20pub struct NdjsonOptions {
21 pub max_line_len: usize,
22 pub initial_buffer_capacity: usize,
23 pub reader_buffer_capacity: usize,
24 pub reverse_chunk_size: usize,
25}
26
27impl Default for NdjsonOptions {
28 fn default() -> Self {
29 Self {
30 max_line_len: DEFAULT_MAX_LINE_LEN,
31 initial_buffer_capacity: DEFAULT_LINE_BUFFER_CAPACITY,
32 reader_buffer_capacity: DEFAULT_READER_BUFFER_CAPACITY,
33 reverse_chunk_size: DEFAULT_REVERSE_CHUNK_SIZE,
34 }
35 }
36}
37
38impl NdjsonOptions {
39 pub fn with_max_line_len(mut self, max_line_len: usize) -> Self {
40 self.max_line_len = max_line_len;
41 self
42 }
43
44 pub fn with_initial_buffer_capacity(mut self, capacity: usize) -> Self {
45 self.initial_buffer_capacity = capacity;
46 self
47 }
48
49 pub fn with_reader_buffer_capacity(mut self, capacity: usize) -> Self {
50 self.reader_buffer_capacity = capacity;
51 self
52 }
53
54 pub fn with_reverse_chunk_size(mut self, capacity: usize) -> Self {
55 self.reverse_chunk_size = capacity;
56 self
57 }
58}
59
60pub struct NdjsonPerRowDriver<R> {
62 reader: R,
63 line_no: u64,
64 max_line_len: usize,
65}
66
67impl<R: BufRead> NdjsonPerRowDriver<R> {
68 pub fn new(reader: R) -> Self {
69 Self {
70 reader,
71 line_no: 0,
72 max_line_len: DEFAULT_MAX_LINE_LEN,
73 }
74 }
75
76 pub fn with_max_line_len(mut self, max_line_len: usize) -> Self {
77 self.max_line_len = max_line_len;
78 self
79 }
80
81 pub fn line_no(&self) -> u64 {
82 self.line_no
83 }
84
85 pub fn read_next_nonempty<'a>(
88 &mut self,
89 buf: &'a mut Vec<u8>,
90 ) -> Result<Option<(u64, &'a [u8])>, RowError> {
91 loop {
92 buf.clear();
93 let read = self.read_physical_line(buf)?;
94 if read == 0 {
95 return Ok(None);
96 }
97 self.line_no += 1;
98
99 strip_initial_bom(self.line_no, buf);
100 trim_line_ending(buf);
101
102 let (start, end) = non_ws_range(buf);
103 if start == end {
104 continue;
105 }
106
107 let len = end - start;
108 if len > self.max_line_len {
109 return Err(RowError::LineTooLarge {
110 line_no: self.line_no,
111 len,
112 max: self.max_line_len,
113 });
114 }
115
116 return Ok(Some((self.line_no, &buf[start..end])));
117 }
118 }
119
120 pub fn read_next_owned(
124 &mut self,
125 buf: &mut Vec<u8>,
126 ) -> Result<Option<(u64, Vec<u8>)>, RowError> {
127 loop {
128 buf.clear();
129 let read = self.read_physical_line(buf)?;
130 if read == 0 {
131 return Ok(None);
132 }
133 self.line_no += 1;
134
135 strip_initial_bom(self.line_no, buf);
136 trim_line_ending(buf);
137
138 let (start, end) = non_ws_range(buf);
139 if start == end {
140 continue;
141 }
142
143 let len = end - start;
144 if len > self.max_line_len {
145 return Err(RowError::LineTooLarge {
146 line_no: self.line_no,
147 len,
148 max: self.max_line_len,
149 });
150 }
151
152 let capacity = buf.capacity();
153 return Ok(Some((
154 self.line_no,
155 std::mem::replace(buf, Vec::with_capacity(capacity)),
156 )));
157 }
158 }
159
160 fn read_physical_line(&mut self, buf: &mut Vec<u8>) -> Result<usize, RowError> {
161 loop {
162 let available = self.reader.fill_buf()?;
163 if available.is_empty() {
164 return Ok(buf.len());
165 }
166
167 if let Some(pos) = memchr(b'\n', available) {
168 buf.extend_from_slice(&available[..=pos]);
169 self.reader.consume(pos + 1);
170 self.check_physical_line_len(buf.len())?;
171 return Ok(buf.len());
172 }
173
174 let len = available.len();
175 buf.extend_from_slice(available);
176 self.reader.consume(len);
177 self.check_physical_line_len(buf.len())?;
178 }
179 }
180
181 fn check_physical_line_len(&self, len: usize) -> Result<(), RowError> {
182 let hard_max = self.max_line_len.saturating_add(2);
183 if len > hard_max {
184 return Err(RowError::LineTooLarge {
185 line_no: self.line_no + 1,
186 len,
187 max: self.max_line_len,
188 });
189 }
190 Ok(())
191 }
192}
193
/// Decision returned by per-value callbacks: keep reading rows or stop early.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum NdjsonControl {
    /// Go on to the next row.
    Continue,
    /// Stop reading further rows. The drivers in this file have already
    /// counted the current row when the callback returns this.
    Stop,
}
199
200pub fn for_each_ndjson<R, F>(
201 engine: &JetroEngine,
202 reader: R,
203 query: &str,
204 f: F,
205) -> Result<usize, JetroEngineError>
206where
207 R: BufRead,
208 F: FnMut(Value),
209{
210 for_each_ndjson_with_options(engine, reader, query, NdjsonOptions::default(), f)
211}
212
213pub fn for_each_ndjson_with_options<R, F>(
214 engine: &JetroEngine,
215 reader: R,
216 query: &str,
217 options: NdjsonOptions,
218 mut f: F,
219) -> Result<usize, JetroEngineError>
220where
221 R: BufRead,
222 F: FnMut(Value),
223{
224 drive_ndjson(engine, reader, query, options, |value| {
225 f(value);
226 Ok(NdjsonControl::Continue)
227 })
228}
229
230pub fn for_each_ndjson_until<R, F>(
231 engine: &JetroEngine,
232 reader: R,
233 query: &str,
234 f: F,
235) -> Result<usize, JetroEngineError>
236where
237 R: BufRead,
238 F: FnMut(Value) -> Result<NdjsonControl, JetroEngineError>,
239{
240 for_each_ndjson_until_with_options(engine, reader, query, NdjsonOptions::default(), f)
241}
242
243pub fn for_each_ndjson_until_with_options<R, F>(
244 engine: &JetroEngine,
245 reader: R,
246 query: &str,
247 options: NdjsonOptions,
248 f: F,
249) -> Result<usize, JetroEngineError>
250where
251 R: BufRead,
252 F: FnMut(Value) -> Result<NdjsonControl, JetroEngineError>,
253{
254 drive_ndjson(engine, reader, query, options, f)
255}
256
257pub fn for_each_ndjson_source<F>(
258 engine: &JetroEngine,
259 source: NdjsonSource,
260 query: &str,
261 f: F,
262) -> Result<usize, JetroEngineError>
263where
264 F: FnMut(Value),
265{
266 for_each_ndjson_source_with_options(engine, source, query, NdjsonOptions::default(), f)
267}
268
269pub fn for_each_ndjson_source_with_options<F>(
270 engine: &JetroEngine,
271 source: NdjsonSource,
272 query: &str,
273 options: NdjsonOptions,
274 f: F,
275) -> Result<usize, JetroEngineError>
276where
277 F: FnMut(Value),
278{
279 match source {
280 NdjsonSource::File(path) => {
281 let file = File::open(path)?;
282 for_each_ndjson_with_options(
283 engine,
284 std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
285 query,
286 options,
287 f,
288 )
289 }
290 NdjsonSource::Reader(reader) => {
291 for_each_ndjson_with_options(engine, reader, query, options, f)
292 }
293 }
294}
295
296pub fn for_each_ndjson_source_until<F>(
297 engine: &JetroEngine,
298 source: NdjsonSource,
299 query: &str,
300 f: F,
301) -> Result<usize, JetroEngineError>
302where
303 F: FnMut(Value) -> Result<NdjsonControl, JetroEngineError>,
304{
305 for_each_ndjson_source_until_with_options(engine, source, query, NdjsonOptions::default(), f)
306}
307
308pub fn for_each_ndjson_source_until_with_options<F>(
309 engine: &JetroEngine,
310 source: NdjsonSource,
311 query: &str,
312 options: NdjsonOptions,
313 f: F,
314) -> Result<usize, JetroEngineError>
315where
316 F: FnMut(Value) -> Result<NdjsonControl, JetroEngineError>,
317{
318 match source {
319 NdjsonSource::File(path) => {
320 let file = File::open(path)?;
321 for_each_ndjson_until_with_options(
322 engine,
323 std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
324 query,
325 options,
326 f,
327 )
328 }
329 NdjsonSource::Reader(reader) => {
330 for_each_ndjson_until_with_options(engine, reader, query, options, f)
331 }
332 }
333}
334
335pub fn collect_ndjson<R>(
336 engine: &JetroEngine,
337 reader: R,
338 query: &str,
339) -> Result<Vec<Value>, JetroEngineError>
340where
341 R: BufRead,
342{
343 collect_ndjson_with_options(engine, reader, query, NdjsonOptions::default())
344}
345
346pub fn collect_ndjson_with_options<R>(
347 engine: &JetroEngine,
348 reader: R,
349 query: &str,
350 options: NdjsonOptions,
351) -> Result<Vec<Value>, JetroEngineError>
352where
353 R: BufRead,
354{
355 let mut values = Vec::new();
356 for_each_ndjson_with_options(engine, reader, query, options, |value| values.push(value))?;
357 Ok(values)
358}
359
360pub fn collect_ndjson_file<P>(
361 engine: &JetroEngine,
362 path: P,
363 query: &str,
364) -> Result<Vec<Value>, JetroEngineError>
365where
366 P: AsRef<Path>,
367{
368 let file = File::open(path)?;
369 let options = NdjsonOptions::default();
370 collect_ndjson_with_options(
371 engine,
372 std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
373 query,
374 options,
375 )
376}
377
378pub fn collect_ndjson_file_with_options<P>(
379 engine: &JetroEngine,
380 path: P,
381 query: &str,
382 options: NdjsonOptions,
383) -> Result<Vec<Value>, JetroEngineError>
384where
385 P: AsRef<Path>,
386{
387 let file = File::open(path)?;
388 collect_ndjson_with_options(
389 engine,
390 std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
391 query,
392 options,
393 )
394}
395
396pub fn collect_ndjson_source(
397 engine: &JetroEngine,
398 source: NdjsonSource,
399 query: &str,
400) -> Result<Vec<Value>, JetroEngineError> {
401 collect_ndjson_source_with_options(engine, source, query, NdjsonOptions::default())
402}
403
404pub fn collect_ndjson_source_with_options(
405 engine: &JetroEngine,
406 source: NdjsonSource,
407 query: &str,
408 options: NdjsonOptions,
409) -> Result<Vec<Value>, JetroEngineError> {
410 match source {
411 NdjsonSource::File(path) => collect_ndjson_file_with_options(engine, path, query, options),
412 NdjsonSource::Reader(reader) => collect_ndjson_with_options(engine, reader, query, options),
413 }
414}
415
416pub fn collect_ndjson_matches<R>(
417 engine: &JetroEngine,
418 reader: R,
419 predicate: &str,
420 limit: usize,
421) -> Result<Vec<Value>, JetroEngineError>
422where
423 R: BufRead,
424{
425 collect_ndjson_matches_with_options(engine, reader, predicate, limit, NdjsonOptions::default())
426}
427
428pub fn collect_ndjson_matches_with_options<R>(
429 engine: &JetroEngine,
430 reader: R,
431 predicate: &str,
432 limit: usize,
433 options: NdjsonOptions,
434) -> Result<Vec<Value>, JetroEngineError>
435where
436 R: BufRead,
437{
438 let mut values = Vec::with_capacity(limit);
439 drive_ndjson_matches(engine, reader, predicate, limit, options, |value| {
440 values.push(Value::from(value));
441 Ok(NdjsonControl::Continue)
442 })?;
443 Ok(values)
444}
445
446pub fn collect_ndjson_matches_file<P>(
447 engine: &JetroEngine,
448 path: P,
449 predicate: &str,
450 limit: usize,
451) -> Result<Vec<Value>, JetroEngineError>
452where
453 P: AsRef<Path>,
454{
455 let file = File::open(path)?;
456 let options = NdjsonOptions::default();
457 collect_ndjson_matches_with_options(
458 engine,
459 std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
460 predicate,
461 limit,
462 options,
463 )
464}
465
466pub fn collect_ndjson_matches_file_with_options<P>(
467 engine: &JetroEngine,
468 path: P,
469 predicate: &str,
470 limit: usize,
471 options: NdjsonOptions,
472) -> Result<Vec<Value>, JetroEngineError>
473where
474 P: AsRef<Path>,
475{
476 let file = File::open(path)?;
477 collect_ndjson_matches_with_options(
478 engine,
479 std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
480 predicate,
481 limit,
482 options,
483 )
484}
485
486pub fn collect_ndjson_matches_source(
487 engine: &JetroEngine,
488 source: NdjsonSource,
489 predicate: &str,
490 limit: usize,
491) -> Result<Vec<Value>, JetroEngineError> {
492 collect_ndjson_matches_source_with_options(
493 engine,
494 source,
495 predicate,
496 limit,
497 NdjsonOptions::default(),
498 )
499}
500
501pub fn collect_ndjson_matches_source_with_options(
502 engine: &JetroEngine,
503 source: NdjsonSource,
504 predicate: &str,
505 limit: usize,
506 options: NdjsonOptions,
507) -> Result<Vec<Value>, JetroEngineError> {
508 match source {
509 NdjsonSource::File(path) => {
510 collect_ndjson_matches_file_with_options(engine, path, predicate, limit, options)
511 }
512 NdjsonSource::Reader(reader) => {
513 collect_ndjson_matches_with_options(engine, reader, predicate, limit, options)
514 }
515 }
516}
517
518pub fn run_ndjson<R, W>(
519 engine: &JetroEngine,
520 reader: R,
521 query: &str,
522 writer: W,
523) -> Result<usize, JetroEngineError>
524where
525 R: BufRead,
526 W: Write,
527{
528 run_ndjson_with_options(engine, reader, query, writer, NdjsonOptions::default())
529}
530
531pub fn run_ndjson_file<P, W>(
532 engine: &JetroEngine,
533 path: P,
534 query: &str,
535 writer: W,
536) -> Result<usize, JetroEngineError>
537where
538 P: AsRef<Path>,
539 W: Write,
540{
541 let file = File::open(path)?;
542 let options = NdjsonOptions::default();
543 run_ndjson_with_options(
544 engine,
545 std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
546 query,
547 writer,
548 options,
549 )
550}
551
552pub fn run_ndjson_file_with_options<P, W>(
553 engine: &JetroEngine,
554 path: P,
555 query: &str,
556 writer: W,
557 options: NdjsonOptions,
558) -> Result<usize, JetroEngineError>
559where
560 P: AsRef<Path>,
561 W: Write,
562{
563 let file = File::open(path)?;
564 run_ndjson_with_options(
565 engine,
566 std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
567 query,
568 writer,
569 options,
570 )
571}
572
573pub fn run_ndjson_with_options<R, W>(
574 engine: &JetroEngine,
575 reader: R,
576 query: &str,
577 writer: W,
578 options: NdjsonOptions,
579) -> Result<usize, JetroEngineError>
580where
581 R: BufRead,
582 W: Write,
583{
584 let mut writer = ndjson_writer_with_options(writer, options);
585 let count = drive_ndjson_val(engine, reader, query, options, |value| {
586 write_val_line(&mut writer, &value)?;
587 Ok(NdjsonControl::Continue)
588 })?;
589 writer.flush()?;
590 Ok(count)
591}
592
593pub fn run_ndjson_limit<R, W>(
594 engine: &JetroEngine,
595 reader: R,
596 query: &str,
597 limit: usize,
598 writer: W,
599) -> Result<usize, JetroEngineError>
600where
601 R: BufRead,
602 W: Write,
603{
604 run_ndjson_limit_with_options(
605 engine,
606 reader,
607 query,
608 limit,
609 writer,
610 NdjsonOptions::default(),
611 )
612}
613
614pub fn run_ndjson_limit_with_options<R, W>(
615 engine: &JetroEngine,
616 reader: R,
617 query: &str,
618 limit: usize,
619 writer: W,
620 options: NdjsonOptions,
621) -> Result<usize, JetroEngineError>
622where
623 R: BufRead,
624 W: Write,
625{
626 if limit == 0 {
627 return Ok(0);
628 }
629
630 let mut writer = ndjson_writer_with_options(writer, options);
631 let mut emitted = 0usize;
632 let count = drive_ndjson_val(engine, reader, query, options, |value| {
633 write_val_line(&mut writer, &value)?;
634 emitted += 1;
635 Ok(if emitted >= limit {
636 NdjsonControl::Stop
637 } else {
638 NdjsonControl::Continue
639 })
640 })?;
641 writer.flush()?;
642 Ok(count)
643}
644
645pub fn run_ndjson_file_limit<P, W>(
646 engine: &JetroEngine,
647 path: P,
648 query: &str,
649 limit: usize,
650 writer: W,
651) -> Result<usize, JetroEngineError>
652where
653 P: AsRef<Path>,
654 W: Write,
655{
656 let file = File::open(path)?;
657 let options = NdjsonOptions::default();
658 run_ndjson_limit_with_options(
659 engine,
660 std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
661 query,
662 limit,
663 writer,
664 options,
665 )
666}
667
668pub fn run_ndjson_file_limit_with_options<P, W>(
669 engine: &JetroEngine,
670 path: P,
671 query: &str,
672 limit: usize,
673 writer: W,
674 options: NdjsonOptions,
675) -> Result<usize, JetroEngineError>
676where
677 P: AsRef<Path>,
678 W: Write,
679{
680 let file = File::open(path)?;
681 run_ndjson_limit_with_options(
682 engine,
683 std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
684 query,
685 limit,
686 writer,
687 options,
688 )
689}
690
691pub fn run_ndjson_source<W>(
692 engine: &JetroEngine,
693 source: NdjsonSource,
694 query: &str,
695 writer: W,
696) -> Result<usize, JetroEngineError>
697where
698 W: Write,
699{
700 run_ndjson_source_with_options(engine, source, query, writer, NdjsonOptions::default())
701}
702
703pub fn run_ndjson_source_with_options<W>(
704 engine: &JetroEngine,
705 source: NdjsonSource,
706 query: &str,
707 writer: W,
708 options: NdjsonOptions,
709) -> Result<usize, JetroEngineError>
710where
711 W: Write,
712{
713 match source {
714 NdjsonSource::File(path) => {
715 run_ndjson_file_with_options(engine, path, query, writer, options)
716 }
717 NdjsonSource::Reader(reader) => {
718 run_ndjson_with_options(engine, reader, query, writer, options)
719 }
720 }
721}
722
723pub fn run_ndjson_source_limit<W>(
724 engine: &JetroEngine,
725 source: NdjsonSource,
726 query: &str,
727 limit: usize,
728 writer: W,
729) -> Result<usize, JetroEngineError>
730where
731 W: Write,
732{
733 run_ndjson_source_limit_with_options(
734 engine,
735 source,
736 query,
737 limit,
738 writer,
739 NdjsonOptions::default(),
740 )
741}
742
743pub fn run_ndjson_source_limit_with_options<W>(
744 engine: &JetroEngine,
745 source: NdjsonSource,
746 query: &str,
747 limit: usize,
748 writer: W,
749 options: NdjsonOptions,
750) -> Result<usize, JetroEngineError>
751where
752 W: Write,
753{
754 match source {
755 NdjsonSource::File(path) => {
756 run_ndjson_file_limit_with_options(engine, path, query, limit, writer, options)
757 }
758 NdjsonSource::Reader(reader) => {
759 run_ndjson_limit_with_options(engine, reader, query, limit, writer, options)
760 }
761 }
762}
763
764pub fn run_ndjson_matches<R, W>(
765 engine: &JetroEngine,
766 reader: R,
767 predicate: &str,
768 limit: usize,
769 writer: W,
770) -> Result<usize, JetroEngineError>
771where
772 R: BufRead,
773 W: Write,
774{
775 run_ndjson_matches_with_options(
776 engine,
777 reader,
778 predicate,
779 limit,
780 writer,
781 NdjsonOptions::default(),
782 )
783}
784
785pub fn run_ndjson_matches_with_options<R, W>(
786 engine: &JetroEngine,
787 reader: R,
788 predicate: &str,
789 limit: usize,
790 writer: W,
791 options: NdjsonOptions,
792) -> Result<usize, JetroEngineError>
793where
794 R: BufRead,
795 W: Write,
796{
797 drive_ndjson_matches_writer(engine, reader, predicate, limit, options, writer)
798}
799
800pub fn run_ndjson_matches_file<P, W>(
801 engine: &JetroEngine,
802 path: P,
803 predicate: &str,
804 limit: usize,
805 writer: W,
806) -> Result<usize, JetroEngineError>
807where
808 P: AsRef<Path>,
809 W: Write,
810{
811 let file = File::open(path)?;
812 let options = NdjsonOptions::default();
813 run_ndjson_matches_with_options(
814 engine,
815 std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
816 predicate,
817 limit,
818 writer,
819 options,
820 )
821}
822
823pub fn run_ndjson_matches_file_with_options<P, W>(
824 engine: &JetroEngine,
825 path: P,
826 predicate: &str,
827 limit: usize,
828 writer: W,
829 options: NdjsonOptions,
830) -> Result<usize, JetroEngineError>
831where
832 P: AsRef<Path>,
833 W: Write,
834{
835 let file = File::open(path)?;
836 run_ndjson_matches_with_options(
837 engine,
838 std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
839 predicate,
840 limit,
841 writer,
842 options,
843 )
844}
845
846pub fn run_ndjson_matches_source<W>(
847 engine: &JetroEngine,
848 source: NdjsonSource,
849 predicate: &str,
850 limit: usize,
851 writer: W,
852) -> Result<usize, JetroEngineError>
853where
854 W: Write,
855{
856 run_ndjson_matches_source_with_options(
857 engine,
858 source,
859 predicate,
860 limit,
861 writer,
862 NdjsonOptions::default(),
863 )
864}
865
866pub fn run_ndjson_matches_source_with_options<W>(
867 engine: &JetroEngine,
868 source: NdjsonSource,
869 predicate: &str,
870 limit: usize,
871 writer: W,
872 options: NdjsonOptions,
873) -> Result<usize, JetroEngineError>
874where
875 W: Write,
876{
877 match source {
878 NdjsonSource::File(path) => {
879 run_ndjson_matches_file_with_options(engine, path, predicate, limit, writer, options)
880 }
881 NdjsonSource::Reader(reader) => {
882 run_ndjson_matches_with_options(engine, reader, predicate, limit, writer, options)
883 }
884 }
885}
886
887fn drive_ndjson<R, F>(
888 engine: &JetroEngine,
889 reader: R,
890 query: &str,
891 options: NdjsonOptions,
892 mut emit: F,
893) -> Result<usize, JetroEngineError>
894where
895 R: BufRead,
896 F: FnMut(Value) -> Result<NdjsonControl, JetroEngineError>,
897{
898 let mut driver = NdjsonPerRowDriver::new(reader).with_max_line_len(options.max_line_len);
899 let plan = engine.cached_plan(query, PlanningContext::bytes());
900 let mut buf = Vec::with_capacity(options.initial_buffer_capacity);
901 let mut count = 0;
902
903 while let Some((line_no, row)) = driver.read_next_owned(&mut buf)? {
904 let document = parse_row(engine, line_no, row)?;
905 let out = collect_row_val(engine, &document, &plan, line_no)?;
906 count += 1;
907 if matches!(emit(Value::from(out))?, NdjsonControl::Stop) {
908 break;
909 }
910 }
911
912 Ok(count)
913}
914
915fn drive_ndjson_val<R, F>(
916 engine: &JetroEngine,
917 reader: R,
918 query: &str,
919 options: NdjsonOptions,
920 mut emit: F,
921) -> Result<usize, JetroEngineError>
922where
923 R: BufRead,
924 F: FnMut(Val) -> Result<NdjsonControl, JetroEngineError>,
925{
926 let mut driver = NdjsonPerRowDriver::new(reader).with_max_line_len(options.max_line_len);
927 let mut executor = NdjsonRowExecutor::new(engine, query);
928 let mut buf = Vec::with_capacity(options.initial_buffer_capacity);
929 let mut count = 0;
930
931 while let Some((line_no, row)) = driver.read_next_owned(&mut buf)? {
932 count += 1;
933 if matches!(emit(executor.eval_owned_row(line_no, row)?)?, NdjsonControl::Stop) {
934 break;
935 }
936 }
937
938 Ok(count)
939}
940
941fn drive_ndjson_matches<R, F>(
942 engine: &JetroEngine,
943 reader: R,
944 predicate: &str,
945 limit: usize,
946 options: NdjsonOptions,
947 mut emit: F,
948) -> Result<usize, JetroEngineError>
949where
950 R: BufRead,
951 F: FnMut(Val) -> Result<NdjsonControl, JetroEngineError>,
952{
953 if limit == 0 {
954 return Ok(0);
955 }
956
957 let mut driver = NdjsonPerRowDriver::new(reader).with_max_line_len(options.max_line_len);
958 let mut executor = NdjsonRowExecutor::new(engine, predicate);
959 let mut buf = Vec::with_capacity(options.initial_buffer_capacity);
960 let mut emitted = 0usize;
961
962 while let Some((line_no, row)) = driver.read_next_owned(&mut buf)? {
963 let document = executor.parse_owned_row(line_no, row)?;
964 let matched = executor.eval_document(line_no, &document)?;
965 if !is_truthy(&matched) {
966 continue;
967 }
968
969 let root = document
970 .root_val_with(engine.keys())
971 .map_err(|err| row_eval_error(line_no, err))?;
972 emitted += 1;
973 if matches!(emit(root)?, NdjsonControl::Stop) || emitted >= limit {
974 break;
975 }
976 }
977
978 Ok(emitted)
979}
980
981fn drive_ndjson_matches_writer<R, W>(
982 engine: &JetroEngine,
983 reader: R,
984 predicate: &str,
985 limit: usize,
986 options: NdjsonOptions,
987 writer: W,
988) -> Result<usize, JetroEngineError>
989where
990 R: BufRead,
991 W: Write,
992{
993 if limit == 0 {
994 return Ok(0);
995 }
996
997 let mut driver = NdjsonPerRowDriver::new(reader).with_max_line_len(options.max_line_len);
998 let mut executor = NdjsonRowExecutor::new(engine, predicate);
999 let mut writer = ndjson_writer_with_options(writer, options);
1000 let mut buf = Vec::with_capacity(options.initial_buffer_capacity);
1001 let mut emitted = 0usize;
1002
1003 while let Some((line_no, row)) = driver.read_next_owned(&mut buf)? {
1004 let document = executor.parse_owned_row(line_no, row)?;
1005 let matched = executor.eval_document(line_no, &document)?;
1006 if !is_truthy(&matched) {
1007 continue;
1008 }
1009
1010 write_document_line(&mut writer, &document, line_no, executor.engine())?;
1011 emitted += 1;
1012 if emitted >= limit {
1013 break;
1014 }
1015 }
1016
1017 writer.flush()?;
1018 Ok(emitted)
1019}
1020
1021pub(super) struct NdjsonRowExecutor<'a> {
1022 engine: &'a JetroEngine,
1023 plan: crate::ir::physical::QueryPlan,
1024 vm: MutexGuard<'a, VM>,
1025}
1026
1027impl<'a> NdjsonRowExecutor<'a> {
1028 pub(super) fn new(engine: &'a JetroEngine, query: &str) -> Self {
1029 Self {
1030 engine,
1031 plan: engine.cached_plan(query, PlanningContext::bytes()),
1032 vm: engine.lock_vm(),
1033 }
1034 }
1035
1036 pub(super) fn eval_owned_row(
1037 &mut self,
1038 line_no: u64,
1039 row: Vec<u8>,
1040 ) -> Result<Val, JetroEngineError> {
1041 let document = self.parse_owned_row(line_no, row)?;
1042 self.eval_document(line_no, &document)
1043 }
1044
1045 pub(super) fn parse_owned_row(
1046 &self,
1047 line_no: u64,
1048 row: Vec<u8>,
1049 ) -> Result<Jetro, JetroEngineError> {
1050 parse_row(self.engine, line_no, row)
1051 }
1052
1053 pub(super) fn eval_document(
1054 &mut self,
1055 line_no: u64,
1056 document: &Jetro,
1057 ) -> Result<Val, JetroEngineError> {
1058 crate::exec::router::collect_plan_val_with_vm(document, &self.plan, &mut self.vm)
1059 .map_err(|err| row_eval_error(line_no, err))
1060 }
1061
1062 pub(super) fn engine(&self) -> &'a JetroEngine {
1063 self.engine
1064 }
1065}
1066
1067pub(super) fn write_val_line<W: Write>(writer: &mut W, value: &Val) -> Result<(), JetroEngineError> {
1068 write_val_json(writer, value)?;
1069 writer.write_all(b"\n")?;
1070 Ok(())
1071}
1072
1073pub(super) fn write_document_line<W: Write>(
1074 writer: &mut W,
1075 document: &Jetro,
1076 line_no: u64,
1077 engine: &JetroEngine,
1078) -> Result<(), JetroEngineError> {
1079 if let Some(bytes) = document.raw_bytes() {
1080 writer.write_all(bytes)?;
1081 writer.write_all(b"\n")?;
1082 return Ok(());
1083 }
1084
1085 let root = document
1086 .root_val_with(engine.keys())
1087 .map_err(|err| row_eval_error(line_no, err))?;
1088 write_val_line(writer, &root)
1089}
1090
1091pub(super) fn ndjson_writer_with_options<W: Write>(
1092 writer: W,
1093 options: NdjsonOptions,
1094) -> BufWriter<W> {
1095 let capacity = options
1096 .reader_buffer_capacity
1097 .max(DEFAULT_READER_BUFFER_CAPACITY);
1098 BufWriter::with_capacity(capacity, writer)
1099}
1100
/// Serializes `value` as compact JSON (no insignificant whitespace) into `writer`.
///
/// Each `Val` variant is written directly, without building an intermediate
/// `serde_json::Value`. Non-finite floats are written as `0` (see `write_f64`
/// and `write_json_float_array`).
fn write_val_json<W: Write>(writer: &mut W, value: &Val) -> Result<(), JetroEngineError> {
    match value {
        Val::Null => writer.write_all(b"null")?,
        Val::Bool(true) => writer.write_all(b"true")?,
        Val::Bool(false) => writer.write_all(b"false")?,
        Val::Int(n) => write_i64(writer, *n)?,
        Val::Float(n) => write_f64(writer, *n)?,
        Val::Str(s) => write_json_str(writer, s.as_ref())?,
        Val::StrSlice(s) => write_json_str(writer, s.as_str())?,
        Val::Arr(items) => write_json_array(writer, items.iter())?,
        // Homogeneous vector variants use specialized array writers.
        Val::IntVec(items) => write_json_int_array(writer, items.iter().copied())?,
        Val::FloatVec(items) => write_json_float_array(writer, items.iter().copied())?,
        Val::StrVec(items) => write_json_str_array(writer, items.iter().map(|s| s.as_ref()))?,
        Val::StrSliceVec(items) => write_json_str_array(writer, items.iter().map(|s| s.as_str()))?,
        Val::Obj(entries) => {
            write_json_object(writer, entries.iter().map(|(key, value)| (key.as_ref(), value)))?
        }
        Val::ObjSmall(entries) => {
            write_json_object(writer, entries.iter().map(|(key, value)| (key.as_ref(), value)))?
        }
        // Columnar array of objects: every row is written with the same key list.
        Val::ObjVec(data) => write_json_objvec(writer, data)?,
    }
    Ok(())
}
1125
1126fn write_json_array<'a, W, I>(writer: &mut W, items: I) -> Result<(), JetroEngineError>
1127where
1128 W: Write,
1129 I: IntoIterator<Item = &'a Val>,
1130{
1131 writer.write_all(b"[")?;
1132 let mut first = true;
1133 for item in items {
1134 if first {
1135 first = false;
1136 } else {
1137 writer.write_all(b",")?;
1138 }
1139 write_val_json(writer, item)?;
1140 }
1141 writer.write_all(b"]")?;
1142 Ok(())
1143}
1144
1145fn write_json_int_array<W, I>(writer: &mut W, items: I) -> Result<(), JetroEngineError>
1146where
1147 W: Write,
1148 I: IntoIterator<Item = i64>,
1149{
1150 writer.write_all(b"[")?;
1151 let mut first = true;
1152 let mut buf = itoa::Buffer::new();
1153 for item in items {
1154 if first {
1155 first = false;
1156 } else {
1157 writer.write_all(b",")?;
1158 }
1159 writer.write_all(buf.format(item).as_bytes())?;
1160 }
1161 writer.write_all(b"]")?;
1162 Ok(())
1163}
1164
1165fn write_json_float_array<W, I>(writer: &mut W, items: I) -> Result<(), JetroEngineError>
1166where
1167 W: Write,
1168 I: IntoIterator<Item = f64>,
1169{
1170 writer.write_all(b"[")?;
1171 let mut first = true;
1172 let mut buf = ryu::Buffer::new();
1173 for item in items {
1174 if first {
1175 first = false;
1176 } else {
1177 writer.write_all(b",")?;
1178 }
1179 if item.is_finite() {
1180 writer.write_all(buf.format(item).as_bytes())?;
1181 } else {
1182 writer.write_all(b"0")?;
1183 }
1184 }
1185 writer.write_all(b"]")?;
1186 Ok(())
1187}
1188
1189fn write_json_str_array<'a, W, I>(writer: &mut W, items: I) -> Result<(), JetroEngineError>
1190where
1191 W: Write,
1192 I: IntoIterator<Item = &'a str>,
1193{
1194 writer.write_all(b"[")?;
1195 let mut first = true;
1196 for item in items {
1197 if first {
1198 first = false;
1199 } else {
1200 writer.write_all(b",")?;
1201 }
1202 write_json_str(writer, item)?;
1203 }
1204 writer.write_all(b"]")?;
1205 Ok(())
1206}
1207
1208fn write_json_object<'a, W, I>(writer: &mut W, entries: I) -> Result<(), JetroEngineError>
1209where
1210 W: Write,
1211 I: IntoIterator<Item = (&'a str, &'a Val)>,
1212{
1213 writer.write_all(b"{")?;
1214 let mut first = true;
1215 for (key, value) in entries {
1216 if first {
1217 first = false;
1218 } else {
1219 writer.write_all(b",")?;
1220 }
1221 write_json_str(writer, key)?;
1222 writer.write_all(b":")?;
1223 write_val_json(writer, value)?;
1224 }
1225 writer.write_all(b"}")?;
1226 Ok(())
1227}
1228
1229fn write_json_objvec<W: Write>(
1230 writer: &mut W,
1231 data: &crate::data::value::ObjVecData,
1232) -> Result<(), JetroEngineError> {
1233 writer.write_all(b"[")?;
1234 for row in 0..data.nrows() {
1235 if row > 0 {
1236 writer.write_all(b",")?;
1237 }
1238 writer.write_all(b"{")?;
1239 for slot in 0..data.stride() {
1240 if slot > 0 {
1241 writer.write_all(b",")?;
1242 }
1243 write_json_str(writer, data.keys[slot].as_ref())?;
1244 writer.write_all(b":")?;
1245 write_val_json(writer, data.cell(row, slot))?;
1246 }
1247 writer.write_all(b"}")?;
1248 }
1249 writer.write_all(b"]")?;
1250 Ok(())
1251}
1252
/// Writes `value` as a quoted JSON string literal.
///
/// `"`, `\` and every control byte below 0x20 are escaped. The common control
/// characters use their short escapes (`\n`, `\r`, `\t`, `\b`, `\f`); the rest
/// use `\u00XX`. Bytes >= 0x80 (non-ASCII UTF-8) and 0x7F are copied
/// unchanged. Unescaped runs are written with single `write_all` calls.
fn write_json_str<W: Write>(writer: &mut W, value: &str) -> Result<(), JetroEngineError> {
    writer.write_all(b"\"")?;
    let bytes = value.as_bytes();
    // Fast path: nothing needs escaping, so copy the whole string at once.
    if !needs_json_escape(bytes) {
        writer.write_all(bytes)?;
        writer.write_all(b"\"")?;
        return Ok(());
    }

    // Start index of the pending run of bytes that need no escaping.
    let mut start = 0usize;

    for (idx, &byte) in bytes.iter().enumerate() {
        let escaped = match byte {
            b'"' => Some(br#"\""#.as_slice()),
            b'\\' => Some(br#"\\"#.as_slice()),
            b'\n' => Some(br#"\n"#.as_slice()),
            b'\r' => Some(br#"\r"#.as_slice()),
            b'\t' => Some(br#"\t"#.as_slice()),
            0x08 => Some(br#"\b"#.as_slice()),
            0x0c => Some(br#"\f"#.as_slice()),
            // Remaining C0 controls have no short form; emitted as `\u00XX` below.
            0x00..=0x1f => None,
            // Ordinary byte: just extend the pending run.
            _ => continue,
        };

        // Flush the unescaped run preceding this byte, then write its escape.
        if start < idx {
            writer.write_all(&bytes[start..idx])?;
        }
        match escaped {
            Some(seq) => writer.write_all(seq)?,
            None => write_control_escape(writer, byte)?,
        }
        start = idx + 1;
    }

    // Flush the trailing unescaped run.
    if start < bytes.len() {
        writer.write_all(&bytes[start..])?;
    }
    writer.write_all(b"\"")?;
    Ok(())
}
1293
1294#[inline]
1295fn write_i64<W: Write>(writer: &mut W, value: i64) -> Result<(), JetroEngineError> {
1296 let mut buf = itoa::Buffer::new();
1297 writer.write_all(buf.format(value).as_bytes())?;
1298 Ok(())
1299}
1300
1301#[inline]
1302fn write_f64<W: Write>(writer: &mut W, value: f64) -> Result<(), JetroEngineError> {
1303 if value.is_finite() {
1304 let mut buf = ryu::Buffer::new();
1305 writer.write_all(buf.format(value).as_bytes())?;
1306 } else {
1307 writer.write_all(b"0")?;
1308 }
1309 Ok(())
1310}
1311
/// Returns `true` if any byte must be escaped inside a JSON string.
///
/// Such bytes are `"`, `\`, and any C0 control byte (below 0x20).
#[inline]
fn needs_json_escape(bytes: &[u8]) -> bool {
    !bytes
        .iter()
        .all(|&b| b >= 0x20 && b != b'"' && b != b'\\')
}
1318
1319fn write_control_escape<W: Write>(writer: &mut W, byte: u8) -> Result<(), JetroEngineError> {
1320 const HEX: &[u8; 16] = b"0123456789abcdef";
1321 writer.write_all(&[
1322 b'\\',
1323 b'u',
1324 b'0',
1325 b'0',
1326 HEX[(byte >> 4) as usize],
1327 HEX[(byte & 0x0f) as usize],
1328 ])?;
1329 Ok(())
1330}
1331
1332pub(super) fn collect_row_val(
1333 engine: &JetroEngine,
1334 document: &Jetro,
1335 plan: &crate::ir::physical::QueryPlan,
1336 line_no: u64,
1337) -> Result<Val, JetroEngineError> {
1338 engine
1339 .collect_prepared_val(document, plan)
1340 .map_err(|err| row_eval_error(line_no, err))
1341}
1342
1343pub(super) fn parse_row(
1344 engine: &JetroEngine,
1345 line_no: u64,
1346 row: Vec<u8>,
1347) -> Result<Jetro, JetroEngineError> {
1348 engine
1349 .parse_bytes_lazy(row)
1350 .map_err(|err| row_parse_error(line_no, err))
1351}
1352
1353fn row_parse_error(line_no: u64, err: JetroEngineError) -> JetroEngineError {
1354 match err {
1355 JetroEngineError::Json(source) => RowError::InvalidJson { line_no, source }.into(),
1356 JetroEngineError::Eval(eval) => RowError::InvalidJsonMessage {
1357 line_no,
1358 message: eval.to_string(),
1359 }
1360 .into(),
1361 other => other,
1362 }
1363}
1364
1365pub(super) fn row_eval_error(line_no: u64, err: crate::EvalError) -> JetroEngineError {
1366 let message = err.0;
1367 if message.starts_with("Invalid JSON:") {
1368 RowError::InvalidJsonMessage { line_no, message }.into()
1369 } else {
1370 crate::EvalError(message).into()
1371 }
1372}
1373
/// Removes every trailing `\n` and `\r` byte from `buf`.
///
/// Other trailing whitespace is kept.
fn trim_line_ending(buf: &mut Vec<u8>) {
    let keep = buf
        .iter()
        .rposition(|&b| b != b'\n' && b != b'\r')
        .map_or(0, |idx| idx + 1);
    buf.truncate(keep);
}
1379
/// Drops a leading UTF-8 byte-order mark, but only on physical line 1.
fn strip_initial_bom(line_no: u64, buf: &mut Vec<u8>) {
    const UTF8_BOM: [u8; 3] = [0xEF, 0xBB, 0xBF];
    if line_no != 1 {
        return;
    }
    if buf.starts_with(&UTF8_BOM) {
        buf.drain(..UTF8_BOM.len());
    }
}
1385
/// Returns the half-open `(start, end)` range of `buf` that remains after
/// stripping leading and trailing ASCII whitespace.
///
/// An all-whitespace or empty buffer yields `(buf.len(), buf.len())`.
fn non_ws_range(buf: &[u8]) -> (usize, usize) {
    let is_content = |b: &u8| !b.is_ascii_whitespace();
    match buf.iter().position(is_content) {
        None => (buf.len(), buf.len()),
        Some(start) => {
            let end = buf.iter().rposition(is_content).map_or(start, |idx| idx + 1);
            (start, end)
        }
    }
}
1398
#[cfg(test)]
mod tests {
    /// With the `simd-json` feature, `parse_row` must neither materialize
    /// the root value nor build the tape eagerly.
    #[test]
    #[cfg(feature = "simd-json")]
    fn parse_row_keeps_simd_document_lazy() {
        let engine = crate::JetroEngine::new();
        let row = br#"{"name":"Ada","age":30}"#.to_vec();

        let document = super::parse_row(&engine, 1, row).expect("row parses lazily");

        assert!(!document.root_val_is_materialized());
        assert!(!document.tape_is_built());
    }

    /// `read_next_owned` hands out each row's bytes but leaves `buf` with its
    /// original capacity, so the buffer can be reused for the next row.
    #[test]
    fn owned_row_read_preserves_reusable_buffer_capacity() {
        let input = std::io::Cursor::new(b"{\"n\":1}\n{\"n\":2}\n");
        let mut driver = super::NdjsonPerRowDriver::new(input);
        let mut buf = Vec::with_capacity(128);

        let first = driver
            .read_next_owned(&mut buf)
            .expect("row read succeeds")
            .expect("first row exists");
        assert_eq!(first.1, br#"{"n":1}"#);
        assert_eq!(buf.capacity(), 128);

        let second = driver
            .read_next_owned(&mut buf)
            .expect("row read succeeds")
            .expect("second row exists");
        assert_eq!(second.1, br#"{"n":2}"#);
        assert_eq!(buf.capacity(), 128);
    }
}