Skip to main content

jetro_core/io/
ndjson.rs

1use super::{NdjsonSource, RowError};
2use crate::data::value::Val;
3use crate::plan::physical::PlanningContext;
4use crate::util::is_truthy;
5use crate::{Jetro, JetroEngine, JetroEngineError, VM};
6use memchr::memchr;
7use serde_json::Value;
8use std::fs::File;
9use std::io::{BufRead, BufWriter, Write};
10use std::path::Path;
11use std::sync::MutexGuard;
12
13const DEFAULT_MAX_LINE_LEN: usize = 64 * 1024 * 1024;
14const DEFAULT_LINE_BUFFER_CAPACITY: usize = 8192;
15const DEFAULT_READER_BUFFER_CAPACITY: usize = 64 * 1024;
16pub(super) const DEFAULT_REVERSE_CHUNK_SIZE: usize = 64 * 1024;
17
18/// Configuration for per-row NDJSON execution.
19#[derive(Clone, Copy, Debug, PartialEq, Eq)]
20pub struct NdjsonOptions {
21    pub max_line_len: usize,
22    pub initial_buffer_capacity: usize,
23    pub reader_buffer_capacity: usize,
24    pub reverse_chunk_size: usize,
25}
26
27impl Default for NdjsonOptions {
28    fn default() -> Self {
29        Self {
30            max_line_len: DEFAULT_MAX_LINE_LEN,
31            initial_buffer_capacity: DEFAULT_LINE_BUFFER_CAPACITY,
32            reader_buffer_capacity: DEFAULT_READER_BUFFER_CAPACITY,
33            reverse_chunk_size: DEFAULT_REVERSE_CHUNK_SIZE,
34        }
35    }
36}
37
38impl NdjsonOptions {
39    pub fn with_max_line_len(mut self, max_line_len: usize) -> Self {
40        self.max_line_len = max_line_len;
41        self
42    }
43
44    pub fn with_initial_buffer_capacity(mut self, capacity: usize) -> Self {
45        self.initial_buffer_capacity = capacity;
46        self
47    }
48
49    pub fn with_reader_buffer_capacity(mut self, capacity: usize) -> Self {
50        self.reader_buffer_capacity = capacity;
51        self
52    }
53
54    pub fn with_reverse_chunk_size(mut self, capacity: usize) -> Self {
55        self.reverse_chunk_size = capacity;
56        self
57    }
58}
59
60/// Forward-only per-row NDJSON reader.
61pub struct NdjsonPerRowDriver<R> {
62    reader: R,
63    line_no: u64,
64    max_line_len: usize,
65}
66
67impl<R: BufRead> NdjsonPerRowDriver<R> {
68    pub fn new(reader: R) -> Self {
69        Self {
70            reader,
71            line_no: 0,
72            max_line_len: DEFAULT_MAX_LINE_LEN,
73        }
74    }
75
76    pub fn with_max_line_len(mut self, max_line_len: usize) -> Self {
77        self.max_line_len = max_line_len;
78        self
79    }
80
81    pub fn line_no(&self) -> u64 {
82        self.line_no
83    }
84
85    /// Read the next non-empty NDJSON row into `buf`, returning its 1-based line
86    /// number. Empty and whitespace-only rows are skipped.
87    pub fn read_next_nonempty<'a>(
88        &mut self,
89        buf: &'a mut Vec<u8>,
90    ) -> Result<Option<(u64, &'a [u8])>, RowError> {
91        loop {
92            buf.clear();
93            let read = self.read_physical_line(buf)?;
94            if read == 0 {
95                return Ok(None);
96            }
97            self.line_no += 1;
98
99            strip_initial_bom(self.line_no, buf);
100            trim_line_ending(buf);
101
102            let (start, end) = non_ws_range(buf);
103            if start == end {
104                continue;
105            }
106
107            let len = end - start;
108            if len > self.max_line_len {
109                return Err(RowError::LineTooLarge {
110                    line_no: self.line_no,
111                    len,
112                    max: self.max_line_len,
113                });
114            }
115
116            return Ok(Some((self.line_no, &buf[start..end])));
117        }
118    }
119
120    /// Read the next non-empty row and transfer ownership of `buf` to the
121    /// caller. This is the hot path used by `JetroEngine` NDJSON execution so
122    /// the row can be parsed without an extra bytes copy.
123    pub fn read_next_owned(
124        &mut self,
125        buf: &mut Vec<u8>,
126    ) -> Result<Option<(u64, Vec<u8>)>, RowError> {
127        loop {
128            buf.clear();
129            let read = self.read_physical_line(buf)?;
130            if read == 0 {
131                return Ok(None);
132            }
133            self.line_no += 1;
134
135            strip_initial_bom(self.line_no, buf);
136            trim_line_ending(buf);
137
138            let (start, end) = non_ws_range(buf);
139            if start == end {
140                continue;
141            }
142
143            let len = end - start;
144            if len > self.max_line_len {
145                return Err(RowError::LineTooLarge {
146                    line_no: self.line_no,
147                    len,
148                    max: self.max_line_len,
149                });
150            }
151
152            let capacity = buf.capacity();
153            return Ok(Some((
154                self.line_no,
155                std::mem::replace(buf, Vec::with_capacity(capacity)),
156            )));
157        }
158    }
159
160    fn read_physical_line(&mut self, buf: &mut Vec<u8>) -> Result<usize, RowError> {
161        loop {
162            let available = self.reader.fill_buf()?;
163            if available.is_empty() {
164                return Ok(buf.len());
165            }
166
167            if let Some(pos) = memchr(b'\n', available) {
168                buf.extend_from_slice(&available[..=pos]);
169                self.reader.consume(pos + 1);
170                self.check_physical_line_len(buf.len())?;
171                return Ok(buf.len());
172            }
173
174            let len = available.len();
175            buf.extend_from_slice(available);
176            self.reader.consume(len);
177            self.check_physical_line_len(buf.len())?;
178        }
179    }
180
181    fn check_physical_line_len(&self, len: usize) -> Result<(), RowError> {
182        let hard_max = self.max_line_len.saturating_add(2);
183        if len > hard_max {
184            return Err(RowError::LineTooLarge {
185                line_no: self.line_no + 1,
186                len,
187                max: self.max_line_len,
188            });
189        }
190        Ok(())
191    }
192}
193
/// Verdict returned by per-row callbacks: keep reading, or stop early.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum NdjsonControl {
    /// Move on to the next row.
    Continue,
    /// Stop after the current row; the driver returns `Ok` with its count.
    Stop,
}
199
200pub fn for_each_ndjson<R, F>(
201    engine: &JetroEngine,
202    reader: R,
203    query: &str,
204    f: F,
205) -> Result<usize, JetroEngineError>
206where
207    R: BufRead,
208    F: FnMut(Value),
209{
210    for_each_ndjson_with_options(engine, reader, query, NdjsonOptions::default(), f)
211}
212
213pub fn for_each_ndjson_with_options<R, F>(
214    engine: &JetroEngine,
215    reader: R,
216    query: &str,
217    options: NdjsonOptions,
218    mut f: F,
219) -> Result<usize, JetroEngineError>
220where
221    R: BufRead,
222    F: FnMut(Value),
223{
224    drive_ndjson(engine, reader, query, options, |value| {
225        f(value);
226        Ok(NdjsonControl::Continue)
227    })
228}
229
230pub fn for_each_ndjson_until<R, F>(
231    engine: &JetroEngine,
232    reader: R,
233    query: &str,
234    f: F,
235) -> Result<usize, JetroEngineError>
236where
237    R: BufRead,
238    F: FnMut(Value) -> Result<NdjsonControl, JetroEngineError>,
239{
240    for_each_ndjson_until_with_options(engine, reader, query, NdjsonOptions::default(), f)
241}
242
243pub fn for_each_ndjson_until_with_options<R, F>(
244    engine: &JetroEngine,
245    reader: R,
246    query: &str,
247    options: NdjsonOptions,
248    f: F,
249) -> Result<usize, JetroEngineError>
250where
251    R: BufRead,
252    F: FnMut(Value) -> Result<NdjsonControl, JetroEngineError>,
253{
254    drive_ndjson(engine, reader, query, options, f)
255}
256
257pub fn for_each_ndjson_source<F>(
258    engine: &JetroEngine,
259    source: NdjsonSource,
260    query: &str,
261    f: F,
262) -> Result<usize, JetroEngineError>
263where
264    F: FnMut(Value),
265{
266    for_each_ndjson_source_with_options(engine, source, query, NdjsonOptions::default(), f)
267}
268
269pub fn for_each_ndjson_source_with_options<F>(
270    engine: &JetroEngine,
271    source: NdjsonSource,
272    query: &str,
273    options: NdjsonOptions,
274    f: F,
275) -> Result<usize, JetroEngineError>
276where
277    F: FnMut(Value),
278{
279    match source {
280        NdjsonSource::File(path) => {
281            let file = File::open(path)?;
282            for_each_ndjson_with_options(
283                engine,
284                std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
285                query,
286                options,
287                f,
288            )
289        }
290        NdjsonSource::Reader(reader) => {
291            for_each_ndjson_with_options(engine, reader, query, options, f)
292        }
293    }
294}
295
296pub fn for_each_ndjson_source_until<F>(
297    engine: &JetroEngine,
298    source: NdjsonSource,
299    query: &str,
300    f: F,
301) -> Result<usize, JetroEngineError>
302where
303    F: FnMut(Value) -> Result<NdjsonControl, JetroEngineError>,
304{
305    for_each_ndjson_source_until_with_options(engine, source, query, NdjsonOptions::default(), f)
306}
307
308pub fn for_each_ndjson_source_until_with_options<F>(
309    engine: &JetroEngine,
310    source: NdjsonSource,
311    query: &str,
312    options: NdjsonOptions,
313    f: F,
314) -> Result<usize, JetroEngineError>
315where
316    F: FnMut(Value) -> Result<NdjsonControl, JetroEngineError>,
317{
318    match source {
319        NdjsonSource::File(path) => {
320            let file = File::open(path)?;
321            for_each_ndjson_until_with_options(
322                engine,
323                std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
324                query,
325                options,
326                f,
327            )
328        }
329        NdjsonSource::Reader(reader) => {
330            for_each_ndjson_until_with_options(engine, reader, query, options, f)
331        }
332    }
333}
334
335pub fn collect_ndjson<R>(
336    engine: &JetroEngine,
337    reader: R,
338    query: &str,
339) -> Result<Vec<Value>, JetroEngineError>
340where
341    R: BufRead,
342{
343    collect_ndjson_with_options(engine, reader, query, NdjsonOptions::default())
344}
345
346pub fn collect_ndjson_with_options<R>(
347    engine: &JetroEngine,
348    reader: R,
349    query: &str,
350    options: NdjsonOptions,
351) -> Result<Vec<Value>, JetroEngineError>
352where
353    R: BufRead,
354{
355    let mut values = Vec::new();
356    for_each_ndjson_with_options(engine, reader, query, options, |value| values.push(value))?;
357    Ok(values)
358}
359
360pub fn collect_ndjson_file<P>(
361    engine: &JetroEngine,
362    path: P,
363    query: &str,
364) -> Result<Vec<Value>, JetroEngineError>
365where
366    P: AsRef<Path>,
367{
368    let file = File::open(path)?;
369    let options = NdjsonOptions::default();
370    collect_ndjson_with_options(
371        engine,
372        std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
373        query,
374        options,
375    )
376}
377
378pub fn collect_ndjson_file_with_options<P>(
379    engine: &JetroEngine,
380    path: P,
381    query: &str,
382    options: NdjsonOptions,
383) -> Result<Vec<Value>, JetroEngineError>
384where
385    P: AsRef<Path>,
386{
387    let file = File::open(path)?;
388    collect_ndjson_with_options(
389        engine,
390        std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
391        query,
392        options,
393    )
394}
395
396pub fn collect_ndjson_source(
397    engine: &JetroEngine,
398    source: NdjsonSource,
399    query: &str,
400) -> Result<Vec<Value>, JetroEngineError> {
401    collect_ndjson_source_with_options(engine, source, query, NdjsonOptions::default())
402}
403
404pub fn collect_ndjson_source_with_options(
405    engine: &JetroEngine,
406    source: NdjsonSource,
407    query: &str,
408    options: NdjsonOptions,
409) -> Result<Vec<Value>, JetroEngineError> {
410    match source {
411        NdjsonSource::File(path) => collect_ndjson_file_with_options(engine, path, query, options),
412        NdjsonSource::Reader(reader) => collect_ndjson_with_options(engine, reader, query, options),
413    }
414}
415
416pub fn collect_ndjson_matches<R>(
417    engine: &JetroEngine,
418    reader: R,
419    predicate: &str,
420    limit: usize,
421) -> Result<Vec<Value>, JetroEngineError>
422where
423    R: BufRead,
424{
425    collect_ndjson_matches_with_options(engine, reader, predicate, limit, NdjsonOptions::default())
426}
427
428pub fn collect_ndjson_matches_with_options<R>(
429    engine: &JetroEngine,
430    reader: R,
431    predicate: &str,
432    limit: usize,
433    options: NdjsonOptions,
434) -> Result<Vec<Value>, JetroEngineError>
435where
436    R: BufRead,
437{
438    let mut values = Vec::with_capacity(limit);
439    drive_ndjson_matches(engine, reader, predicate, limit, options, |value| {
440        values.push(Value::from(value));
441        Ok(NdjsonControl::Continue)
442    })?;
443    Ok(values)
444}
445
446pub fn collect_ndjson_matches_file<P>(
447    engine: &JetroEngine,
448    path: P,
449    predicate: &str,
450    limit: usize,
451) -> Result<Vec<Value>, JetroEngineError>
452where
453    P: AsRef<Path>,
454{
455    let file = File::open(path)?;
456    let options = NdjsonOptions::default();
457    collect_ndjson_matches_with_options(
458        engine,
459        std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
460        predicate,
461        limit,
462        options,
463    )
464}
465
466pub fn collect_ndjson_matches_file_with_options<P>(
467    engine: &JetroEngine,
468    path: P,
469    predicate: &str,
470    limit: usize,
471    options: NdjsonOptions,
472) -> Result<Vec<Value>, JetroEngineError>
473where
474    P: AsRef<Path>,
475{
476    let file = File::open(path)?;
477    collect_ndjson_matches_with_options(
478        engine,
479        std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
480        predicate,
481        limit,
482        options,
483    )
484}
485
486pub fn collect_ndjson_matches_source(
487    engine: &JetroEngine,
488    source: NdjsonSource,
489    predicate: &str,
490    limit: usize,
491) -> Result<Vec<Value>, JetroEngineError> {
492    collect_ndjson_matches_source_with_options(
493        engine,
494        source,
495        predicate,
496        limit,
497        NdjsonOptions::default(),
498    )
499}
500
501pub fn collect_ndjson_matches_source_with_options(
502    engine: &JetroEngine,
503    source: NdjsonSource,
504    predicate: &str,
505    limit: usize,
506    options: NdjsonOptions,
507) -> Result<Vec<Value>, JetroEngineError> {
508    match source {
509        NdjsonSource::File(path) => {
510            collect_ndjson_matches_file_with_options(engine, path, predicate, limit, options)
511        }
512        NdjsonSource::Reader(reader) => {
513            collect_ndjson_matches_with_options(engine, reader, predicate, limit, options)
514        }
515    }
516}
517
518pub fn run_ndjson<R, W>(
519    engine: &JetroEngine,
520    reader: R,
521    query: &str,
522    writer: W,
523) -> Result<usize, JetroEngineError>
524where
525    R: BufRead,
526    W: Write,
527{
528    run_ndjson_with_options(engine, reader, query, writer, NdjsonOptions::default())
529}
530
531pub fn run_ndjson_file<P, W>(
532    engine: &JetroEngine,
533    path: P,
534    query: &str,
535    writer: W,
536) -> Result<usize, JetroEngineError>
537where
538    P: AsRef<Path>,
539    W: Write,
540{
541    let file = File::open(path)?;
542    let options = NdjsonOptions::default();
543    run_ndjson_with_options(
544        engine,
545        std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
546        query,
547        writer,
548        options,
549    )
550}
551
552pub fn run_ndjson_file_with_options<P, W>(
553    engine: &JetroEngine,
554    path: P,
555    query: &str,
556    writer: W,
557    options: NdjsonOptions,
558) -> Result<usize, JetroEngineError>
559where
560    P: AsRef<Path>,
561    W: Write,
562{
563    let file = File::open(path)?;
564    run_ndjson_with_options(
565        engine,
566        std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
567        query,
568        writer,
569        options,
570    )
571}
572
573pub fn run_ndjson_with_options<R, W>(
574    engine: &JetroEngine,
575    reader: R,
576    query: &str,
577    writer: W,
578    options: NdjsonOptions,
579) -> Result<usize, JetroEngineError>
580where
581    R: BufRead,
582    W: Write,
583{
584    let mut writer = ndjson_writer_with_options(writer, options);
585    let count = drive_ndjson_val(engine, reader, query, options, |value| {
586        write_val_line(&mut writer, &value)?;
587        Ok(NdjsonControl::Continue)
588    })?;
589    writer.flush()?;
590    Ok(count)
591}
592
593pub fn run_ndjson_limit<R, W>(
594    engine: &JetroEngine,
595    reader: R,
596    query: &str,
597    limit: usize,
598    writer: W,
599) -> Result<usize, JetroEngineError>
600where
601    R: BufRead,
602    W: Write,
603{
604    run_ndjson_limit_with_options(
605        engine,
606        reader,
607        query,
608        limit,
609        writer,
610        NdjsonOptions::default(),
611    )
612}
613
614pub fn run_ndjson_limit_with_options<R, W>(
615    engine: &JetroEngine,
616    reader: R,
617    query: &str,
618    limit: usize,
619    writer: W,
620    options: NdjsonOptions,
621) -> Result<usize, JetroEngineError>
622where
623    R: BufRead,
624    W: Write,
625{
626    if limit == 0 {
627        return Ok(0);
628    }
629
630    let mut writer = ndjson_writer_with_options(writer, options);
631    let mut emitted = 0usize;
632    let count = drive_ndjson_val(engine, reader, query, options, |value| {
633        write_val_line(&mut writer, &value)?;
634        emitted += 1;
635        Ok(if emitted >= limit {
636            NdjsonControl::Stop
637        } else {
638            NdjsonControl::Continue
639        })
640    })?;
641    writer.flush()?;
642    Ok(count)
643}
644
645pub fn run_ndjson_file_limit<P, W>(
646    engine: &JetroEngine,
647    path: P,
648    query: &str,
649    limit: usize,
650    writer: W,
651) -> Result<usize, JetroEngineError>
652where
653    P: AsRef<Path>,
654    W: Write,
655{
656    let file = File::open(path)?;
657    let options = NdjsonOptions::default();
658    run_ndjson_limit_with_options(
659        engine,
660        std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
661        query,
662        limit,
663        writer,
664        options,
665    )
666}
667
668pub fn run_ndjson_file_limit_with_options<P, W>(
669    engine: &JetroEngine,
670    path: P,
671    query: &str,
672    limit: usize,
673    writer: W,
674    options: NdjsonOptions,
675) -> Result<usize, JetroEngineError>
676where
677    P: AsRef<Path>,
678    W: Write,
679{
680    let file = File::open(path)?;
681    run_ndjson_limit_with_options(
682        engine,
683        std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
684        query,
685        limit,
686        writer,
687        options,
688    )
689}
690
691pub fn run_ndjson_source<W>(
692    engine: &JetroEngine,
693    source: NdjsonSource,
694    query: &str,
695    writer: W,
696) -> Result<usize, JetroEngineError>
697where
698    W: Write,
699{
700    run_ndjson_source_with_options(engine, source, query, writer, NdjsonOptions::default())
701}
702
703pub fn run_ndjson_source_with_options<W>(
704    engine: &JetroEngine,
705    source: NdjsonSource,
706    query: &str,
707    writer: W,
708    options: NdjsonOptions,
709) -> Result<usize, JetroEngineError>
710where
711    W: Write,
712{
713    match source {
714        NdjsonSource::File(path) => {
715            run_ndjson_file_with_options(engine, path, query, writer, options)
716        }
717        NdjsonSource::Reader(reader) => {
718            run_ndjson_with_options(engine, reader, query, writer, options)
719        }
720    }
721}
722
723pub fn run_ndjson_source_limit<W>(
724    engine: &JetroEngine,
725    source: NdjsonSource,
726    query: &str,
727    limit: usize,
728    writer: W,
729) -> Result<usize, JetroEngineError>
730where
731    W: Write,
732{
733    run_ndjson_source_limit_with_options(
734        engine,
735        source,
736        query,
737        limit,
738        writer,
739        NdjsonOptions::default(),
740    )
741}
742
743pub fn run_ndjson_source_limit_with_options<W>(
744    engine: &JetroEngine,
745    source: NdjsonSource,
746    query: &str,
747    limit: usize,
748    writer: W,
749    options: NdjsonOptions,
750) -> Result<usize, JetroEngineError>
751where
752    W: Write,
753{
754    match source {
755        NdjsonSource::File(path) => {
756            run_ndjson_file_limit_with_options(engine, path, query, limit, writer, options)
757        }
758        NdjsonSource::Reader(reader) => {
759            run_ndjson_limit_with_options(engine, reader, query, limit, writer, options)
760        }
761    }
762}
763
764pub fn run_ndjson_matches<R, W>(
765    engine: &JetroEngine,
766    reader: R,
767    predicate: &str,
768    limit: usize,
769    writer: W,
770) -> Result<usize, JetroEngineError>
771where
772    R: BufRead,
773    W: Write,
774{
775    run_ndjson_matches_with_options(
776        engine,
777        reader,
778        predicate,
779        limit,
780        writer,
781        NdjsonOptions::default(),
782    )
783}
784
785pub fn run_ndjson_matches_with_options<R, W>(
786    engine: &JetroEngine,
787    reader: R,
788    predicate: &str,
789    limit: usize,
790    writer: W,
791    options: NdjsonOptions,
792) -> Result<usize, JetroEngineError>
793where
794    R: BufRead,
795    W: Write,
796{
797    drive_ndjson_matches_writer(engine, reader, predicate, limit, options, writer)
798}
799
800pub fn run_ndjson_matches_file<P, W>(
801    engine: &JetroEngine,
802    path: P,
803    predicate: &str,
804    limit: usize,
805    writer: W,
806) -> Result<usize, JetroEngineError>
807where
808    P: AsRef<Path>,
809    W: Write,
810{
811    let file = File::open(path)?;
812    let options = NdjsonOptions::default();
813    run_ndjson_matches_with_options(
814        engine,
815        std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
816        predicate,
817        limit,
818        writer,
819        options,
820    )
821}
822
823pub fn run_ndjson_matches_file_with_options<P, W>(
824    engine: &JetroEngine,
825    path: P,
826    predicate: &str,
827    limit: usize,
828    writer: W,
829    options: NdjsonOptions,
830) -> Result<usize, JetroEngineError>
831where
832    P: AsRef<Path>,
833    W: Write,
834{
835    let file = File::open(path)?;
836    run_ndjson_matches_with_options(
837        engine,
838        std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
839        predicate,
840        limit,
841        writer,
842        options,
843    )
844}
845
846pub fn run_ndjson_matches_source<W>(
847    engine: &JetroEngine,
848    source: NdjsonSource,
849    predicate: &str,
850    limit: usize,
851    writer: W,
852) -> Result<usize, JetroEngineError>
853where
854    W: Write,
855{
856    run_ndjson_matches_source_with_options(
857        engine,
858        source,
859        predicate,
860        limit,
861        writer,
862        NdjsonOptions::default(),
863    )
864}
865
866pub fn run_ndjson_matches_source_with_options<W>(
867    engine: &JetroEngine,
868    source: NdjsonSource,
869    predicate: &str,
870    limit: usize,
871    writer: W,
872    options: NdjsonOptions,
873) -> Result<usize, JetroEngineError>
874where
875    W: Write,
876{
877    match source {
878        NdjsonSource::File(path) => {
879            run_ndjson_matches_file_with_options(engine, path, predicate, limit, writer, options)
880        }
881        NdjsonSource::Reader(reader) => {
882            run_ndjson_matches_with_options(engine, reader, predicate, limit, writer, options)
883        }
884    }
885}
886
887fn drive_ndjson<R, F>(
888    engine: &JetroEngine,
889    reader: R,
890    query: &str,
891    options: NdjsonOptions,
892    mut emit: F,
893) -> Result<usize, JetroEngineError>
894where
895    R: BufRead,
896    F: FnMut(Value) -> Result<NdjsonControl, JetroEngineError>,
897{
898    let mut driver = NdjsonPerRowDriver::new(reader).with_max_line_len(options.max_line_len);
899    let plan = engine.cached_plan(query, PlanningContext::bytes());
900    let mut buf = Vec::with_capacity(options.initial_buffer_capacity);
901    let mut count = 0;
902
903    while let Some((line_no, row)) = driver.read_next_owned(&mut buf)? {
904        let document = parse_row(engine, line_no, row)?;
905        let out = collect_row_val(engine, &document, &plan, line_no)?;
906        count += 1;
907        if matches!(emit(Value::from(out))?, NdjsonControl::Stop) {
908            break;
909        }
910    }
911
912    Ok(count)
913}
914
915fn drive_ndjson_val<R, F>(
916    engine: &JetroEngine,
917    reader: R,
918    query: &str,
919    options: NdjsonOptions,
920    mut emit: F,
921) -> Result<usize, JetroEngineError>
922where
923    R: BufRead,
924    F: FnMut(Val) -> Result<NdjsonControl, JetroEngineError>,
925{
926    let mut driver = NdjsonPerRowDriver::new(reader).with_max_line_len(options.max_line_len);
927    let mut executor = NdjsonRowExecutor::new(engine, query);
928    let mut buf = Vec::with_capacity(options.initial_buffer_capacity);
929    let mut count = 0;
930
931    while let Some((line_no, row)) = driver.read_next_owned(&mut buf)? {
932        count += 1;
933        if matches!(emit(executor.eval_owned_row(line_no, row)?)?, NdjsonControl::Stop) {
934            break;
935        }
936    }
937
938    Ok(count)
939}
940
941fn drive_ndjson_matches<R, F>(
942    engine: &JetroEngine,
943    reader: R,
944    predicate: &str,
945    limit: usize,
946    options: NdjsonOptions,
947    mut emit: F,
948) -> Result<usize, JetroEngineError>
949where
950    R: BufRead,
951    F: FnMut(Val) -> Result<NdjsonControl, JetroEngineError>,
952{
953    if limit == 0 {
954        return Ok(0);
955    }
956
957    let mut driver = NdjsonPerRowDriver::new(reader).with_max_line_len(options.max_line_len);
958    let mut executor = NdjsonRowExecutor::new(engine, predicate);
959    let mut buf = Vec::with_capacity(options.initial_buffer_capacity);
960    let mut emitted = 0usize;
961
962    while let Some((line_no, row)) = driver.read_next_owned(&mut buf)? {
963        let document = executor.parse_owned_row(line_no, row)?;
964        let matched = executor.eval_document(line_no, &document)?;
965        if !is_truthy(&matched) {
966            continue;
967        }
968
969        let root = document
970            .root_val_with(engine.keys())
971            .map_err(|err| row_eval_error(line_no, err))?;
972        emitted += 1;
973        if matches!(emit(root)?, NdjsonControl::Stop) || emitted >= limit {
974            break;
975        }
976    }
977
978    Ok(emitted)
979}
980
981fn drive_ndjson_matches_writer<R, W>(
982    engine: &JetroEngine,
983    reader: R,
984    predicate: &str,
985    limit: usize,
986    options: NdjsonOptions,
987    writer: W,
988) -> Result<usize, JetroEngineError>
989where
990    R: BufRead,
991    W: Write,
992{
993    if limit == 0 {
994        return Ok(0);
995    }
996
997    let mut driver = NdjsonPerRowDriver::new(reader).with_max_line_len(options.max_line_len);
998    let mut executor = NdjsonRowExecutor::new(engine, predicate);
999    let mut writer = ndjson_writer_with_options(writer, options);
1000    let mut buf = Vec::with_capacity(options.initial_buffer_capacity);
1001    let mut emitted = 0usize;
1002
1003    while let Some((line_no, row)) = driver.read_next_owned(&mut buf)? {
1004        let document = executor.parse_owned_row(line_no, row)?;
1005        let matched = executor.eval_document(line_no, &document)?;
1006        if !is_truthy(&matched) {
1007            continue;
1008        }
1009
1010        write_document_line(&mut writer, &document, line_no, executor.engine())?;
1011        emitted += 1;
1012        if emitted >= limit {
1013            break;
1014        }
1015    }
1016
1017    writer.flush()?;
1018    Ok(emitted)
1019}
1020
1021pub(super) struct NdjsonRowExecutor<'a> {
1022    engine: &'a JetroEngine,
1023    plan: crate::ir::physical::QueryPlan,
1024    vm: MutexGuard<'a, VM>,
1025}
1026
1027impl<'a> NdjsonRowExecutor<'a> {
1028    pub(super) fn new(engine: &'a JetroEngine, query: &str) -> Self {
1029        Self {
1030            engine,
1031            plan: engine.cached_plan(query, PlanningContext::bytes()),
1032            vm: engine.lock_vm(),
1033        }
1034    }
1035
1036    pub(super) fn eval_owned_row(
1037        &mut self,
1038        line_no: u64,
1039        row: Vec<u8>,
1040    ) -> Result<Val, JetroEngineError> {
1041        let document = self.parse_owned_row(line_no, row)?;
1042        self.eval_document(line_no, &document)
1043    }
1044
1045    pub(super) fn parse_owned_row(
1046        &self,
1047        line_no: u64,
1048        row: Vec<u8>,
1049    ) -> Result<Jetro, JetroEngineError> {
1050        parse_row(self.engine, line_no, row)
1051    }
1052
1053    pub(super) fn eval_document(
1054        &mut self,
1055        line_no: u64,
1056        document: &Jetro,
1057    ) -> Result<Val, JetroEngineError> {
1058        crate::exec::router::collect_plan_val_with_vm(document, &self.plan, &mut self.vm)
1059            .map_err(|err| row_eval_error(line_no, err))
1060    }
1061
1062    pub(super) fn engine(&self) -> &'a JetroEngine {
1063        self.engine
1064    }
1065}
1066
1067pub(super) fn write_val_line<W: Write>(writer: &mut W, value: &Val) -> Result<(), JetroEngineError> {
1068    write_val_json(writer, value)?;
1069    writer.write_all(b"\n")?;
1070    Ok(())
1071}
1072
1073pub(super) fn write_document_line<W: Write>(
1074    writer: &mut W,
1075    document: &Jetro,
1076    line_no: u64,
1077    engine: &JetroEngine,
1078) -> Result<(), JetroEngineError> {
1079    if let Some(bytes) = document.raw_bytes() {
1080        writer.write_all(bytes)?;
1081        writer.write_all(b"\n")?;
1082        return Ok(());
1083    }
1084
1085    let root = document
1086        .root_val_with(engine.keys())
1087        .map_err(|err| row_eval_error(line_no, err))?;
1088    write_val_line(writer, &root)
1089}
1090
1091pub(super) fn ndjson_writer_with_options<W: Write>(
1092    writer: W,
1093    options: NdjsonOptions,
1094) -> BufWriter<W> {
1095    let capacity = options
1096        .reader_buffer_capacity
1097        .max(DEFAULT_READER_BUFFER_CAPACITY);
1098    BufWriter::with_capacity(capacity, writer)
1099}
1100
1101fn write_val_json<W: Write>(writer: &mut W, value: &Val) -> Result<(), JetroEngineError> {
1102    match value {
1103        Val::Null => writer.write_all(b"null")?,
1104        Val::Bool(true) => writer.write_all(b"true")?,
1105        Val::Bool(false) => writer.write_all(b"false")?,
1106        Val::Int(n) => write_i64(writer, *n)?,
1107        Val::Float(n) => write_f64(writer, *n)?,
1108        Val::Str(s) => write_json_str(writer, s.as_ref())?,
1109        Val::StrSlice(s) => write_json_str(writer, s.as_str())?,
1110        Val::Arr(items) => write_json_array(writer, items.iter())?,
1111        Val::IntVec(items) => write_json_int_array(writer, items.iter().copied())?,
1112        Val::FloatVec(items) => write_json_float_array(writer, items.iter().copied())?,
1113        Val::StrVec(items) => write_json_str_array(writer, items.iter().map(|s| s.as_ref()))?,
1114        Val::StrSliceVec(items) => write_json_str_array(writer, items.iter().map(|s| s.as_str()))?,
1115        Val::Obj(entries) => {
1116            write_json_object(writer, entries.iter().map(|(key, value)| (key.as_ref(), value)))?
1117        }
1118        Val::ObjSmall(entries) => {
1119            write_json_object(writer, entries.iter().map(|(key, value)| (key.as_ref(), value)))?
1120        }
1121        Val::ObjVec(data) => write_json_objvec(writer, data)?,
1122    }
1123    Ok(())
1124}
1125
1126fn write_json_array<'a, W, I>(writer: &mut W, items: I) -> Result<(), JetroEngineError>
1127where
1128    W: Write,
1129    I: IntoIterator<Item = &'a Val>,
1130{
1131    writer.write_all(b"[")?;
1132    let mut first = true;
1133    for item in items {
1134        if first {
1135            first = false;
1136        } else {
1137            writer.write_all(b",")?;
1138        }
1139        write_val_json(writer, item)?;
1140    }
1141    writer.write_all(b"]")?;
1142    Ok(())
1143}
1144
1145fn write_json_int_array<W, I>(writer: &mut W, items: I) -> Result<(), JetroEngineError>
1146where
1147    W: Write,
1148    I: IntoIterator<Item = i64>,
1149{
1150    writer.write_all(b"[")?;
1151    let mut first = true;
1152    let mut buf = itoa::Buffer::new();
1153    for item in items {
1154        if first {
1155            first = false;
1156        } else {
1157            writer.write_all(b",")?;
1158        }
1159        writer.write_all(buf.format(item).as_bytes())?;
1160    }
1161    writer.write_all(b"]")?;
1162    Ok(())
1163}
1164
1165fn write_json_float_array<W, I>(writer: &mut W, items: I) -> Result<(), JetroEngineError>
1166where
1167    W: Write,
1168    I: IntoIterator<Item = f64>,
1169{
1170    writer.write_all(b"[")?;
1171    let mut first = true;
1172    let mut buf = ryu::Buffer::new();
1173    for item in items {
1174        if first {
1175            first = false;
1176        } else {
1177            writer.write_all(b",")?;
1178        }
1179        if item.is_finite() {
1180            writer.write_all(buf.format(item).as_bytes())?;
1181        } else {
1182            writer.write_all(b"0")?;
1183        }
1184    }
1185    writer.write_all(b"]")?;
1186    Ok(())
1187}
1188
1189fn write_json_str_array<'a, W, I>(writer: &mut W, items: I) -> Result<(), JetroEngineError>
1190where
1191    W: Write,
1192    I: IntoIterator<Item = &'a str>,
1193{
1194    writer.write_all(b"[")?;
1195    let mut first = true;
1196    for item in items {
1197        if first {
1198            first = false;
1199        } else {
1200            writer.write_all(b",")?;
1201        }
1202        write_json_str(writer, item)?;
1203    }
1204    writer.write_all(b"]")?;
1205    Ok(())
1206}
1207
1208fn write_json_object<'a, W, I>(writer: &mut W, entries: I) -> Result<(), JetroEngineError>
1209where
1210    W: Write,
1211    I: IntoIterator<Item = (&'a str, &'a Val)>,
1212{
1213    writer.write_all(b"{")?;
1214    let mut first = true;
1215    for (key, value) in entries {
1216        if first {
1217            first = false;
1218        } else {
1219            writer.write_all(b",")?;
1220        }
1221        write_json_str(writer, key)?;
1222        writer.write_all(b":")?;
1223        write_val_json(writer, value)?;
1224    }
1225    writer.write_all(b"}")?;
1226    Ok(())
1227}
1228
1229fn write_json_objvec<W: Write>(
1230    writer: &mut W,
1231    data: &crate::data::value::ObjVecData,
1232) -> Result<(), JetroEngineError> {
1233    writer.write_all(b"[")?;
1234    for row in 0..data.nrows() {
1235        if row > 0 {
1236            writer.write_all(b",")?;
1237        }
1238        writer.write_all(b"{")?;
1239        for slot in 0..data.stride() {
1240            if slot > 0 {
1241                writer.write_all(b",")?;
1242            }
1243            write_json_str(writer, data.keys[slot].as_ref())?;
1244            writer.write_all(b":")?;
1245            write_val_json(writer, data.cell(row, slot))?;
1246        }
1247        writer.write_all(b"}")?;
1248    }
1249    writer.write_all(b"]")?;
1250    Ok(())
1251}
1252
1253fn write_json_str<W: Write>(writer: &mut W, value: &str) -> Result<(), JetroEngineError> {
1254    writer.write_all(b"\"")?;
1255    let bytes = value.as_bytes();
1256    if !needs_json_escape(bytes) {
1257        writer.write_all(bytes)?;
1258        writer.write_all(b"\"")?;
1259        return Ok(());
1260    }
1261
1262    let mut start = 0usize;
1263
1264    for (idx, &byte) in bytes.iter().enumerate() {
1265        let escaped = match byte {
1266            b'"' => Some(br#"\""#.as_slice()),
1267            b'\\' => Some(br#"\\"#.as_slice()),
1268            b'\n' => Some(br#"\n"#.as_slice()),
1269            b'\r' => Some(br#"\r"#.as_slice()),
1270            b'\t' => Some(br#"\t"#.as_slice()),
1271            0x08 => Some(br#"\b"#.as_slice()),
1272            0x0c => Some(br#"\f"#.as_slice()),
1273            0x00..=0x1f => None,
1274            _ => continue,
1275        };
1276
1277        if start < idx {
1278            writer.write_all(&bytes[start..idx])?;
1279        }
1280        match escaped {
1281            Some(seq) => writer.write_all(seq)?,
1282            None => write_control_escape(writer, byte)?,
1283        }
1284        start = idx + 1;
1285    }
1286
1287    if start < bytes.len() {
1288        writer.write_all(&bytes[start..])?;
1289    }
1290    writer.write_all(b"\"")?;
1291    Ok(())
1292}
1293
1294#[inline]
1295fn write_i64<W: Write>(writer: &mut W, value: i64) -> Result<(), JetroEngineError> {
1296    let mut buf = itoa::Buffer::new();
1297    writer.write_all(buf.format(value).as_bytes())?;
1298    Ok(())
1299}
1300
1301#[inline]
1302fn write_f64<W: Write>(writer: &mut W, value: f64) -> Result<(), JetroEngineError> {
1303    if value.is_finite() {
1304        let mut buf = ryu::Buffer::new();
1305        writer.write_all(buf.format(value).as_bytes())?;
1306    } else {
1307        writer.write_all(b"0")?;
1308    }
1309    Ok(())
1310}
1311
/// Reports whether `bytes` contains anything a JSON string literal must escape.
///
/// That means a double quote, a backslash, or any control byte below 0x20. Bytes 0x20 and
/// above (including DEL and non-ASCII UTF-8) never need escaping here.
#[inline]
fn needs_json_escape(bytes: &[u8]) -> bool {
    !bytes
        .iter()
        .all(|&byte| byte >= 0x20 && byte != b'"' && byte != b'\\')
}
1318
1319fn write_control_escape<W: Write>(writer: &mut W, byte: u8) -> Result<(), JetroEngineError> {
1320    const HEX: &[u8; 16] = b"0123456789abcdef";
1321    writer.write_all(&[
1322        b'\\',
1323        b'u',
1324        b'0',
1325        b'0',
1326        HEX[(byte >> 4) as usize],
1327        HEX[(byte & 0x0f) as usize],
1328    ])?;
1329    Ok(())
1330}
1331
1332pub(super) fn collect_row_val(
1333    engine: &JetroEngine,
1334    document: &Jetro,
1335    plan: &crate::ir::physical::QueryPlan,
1336    line_no: u64,
1337) -> Result<Val, JetroEngineError> {
1338    engine
1339        .collect_prepared_val(document, plan)
1340        .map_err(|err| row_eval_error(line_no, err))
1341}
1342
1343pub(super) fn parse_row(
1344    engine: &JetroEngine,
1345    line_no: u64,
1346    row: Vec<u8>,
1347) -> Result<Jetro, JetroEngineError> {
1348    engine
1349        .parse_bytes_lazy(row)
1350        .map_err(|err| row_parse_error(line_no, err))
1351}
1352
1353fn row_parse_error(line_no: u64, err: JetroEngineError) -> JetroEngineError {
1354    match err {
1355        JetroEngineError::Json(source) => RowError::InvalidJson { line_no, source }.into(),
1356        JetroEngineError::Eval(eval) => RowError::InvalidJsonMessage {
1357            line_no,
1358            message: eval.to_string(),
1359        }
1360        .into(),
1361        other => other,
1362    }
1363}
1364
1365pub(super) fn row_eval_error(line_no: u64, err: crate::EvalError) -> JetroEngineError {
1366    let message = err.0;
1367    if message.starts_with("Invalid JSON:") {
1368        RowError::InvalidJsonMessage { line_no, message }.into()
1369    } else {
1370        crate::EvalError(message).into()
1371    }
1372}
1373
/// Strips every trailing `\n` and `\r` byte from `buf` in place.
///
/// Only the length changes; the allocation (capacity) is kept for reuse.
fn trim_line_ending(buf: &mut Vec<u8>) {
    let keep = buf
        .iter()
        .rposition(|&byte| byte != b'\n' && byte != b'\r')
        .map_or(0, |last_kept| last_kept + 1);
    buf.truncate(keep);
}
1379
/// Removes a leading UTF-8 byte-order mark (`EF BB BF`) from `buf`.
///
/// This only happens when `line_no == 1`; later lines are never modified.
fn strip_initial_bom(line_no: u64, buf: &mut Vec<u8>) {
    const UTF8_BOM: [u8; 3] = [0xEF, 0xBB, 0xBF];
    if line_no != 1 || !buf.starts_with(&UTF8_BOM) {
        return;
    }
    buf.drain(..UTF8_BOM.len());
}
1385
/// Returns the half-open range `(start, end)` of `buf` after trimming ASCII whitespace
/// from both ends.
///
/// When `buf` is empty or all whitespace, the range is `(buf.len(), buf.len())`.
fn non_ws_range(buf: &[u8]) -> (usize, usize) {
    let significant = |byte: &u8| !byte.is_ascii_whitespace();
    match buf.iter().position(significant) {
        None => (buf.len(), buf.len()),
        Some(first) => {
            let end = buf
                .iter()
                .rposition(significant)
                .map_or(first, |last| last + 1);
            (first, end)
        }
    }
}
1398
#[cfg(test)]
mod tests {
    /// With `simd-json` enabled, `parse_row` must return a document that has neither
    /// materialised its root value nor built its tape.
    #[test]
    #[cfg(feature = "simd-json")]
    fn parse_row_keeps_simd_document_lazy() {
        let engine = crate::JetroEngine::new();
        let raw_row = br#"{"name":"Ada","age":30}"#.to_vec();

        let parsed = super::parse_row(&engine, 1, raw_row).expect("row parses lazily");

        assert!(!parsed.root_val_is_materialized());
        assert!(!parsed.tape_is_built());
    }

    /// Each `read_next_owned` call must yield the next row while leaving the caller's
    /// scratch buffer at its original capacity, so the buffer can be reused.
    #[test]
    fn owned_row_read_preserves_reusable_buffer_capacity() {
        let input = std::io::Cursor::new(b"{\"n\":1}\n{\"n\":2}\n");
        let mut driver = super::NdjsonPerRowDriver::new(input);
        let mut scratch = Vec::with_capacity(128);

        let expectations = [
            (br#"{"n":1}"#, "first row exists"),
            (br#"{"n":2}"#, "second row exists"),
        ];
        for (expected_row, missing_msg) in expectations {
            let row = driver
                .read_next_owned(&mut scratch)
                .expect("row read succeeds")
                .expect(missing_msg);
            assert_eq!(row.1, expected_row);
            assert_eq!(scratch.capacity(), 128);
        }
    }
}