Skip to main content

jetro_core/io/
ndjson.rs

1use super::{NdjsonSource, RowError};
2use crate::data::value::Val;
3use crate::plan::physical::PlanningContext;
4use crate::util::is_truthy;
5use crate::{Jetro, JetroEngine, JetroEngineError, VM};
6use memchr::memchr;
7use serde_json::Value;
8use std::fs::File;
9use std::io::{BufRead, BufWriter, Write};
10use std::path::Path;
11use std::sync::MutexGuard;
12
13#[cfg(feature = "simd-json")]
14use super::ndjson_byte::{
15    eval_ndjson_byte_predicate_row, tape_plan_can_write_byte_row, write_ndjson_byte_plan_row,
16    write_ndjson_byte_tape_plan_row, BytePlanWrite,
17};
18#[cfg(test)]
19#[cfg(feature = "simd-json")]
20pub(super) use super::ndjson_direct::direct_byte_plan;
21#[cfg(feature = "simd-json")]
22pub(super) use super::ndjson_direct::{
23    direct_tape_plan, direct_tape_predicate, direct_writer_plans, NdjsonDirectBytePlan,
24    NdjsonDirectElement, NdjsonDirectItemPredicate, NdjsonDirectPredicate,
25    NdjsonDirectProjectionValue, NdjsonDirectStreamMap, NdjsonDirectStreamPlan,
26    NdjsonDirectStreamSink, NdjsonDirectTapePlan,
27};
28
29const DEFAULT_MAX_LINE_LEN: usize = 64 * 1024 * 1024;
30const DEFAULT_LINE_BUFFER_CAPACITY: usize = 8192;
31const DEFAULT_READER_BUFFER_CAPACITY: usize = 1024 * 1024;
32pub(super) const DEFAULT_REVERSE_CHUNK_SIZE: usize = 64 * 1024;
33
/// Tuning knobs for per-row NDJSON execution.
///
/// Start from `NdjsonOptions::default()` and adjust with the `with_*` builder
/// methods, or set the public fields directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct NdjsonOptions {
    /// Largest accepted row length, measured after the line ending and the
    /// surrounding whitespace have been trimmed. Longer rows are rejected with
    /// `RowError::LineTooLarge`.
    pub max_line_len: usize,
    /// Starting capacity of the reusable per-row line buffer (and of the tape
    /// scratch space on the simd-json paths).
    pub initial_buffer_capacity: usize,
    /// Capacity of the `BufReader` created when a file path is opened.
    pub reader_buffer_capacity: usize,
    /// Reverse-read chunk size (default `64 * 1024`). The forward-reading code
    /// in this file does not consume it.
    pub reverse_chunk_size: usize,
}
42
43impl Default for NdjsonOptions {
44    fn default() -> Self {
45        Self {
46            max_line_len: DEFAULT_MAX_LINE_LEN,
47            initial_buffer_capacity: DEFAULT_LINE_BUFFER_CAPACITY,
48            reader_buffer_capacity: DEFAULT_READER_BUFFER_CAPACITY,
49            reverse_chunk_size: DEFAULT_REVERSE_CHUNK_SIZE,
50        }
51    }
52}
53
54impl NdjsonOptions {
55    pub fn with_max_line_len(mut self, max_line_len: usize) -> Self {
56        self.max_line_len = max_line_len;
57        self
58    }
59
60    pub fn with_initial_buffer_capacity(mut self, capacity: usize) -> Self {
61        self.initial_buffer_capacity = capacity;
62        self
63    }
64
65    pub fn with_reader_buffer_capacity(mut self, capacity: usize) -> Self {
66        self.reader_buffer_capacity = capacity;
67        self
68    }
69
70    pub fn with_reverse_chunk_size(mut self, capacity: usize) -> Self {
71        self.reverse_chunk_size = capacity;
72        self
73    }
74}
75
/// Forward-only reader that yields one NDJSON row at a time from a `BufRead`.
///
/// Counts physical lines as they are consumed and rejects rows whose trimmed
/// length exceeds `max_line_len`.
pub struct NdjsonPerRowDriver<R> {
    /// Buffered byte source.
    reader: R,
    /// Physical lines consumed so far, including skipped blank lines.
    line_no: u64,
    /// Maximum accepted row length in bytes, after trimming.
    max_line_len: usize,
}
82
83impl<R: BufRead> NdjsonPerRowDriver<R> {
84    pub fn new(reader: R) -> Self {
85        Self {
86            reader,
87            line_no: 0,
88            max_line_len: DEFAULT_MAX_LINE_LEN,
89        }
90    }
91
92    pub fn with_max_line_len(mut self, max_line_len: usize) -> Self {
93        self.max_line_len = max_line_len;
94        self
95    }
96
97    pub fn line_no(&self) -> u64 {
98        self.line_no
99    }
100
101    /// Read the next non-empty NDJSON row into `buf`, returning its 1-based line
102    /// number. Empty and whitespace-only rows are skipped.
103    pub fn read_next_nonempty<'a>(
104        &mut self,
105        buf: &'a mut Vec<u8>,
106    ) -> Result<Option<(u64, &'a [u8])>, RowError> {
107        loop {
108            buf.clear();
109            let read = self.read_physical_line(buf)?;
110            if read == 0 {
111                return Ok(None);
112            }
113            self.line_no += 1;
114
115            strip_initial_bom(self.line_no, buf);
116            trim_line_ending(buf);
117
118            let (start, end) = non_ws_range(buf);
119            if start == end {
120                continue;
121            }
122
123            let len = end - start;
124            if len > self.max_line_len {
125                return Err(RowError::LineTooLarge {
126                    line_no: self.line_no,
127                    len,
128                    max: self.max_line_len,
129                });
130            }
131
132            return Ok(Some((self.line_no, &buf[start..end])));
133        }
134    }
135
136    /// Read the next non-empty row and transfer ownership of `buf` to the
137    /// caller. This is the hot path used by `JetroEngine` NDJSON execution so
138    /// the row can be parsed without an extra bytes copy.
139    pub fn read_next_owned(
140        &mut self,
141        buf: &mut Vec<u8>,
142    ) -> Result<Option<(u64, Vec<u8>)>, RowError> {
143        loop {
144            buf.clear();
145            let read = self.read_physical_line(buf)?;
146            if read == 0 {
147                return Ok(None);
148            }
149            self.line_no += 1;
150
151            strip_initial_bom(self.line_no, buf);
152            trim_line_ending(buf);
153
154            let (start, end) = non_ws_range(buf);
155            if start == end {
156                continue;
157            }
158
159            let len = end - start;
160            if len > self.max_line_len {
161                return Err(RowError::LineTooLarge {
162                    line_no: self.line_no,
163                    len,
164                    max: self.max_line_len,
165                });
166            }
167
168            let capacity = buf.capacity();
169            return Ok(Some((
170                self.line_no,
171                std::mem::replace(buf, Vec::with_capacity(capacity)),
172            )));
173        }
174    }
175
176    fn read_physical_line(&mut self, buf: &mut Vec<u8>) -> Result<usize, RowError> {
177        loop {
178            let available = self.reader.fill_buf()?;
179            if available.is_empty() {
180                return Ok(buf.len());
181            }
182
183            if let Some(pos) = memchr(b'\n', available) {
184                buf.extend_from_slice(&available[..=pos]);
185                self.reader.consume(pos + 1);
186                self.check_physical_line_len(buf.len())?;
187                return Ok(buf.len());
188            }
189
190            let len = available.len();
191            buf.extend_from_slice(available);
192            self.reader.consume(len);
193            self.check_physical_line_len(buf.len())?;
194        }
195    }
196
197    fn check_physical_line_len(&self, len: usize) -> Result<(), RowError> {
198        let hard_max = self.max_line_len.saturating_add(2);
199        if len > hard_max {
200            return Err(RowError::LineTooLarge {
201                line_no: self.line_no + 1,
202                len,
203                max: self.max_line_len,
204            });
205        }
206        Ok(())
207    }
208}
209
/// Flow-control signal returned by per-row callbacks.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NdjsonControl {
    /// Keep reading rows.
    Continue,
    /// Stop reading after the current row.
    Stop,
}
215
216pub fn for_each_ndjson<R, F>(
217    engine: &JetroEngine,
218    reader: R,
219    query: &str,
220    f: F,
221) -> Result<usize, JetroEngineError>
222where
223    R: BufRead,
224    F: FnMut(Value),
225{
226    for_each_ndjson_with_options(engine, reader, query, NdjsonOptions::default(), f)
227}
228
229pub fn for_each_ndjson_with_options<R, F>(
230    engine: &JetroEngine,
231    reader: R,
232    query: &str,
233    options: NdjsonOptions,
234    mut f: F,
235) -> Result<usize, JetroEngineError>
236where
237    R: BufRead,
238    F: FnMut(Value),
239{
240    drive_ndjson(engine, reader, query, options, |value| {
241        f(value);
242        Ok(NdjsonControl::Continue)
243    })
244}
245
246pub fn for_each_ndjson_until<R, F>(
247    engine: &JetroEngine,
248    reader: R,
249    query: &str,
250    f: F,
251) -> Result<usize, JetroEngineError>
252where
253    R: BufRead,
254    F: FnMut(Value) -> Result<NdjsonControl, JetroEngineError>,
255{
256    for_each_ndjson_until_with_options(engine, reader, query, NdjsonOptions::default(), f)
257}
258
259pub fn for_each_ndjson_until_with_options<R, F>(
260    engine: &JetroEngine,
261    reader: R,
262    query: &str,
263    options: NdjsonOptions,
264    f: F,
265) -> Result<usize, JetroEngineError>
266where
267    R: BufRead,
268    F: FnMut(Value) -> Result<NdjsonControl, JetroEngineError>,
269{
270    drive_ndjson(engine, reader, query, options, f)
271}
272
273pub fn for_each_ndjson_source<F>(
274    engine: &JetroEngine,
275    source: NdjsonSource,
276    query: &str,
277    f: F,
278) -> Result<usize, JetroEngineError>
279where
280    F: FnMut(Value),
281{
282    for_each_ndjson_source_with_options(engine, source, query, NdjsonOptions::default(), f)
283}
284
285pub fn for_each_ndjson_source_with_options<F>(
286    engine: &JetroEngine,
287    source: NdjsonSource,
288    query: &str,
289    options: NdjsonOptions,
290    f: F,
291) -> Result<usize, JetroEngineError>
292where
293    F: FnMut(Value),
294{
295    match source {
296        NdjsonSource::File(path) => {
297            let file = File::open(path)?;
298            for_each_ndjson_with_options(
299                engine,
300                std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
301                query,
302                options,
303                f,
304            )
305        }
306        NdjsonSource::Reader(reader) => {
307            for_each_ndjson_with_options(engine, reader, query, options, f)
308        }
309    }
310}
311
312pub fn for_each_ndjson_source_until<F>(
313    engine: &JetroEngine,
314    source: NdjsonSource,
315    query: &str,
316    f: F,
317) -> Result<usize, JetroEngineError>
318where
319    F: FnMut(Value) -> Result<NdjsonControl, JetroEngineError>,
320{
321    for_each_ndjson_source_until_with_options(engine, source, query, NdjsonOptions::default(), f)
322}
323
324pub fn for_each_ndjson_source_until_with_options<F>(
325    engine: &JetroEngine,
326    source: NdjsonSource,
327    query: &str,
328    options: NdjsonOptions,
329    f: F,
330) -> Result<usize, JetroEngineError>
331where
332    F: FnMut(Value) -> Result<NdjsonControl, JetroEngineError>,
333{
334    match source {
335        NdjsonSource::File(path) => {
336            let file = File::open(path)?;
337            for_each_ndjson_until_with_options(
338                engine,
339                std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
340                query,
341                options,
342                f,
343            )
344        }
345        NdjsonSource::Reader(reader) => {
346            for_each_ndjson_until_with_options(engine, reader, query, options, f)
347        }
348    }
349}
350
351pub fn collect_ndjson<R>(
352    engine: &JetroEngine,
353    reader: R,
354    query: &str,
355) -> Result<Vec<Value>, JetroEngineError>
356where
357    R: BufRead,
358{
359    collect_ndjson_with_options(engine, reader, query, NdjsonOptions::default())
360}
361
362pub fn collect_ndjson_with_options<R>(
363    engine: &JetroEngine,
364    reader: R,
365    query: &str,
366    options: NdjsonOptions,
367) -> Result<Vec<Value>, JetroEngineError>
368where
369    R: BufRead,
370{
371    let mut values = Vec::new();
372    for_each_ndjson_with_options(engine, reader, query, options, |value| values.push(value))?;
373    Ok(values)
374}
375
376pub fn collect_ndjson_file<P>(
377    engine: &JetroEngine,
378    path: P,
379    query: &str,
380) -> Result<Vec<Value>, JetroEngineError>
381where
382    P: AsRef<Path>,
383{
384    let file = File::open(path)?;
385    let options = NdjsonOptions::default();
386    collect_ndjson_with_options(
387        engine,
388        std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
389        query,
390        options,
391    )
392}
393
394pub fn collect_ndjson_file_with_options<P>(
395    engine: &JetroEngine,
396    path: P,
397    query: &str,
398    options: NdjsonOptions,
399) -> Result<Vec<Value>, JetroEngineError>
400where
401    P: AsRef<Path>,
402{
403    let file = File::open(path)?;
404    collect_ndjson_with_options(
405        engine,
406        std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
407        query,
408        options,
409    )
410}
411
412pub fn collect_ndjson_source(
413    engine: &JetroEngine,
414    source: NdjsonSource,
415    query: &str,
416) -> Result<Vec<Value>, JetroEngineError> {
417    collect_ndjson_source_with_options(engine, source, query, NdjsonOptions::default())
418}
419
420pub fn collect_ndjson_source_with_options(
421    engine: &JetroEngine,
422    source: NdjsonSource,
423    query: &str,
424    options: NdjsonOptions,
425) -> Result<Vec<Value>, JetroEngineError> {
426    match source {
427        NdjsonSource::File(path) => collect_ndjson_file_with_options(engine, path, query, options),
428        NdjsonSource::Reader(reader) => collect_ndjson_with_options(engine, reader, query, options),
429    }
430}
431
432pub fn collect_ndjson_matches<R>(
433    engine: &JetroEngine,
434    reader: R,
435    predicate: &str,
436    limit: usize,
437) -> Result<Vec<Value>, JetroEngineError>
438where
439    R: BufRead,
440{
441    collect_ndjson_matches_with_options(engine, reader, predicate, limit, NdjsonOptions::default())
442}
443
444pub fn collect_ndjson_matches_with_options<R>(
445    engine: &JetroEngine,
446    reader: R,
447    predicate: &str,
448    limit: usize,
449    options: NdjsonOptions,
450) -> Result<Vec<Value>, JetroEngineError>
451where
452    R: BufRead,
453{
454    let mut values = Vec::with_capacity(limit);
455    drive_ndjson_matches(engine, reader, predicate, limit, options, |value| {
456        values.push(Value::from(value));
457        Ok(NdjsonControl::Continue)
458    })?;
459    Ok(values)
460}
461
462pub fn collect_ndjson_matches_file<P>(
463    engine: &JetroEngine,
464    path: P,
465    predicate: &str,
466    limit: usize,
467) -> Result<Vec<Value>, JetroEngineError>
468where
469    P: AsRef<Path>,
470{
471    let file = File::open(path)?;
472    let options = NdjsonOptions::default();
473    collect_ndjson_matches_with_options(
474        engine,
475        std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
476        predicate,
477        limit,
478        options,
479    )
480}
481
482pub fn collect_ndjson_matches_file_with_options<P>(
483    engine: &JetroEngine,
484    path: P,
485    predicate: &str,
486    limit: usize,
487    options: NdjsonOptions,
488) -> Result<Vec<Value>, JetroEngineError>
489where
490    P: AsRef<Path>,
491{
492    let file = File::open(path)?;
493    collect_ndjson_matches_with_options(
494        engine,
495        std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
496        predicate,
497        limit,
498        options,
499    )
500}
501
502pub fn collect_ndjson_matches_source(
503    engine: &JetroEngine,
504    source: NdjsonSource,
505    predicate: &str,
506    limit: usize,
507) -> Result<Vec<Value>, JetroEngineError> {
508    collect_ndjson_matches_source_with_options(
509        engine,
510        source,
511        predicate,
512        limit,
513        NdjsonOptions::default(),
514    )
515}
516
517pub fn collect_ndjson_matches_source_with_options(
518    engine: &JetroEngine,
519    source: NdjsonSource,
520    predicate: &str,
521    limit: usize,
522    options: NdjsonOptions,
523) -> Result<Vec<Value>, JetroEngineError> {
524    match source {
525        NdjsonSource::File(path) => {
526            collect_ndjson_matches_file_with_options(engine, path, predicate, limit, options)
527        }
528        NdjsonSource::Reader(reader) => {
529            collect_ndjson_matches_with_options(engine, reader, predicate, limit, options)
530        }
531    }
532}
533
534pub fn run_ndjson<R, W>(
535    engine: &JetroEngine,
536    reader: R,
537    query: &str,
538    writer: W,
539) -> Result<usize, JetroEngineError>
540where
541    R: BufRead,
542    W: Write,
543{
544    run_ndjson_with_options(engine, reader, query, writer, NdjsonOptions::default())
545}
546
547pub fn run_ndjson_file<P, W>(
548    engine: &JetroEngine,
549    path: P,
550    query: &str,
551    writer: W,
552) -> Result<usize, JetroEngineError>
553where
554    P: AsRef<Path>,
555    W: Write,
556{
557    let file = File::open(path)?;
558    let options = NdjsonOptions::default();
559    run_ndjson_with_options(
560        engine,
561        std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
562        query,
563        writer,
564        options,
565    )
566}
567
568pub fn run_ndjson_file_with_options<P, W>(
569    engine: &JetroEngine,
570    path: P,
571    query: &str,
572    writer: W,
573    options: NdjsonOptions,
574) -> Result<usize, JetroEngineError>
575where
576    P: AsRef<Path>,
577    W: Write,
578{
579    let file = File::open(path)?;
580    run_ndjson_with_options(
581        engine,
582        std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
583        query,
584        writer,
585        options,
586    )
587}
588
589pub fn run_ndjson_with_options<R, W>(
590    engine: &JetroEngine,
591    reader: R,
592    query: &str,
593    writer: W,
594    options: NdjsonOptions,
595) -> Result<usize, JetroEngineError>
596where
597    R: BufRead,
598    W: Write,
599{
600    drive_ndjson_writer(engine, reader, query, None, options, writer)
601}
602
603pub fn run_ndjson_limit<R, W>(
604    engine: &JetroEngine,
605    reader: R,
606    query: &str,
607    limit: usize,
608    writer: W,
609) -> Result<usize, JetroEngineError>
610where
611    R: BufRead,
612    W: Write,
613{
614    run_ndjson_limit_with_options(
615        engine,
616        reader,
617        query,
618        limit,
619        writer,
620        NdjsonOptions::default(),
621    )
622}
623
624pub fn run_ndjson_limit_with_options<R, W>(
625    engine: &JetroEngine,
626    reader: R,
627    query: &str,
628    limit: usize,
629    writer: W,
630    options: NdjsonOptions,
631) -> Result<usize, JetroEngineError>
632where
633    R: BufRead,
634    W: Write,
635{
636    if limit == 0 {
637        return Ok(0);
638    }
639
640    drive_ndjson_writer(engine, reader, query, Some(limit), options, writer)
641}
642
643pub fn run_ndjson_file_limit<P, W>(
644    engine: &JetroEngine,
645    path: P,
646    query: &str,
647    limit: usize,
648    writer: W,
649) -> Result<usize, JetroEngineError>
650where
651    P: AsRef<Path>,
652    W: Write,
653{
654    let file = File::open(path)?;
655    let options = NdjsonOptions::default();
656    run_ndjson_limit_with_options(
657        engine,
658        std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
659        query,
660        limit,
661        writer,
662        options,
663    )
664}
665
666pub fn run_ndjson_file_limit_with_options<P, W>(
667    engine: &JetroEngine,
668    path: P,
669    query: &str,
670    limit: usize,
671    writer: W,
672    options: NdjsonOptions,
673) -> Result<usize, JetroEngineError>
674where
675    P: AsRef<Path>,
676    W: Write,
677{
678    let file = File::open(path)?;
679    run_ndjson_limit_with_options(
680        engine,
681        std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
682        query,
683        limit,
684        writer,
685        options,
686    )
687}
688
689pub fn run_ndjson_source<W>(
690    engine: &JetroEngine,
691    source: NdjsonSource,
692    query: &str,
693    writer: W,
694) -> Result<usize, JetroEngineError>
695where
696    W: Write,
697{
698    run_ndjson_source_with_options(engine, source, query, writer, NdjsonOptions::default())
699}
700
701pub fn run_ndjson_source_with_options<W>(
702    engine: &JetroEngine,
703    source: NdjsonSource,
704    query: &str,
705    writer: W,
706    options: NdjsonOptions,
707) -> Result<usize, JetroEngineError>
708where
709    W: Write,
710{
711    match source {
712        NdjsonSource::File(path) => {
713            run_ndjson_file_with_options(engine, path, query, writer, options)
714        }
715        NdjsonSource::Reader(reader) => {
716            run_ndjson_with_options(engine, reader, query, writer, options)
717        }
718    }
719}
720
721pub fn run_ndjson_source_limit<W>(
722    engine: &JetroEngine,
723    source: NdjsonSource,
724    query: &str,
725    limit: usize,
726    writer: W,
727) -> Result<usize, JetroEngineError>
728where
729    W: Write,
730{
731    run_ndjson_source_limit_with_options(
732        engine,
733        source,
734        query,
735        limit,
736        writer,
737        NdjsonOptions::default(),
738    )
739}
740
741pub fn run_ndjson_source_limit_with_options<W>(
742    engine: &JetroEngine,
743    source: NdjsonSource,
744    query: &str,
745    limit: usize,
746    writer: W,
747    options: NdjsonOptions,
748) -> Result<usize, JetroEngineError>
749where
750    W: Write,
751{
752    match source {
753        NdjsonSource::File(path) => {
754            run_ndjson_file_limit_with_options(engine, path, query, limit, writer, options)
755        }
756        NdjsonSource::Reader(reader) => {
757            run_ndjson_limit_with_options(engine, reader, query, limit, writer, options)
758        }
759    }
760}
761
762pub fn run_ndjson_matches<R, W>(
763    engine: &JetroEngine,
764    reader: R,
765    predicate: &str,
766    limit: usize,
767    writer: W,
768) -> Result<usize, JetroEngineError>
769where
770    R: BufRead,
771    W: Write,
772{
773    run_ndjson_matches_with_options(
774        engine,
775        reader,
776        predicate,
777        limit,
778        writer,
779        NdjsonOptions::default(),
780    )
781}
782
783pub fn run_ndjson_matches_with_options<R, W>(
784    engine: &JetroEngine,
785    reader: R,
786    predicate: &str,
787    limit: usize,
788    writer: W,
789    options: NdjsonOptions,
790) -> Result<usize, JetroEngineError>
791where
792    R: BufRead,
793    W: Write,
794{
795    drive_ndjson_matches_writer(engine, reader, predicate, limit, options, writer)
796}
797
798pub fn run_ndjson_matches_file<P, W>(
799    engine: &JetroEngine,
800    path: P,
801    predicate: &str,
802    limit: usize,
803    writer: W,
804) -> Result<usize, JetroEngineError>
805where
806    P: AsRef<Path>,
807    W: Write,
808{
809    let file = File::open(path)?;
810    let options = NdjsonOptions::default();
811    run_ndjson_matches_with_options(
812        engine,
813        std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
814        predicate,
815        limit,
816        writer,
817        options,
818    )
819}
820
821pub fn run_ndjson_matches_file_with_options<P, W>(
822    engine: &JetroEngine,
823    path: P,
824    predicate: &str,
825    limit: usize,
826    writer: W,
827    options: NdjsonOptions,
828) -> Result<usize, JetroEngineError>
829where
830    P: AsRef<Path>,
831    W: Write,
832{
833    let file = File::open(path)?;
834    run_ndjson_matches_with_options(
835        engine,
836        std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
837        predicate,
838        limit,
839        writer,
840        options,
841    )
842}
843
844pub fn run_ndjson_matches_source<W>(
845    engine: &JetroEngine,
846    source: NdjsonSource,
847    predicate: &str,
848    limit: usize,
849    writer: W,
850) -> Result<usize, JetroEngineError>
851where
852    W: Write,
853{
854    run_ndjson_matches_source_with_options(
855        engine,
856        source,
857        predicate,
858        limit,
859        writer,
860        NdjsonOptions::default(),
861    )
862}
863
864pub fn run_ndjson_matches_source_with_options<W>(
865    engine: &JetroEngine,
866    source: NdjsonSource,
867    predicate: &str,
868    limit: usize,
869    writer: W,
870    options: NdjsonOptions,
871) -> Result<usize, JetroEngineError>
872where
873    W: Write,
874{
875    match source {
876        NdjsonSource::File(path) => {
877            run_ndjson_matches_file_with_options(engine, path, predicate, limit, writer, options)
878        }
879        NdjsonSource::Reader(reader) => {
880            run_ndjson_matches_with_options(engine, reader, predicate, limit, writer, options)
881        }
882    }
883}
884
885fn drive_ndjson<R, F>(
886    engine: &JetroEngine,
887    reader: R,
888    query: &str,
889    options: NdjsonOptions,
890    mut emit: F,
891) -> Result<usize, JetroEngineError>
892where
893    R: BufRead,
894    F: FnMut(Value) -> Result<NdjsonControl, JetroEngineError>,
895{
896    let mut driver = NdjsonPerRowDriver::new(reader).with_max_line_len(options.max_line_len);
897    let plan = engine.cached_plan(query, PlanningContext::bytes());
898    let mut buf = Vec::with_capacity(options.initial_buffer_capacity);
899    let mut count = 0;
900
901    while let Some((line_no, row)) = driver.read_next_owned(&mut buf)? {
902        let document = parse_row(engine, line_no, row)?;
903        let out = collect_row_val(engine, &document, &plan, line_no)?;
904        count += 1;
905        if matches!(emit(Value::from(out))?, NdjsonControl::Stop) {
906            break;
907        }
908    }
909
910    Ok(count)
911}
912
913fn drive_ndjson_writer<R, W>(
914    engine: &JetroEngine,
915    reader: R,
916    query: &str,
917    limit: Option<usize>,
918    options: NdjsonOptions,
919    writer: W,
920) -> Result<usize, JetroEngineError>
921where
922    R: BufRead,
923    W: Write,
924{
925    #[cfg(feature = "simd-json")]
926    if let Some((byte_plan, tape_plan)) = direct_writer_plans(engine, query) {
927        if let Some(byte_plan) = byte_plan {
928            return drive_ndjson_byte_writer(
929                engine, reader, &byte_plan, &tape_plan, limit, options, writer,
930            );
931        }
932        if tape_plan_can_write_byte_row(&tape_plan) {
933            return drive_ndjson_tape_byte_writer(
934                engine, reader, &tape_plan, limit, options, writer,
935            );
936        }
937        return drive_ndjson_tape_writer(engine, reader, &tape_plan, limit, options, writer);
938    }
939
940    let mut driver = NdjsonPerRowDriver::new(reader).with_max_line_len(options.max_line_len);
941    let mut executor = NdjsonRowExecutor::new(engine, query);
942    let mut writer = ndjson_writer_with_options(writer, options);
943    let mut buf = Vec::with_capacity(options.initial_buffer_capacity);
944    let mut count = 0usize;
945
946    while let Some((line_no, row)) = driver.read_next_owned(&mut buf)? {
947        count += 1;
948        executor.write_owned_row(line_no, row, &mut writer)?;
949        if limit.is_some_and(|limit| count >= limit) {
950            break;
951        }
952    }
953
954    writer.flush()?;
955    Ok(count)
956}
957
/// Writer loop for queries with a direct byte plan.
///
/// Each row is first handed to `write_ndjson_byte_plan_row`. When that reports
/// `Fallback`, the row is parsed onto a tape and written through
/// `NdjsonTapeWriterRunner` instead. Every row is followed by `\n`. At most
/// `limit` rows are written when `limit` is `Some`.
///
/// Returns the number of rows written.
#[cfg(feature = "simd-json")]
fn drive_ndjson_byte_writer<R, W>(
    engine: &JetroEngine,
    reader: R,
    byte_plan: &NdjsonDirectBytePlan,
    tape_plan: &NdjsonDirectTapePlan,
    limit: Option<usize>,
    options: NdjsonOptions,
    writer: W,
) -> Result<usize, JetroEngineError>
where
    R: BufRead,
    W: Write,
{
    let capacity = options.initial_buffer_capacity;
    let mut driver = NdjsonPerRowDriver::new(reader).with_max_line_len(options.max_line_len);
    let mut out = ndjson_writer_with_options(writer, options);
    let mut spill = Vec::with_capacity(capacity);
    let mut tape = crate::data::tape::TapeScratch::with_capacity(capacity);
    let mut fallback = NdjsonTapeWriterRunner::new(engine, tape_plan);
    let mut written = 0usize;

    visit_ndjson_borrowed_rows(&mut driver, &mut spill, |line_no, row| {
        match write_ndjson_byte_plan_row(&mut out, row, byte_plan)? {
            BytePlanWrite::Done => {}
            BytePlanWrite::Fallback => {
                // The byte plan could not handle this row: evaluate it from a tape.
                tape.parse_slice(row).map_err(|message| {
                    row_parse_error(
                        line_no,
                        JetroEngineError::Eval(crate::EvalError(format!(
                            "Invalid JSON: {message}"
                        ))),
                    )
                })?;
                fallback.write_row(&tape, &mut out)?;
            }
        }
        out.write_all(b"\n")?;
        written += 1;
        let limit_reached = matches!(limit, Some(max_rows) if written >= max_rows);
        Ok(!limit_reached)
    })?;

    out.flush()?;
    Ok(written)
}
1003
/// Writer loop for tape plans that `tape_plan_can_write_byte_row` accepts.
///
/// Each row is first tried with `write_ndjson_byte_tape_plan_row` using a
/// reusable byte scratch buffer. On `Fallback`, the row is parsed onto a tape
/// and written through `NdjsonTapeWriterRunner`. Every row is followed by
/// `\n`. At most `limit` rows are written when `limit` is `Some`.
///
/// Returns the number of rows written.
#[cfg(feature = "simd-json")]
fn drive_ndjson_tape_byte_writer<R, W>(
    engine: &JetroEngine,
    reader: R,
    tape_plan: &NdjsonDirectTapePlan,
    limit: Option<usize>,
    options: NdjsonOptions,
    writer: W,
) -> Result<usize, JetroEngineError>
where
    R: BufRead,
    W: Write,
{
    let capacity = options.initial_buffer_capacity;
    let mut driver = NdjsonPerRowDriver::new(reader).with_max_line_len(options.max_line_len);
    let mut out = ndjson_writer_with_options(writer, options);
    let mut spill = Vec::with_capacity(capacity);
    let mut tape = crate::data::tape::TapeScratch::with_capacity(capacity);
    let mut row_bytes = Vec::with_capacity(capacity);
    let mut fallback = NdjsonTapeWriterRunner::new(engine, tape_plan);
    let mut written = 0usize;

    visit_ndjson_borrowed_rows(&mut driver, &mut spill, |line_no, row| {
        match write_ndjson_byte_tape_plan_row(&mut out, row, tape_plan, &mut row_bytes)? {
            BytePlanWrite::Done => {}
            BytePlanWrite::Fallback => {
                // The byte path could not handle this row: evaluate it from a tape.
                tape.parse_slice(row).map_err(|message| {
                    row_parse_error(
                        line_no,
                        JetroEngineError::Eval(crate::EvalError(format!(
                            "Invalid JSON: {message}"
                        ))),
                    )
                })?;
                fallback.write_row(&tape, &mut out)?;
            }
        }
        out.write_all(b"\n")?;
        written += 1;
        let limit_reached = matches!(limit, Some(max_rows) if written >= max_rows);
        Ok(!limit_reached)
    })?;

    out.flush()?;
    Ok(written)
}
1049
/// Calls `visit` with each non-empty, trimmed row and its 1-based line number
/// until EOF, an error, or `visit` returns `Ok(false)`.
///
/// Fast path: when the reader's buffer already holds a complete line, the row
/// is borrowed straight from that buffer (no copy). The line is consumed only
/// after `visit` returns.
///
/// Slow path: a line that straddles the buffer boundary is accumulated into
/// `spill` via `read_physical_line` first.
///
/// Both paths strip a leading UTF-8 BOM on line 1, remove `\r\n`/`\n`, skip
/// blank rows, and reject rows longer than `max_line_len`.
#[cfg(feature = "simd-json")]
fn visit_ndjson_borrowed_rows<R, F>(
    driver: &mut NdjsonPerRowDriver<R>,
    spill: &mut Vec<u8>,
    mut visit: F,
) -> Result<(), JetroEngineError>
where
    R: BufRead,
    F: FnMut(u64, &[u8]) -> Result<bool, JetroEngineError>,
{
    const UTF8_BOM: [u8; 3] = [0xef, 0xbb, 0xbf];

    loop {
        spill.clear();
        let available = driver.reader.fill_buf()?;
        if available.is_empty() {
            return Ok(());
        }

        let Some(newline) = memchr(b'\n', available) else {
            // Slow path: the line continues past the buffered bytes.
            if driver.read_physical_line(spill)? == 0 {
                return Ok(());
            }
            driver.line_no += 1;
            strip_initial_bom(driver.line_no, spill);
            trim_line_ending(spill);
            let (start, end) = non_ws_range(spill);
            if start != end {
                let len = end - start;
                if len > driver.max_line_len {
                    return Err(RowError::LineTooLarge {
                        line_no: driver.line_no,
                        len,
                        max: driver.max_line_len,
                    }
                    .into());
                }
                if !visit(driver.line_no, &spill[start..end])? {
                    return Ok(());
                }
            }
            continue;
        };

        // Fast path: the whole line sits in the reader's buffer.
        driver.line_no += 1;
        let line_no = driver.line_no;
        let mut row = &available[..newline];
        if let Some(without_cr) = row.strip_suffix(b"\r") {
            row = without_cr;
        }
        if line_no == 1 {
            if let Some(without_bom) = row.strip_prefix(&UTF8_BOM) {
                row = without_bom;
            }
        }

        let (start, end) = non_ws_range(row);
        let keep_going = if start == end {
            true
        } else {
            let content = &row[start..end];
            if content.len() > driver.max_line_len {
                return Err(RowError::LineTooLarge {
                    line_no,
                    len: content.len(),
                    max: driver.max_line_len,
                }
                .into());
            }
            visit(line_no, content)?
        };

        // Consume only after the borrowed row is no longer used.
        driver.reader.consume(newline + 1);
        if !keep_going {
            return Ok(());
        }
    }
}
1122
/// Runs a direct tape plan over every non-blank NDJSON row from `reader`,
/// writing one JSON result per line to `writer`.
///
/// When `limit` is `Some(n)`, at most `n` result lines are written, and
/// `Some(0)` writes nothing. Returns the number of result lines written.
///
/// # Errors
///
/// Propagates read errors from the row driver, a line-tagged parse error for
/// rows that are not valid JSON, plan evaluation errors from the runner, and
/// write/flush errors.
#[cfg(feature = "simd-json")]
fn drive_ndjson_tape_writer<R, W>(
    engine: &JetroEngine,
    reader: R,
    plan: &NdjsonDirectTapePlan,
    limit: Option<usize>,
    options: NdjsonOptions,
    writer: W,
) -> Result<usize, JetroEngineError>
where
    R: BufRead,
    W: Write,
{
    // The limit is only checked after a row has been written (below). Without
    // this guard, `Some(0)` would still emit the first row. This mirrors the
    // `limit == 0` early return in `drive_ndjson_matches`.
    if limit == Some(0) {
        return Ok(0);
    }

    let mut driver = NdjsonPerRowDriver::new(reader).with_max_line_len(options.max_line_len);
    let mut writer = ndjson_writer_with_options(writer, options);
    let mut line = Vec::with_capacity(options.initial_buffer_capacity);
    // The scratch tape is reused across rows to avoid re-allocating per line.
    let mut scratch =
        crate::data::tape::TapeScratch::with_capacity(options.initial_buffer_capacity);
    let mut count = 0usize;
    let mut runner = NdjsonTapeWriterRunner::new(engine, plan);

    while let Some((line_no, row)) = driver.read_next_nonempty(&mut line)? {
        scratch.parse_slice(row).map_err(|message| {
            row_parse_error(
                line_no,
                JetroEngineError::Eval(crate::EvalError(format!("Invalid JSON: {message}"))),
            )
        })?;
        runner.write_row(&scratch, &mut writer)?;
        writer.write_all(b"\n")?;
        count += 1;
        if limit.is_some_and(|limit| count >= limit) {
            break;
        }
    }

    writer.flush()?;
    Ok(count)
}
1162
/// Per-stream state for writing the result of one direct tape plan per NDJSON
/// row.
///
/// VM state is present only when the plan needs it (see `new`). The path caches
/// are kept across rows so that rows with a repeated object layout can reuse
/// earlier field positions.
#[cfg(feature = "simd-json")]
pub(super) struct NdjsonTapeWriterRunner<'a, 'p> {
    // Plan evaluated for every row.
    plan: &'p NdjsonDirectTapePlan,
    // Engine VM lock, taken in `new` only when `plan.needs_vm()`, and held
    // until the runner is dropped.
    vm: Option<MutexGuard<'a, VM>>,
    // Evaluation environment for view pipelines; `Some` exactly when `vm` is.
    env: Option<crate::data::context::Env>,
    // Cache for paths resolved from the row root.
    root_path: NdjsonPathCache,
    // Cache for the path leading to an array source.
    source_path: NdjsonPathCache,
    // Cache for the path applied after selecting an array element.
    suffix_path: NdjsonPathCache,
    // Handed to `write_json_tape_stream`, presumably for its predicate paths
    // (NOTE(review): confirm).
    predicate_path: NdjsonPathCache,
    // Per-field caches for object/array projections and stream maps.
    object_paths: Vec<NdjsonPathCache>,
}
1174
#[cfg(feature = "simd-json")]
impl<'a, 'p> NdjsonTapeWriterRunner<'a, 'p> {
    /// Creates a runner for `plan`.
    ///
    /// The engine VM is locked, and a `Null`-rooted environment is created, only
    /// when `plan.needs_vm()` is true. The lock is then held until the runner is
    /// dropped.
    pub(super) fn new(engine: &'a JetroEngine, plan: &'p NdjsonDirectTapePlan) -> Self {
        let (vm, env) = if plan.needs_vm() {
            (
                Some(engine.lock_vm()),
                Some(crate::data::context::Env::new(Val::Null)),
            )
        } else {
            (None, None)
        };
        Self {
            plan,
            vm,
            env,
            root_path: NdjsonPathCache::default(),
            source_path: NdjsonPathCache::default(),
            suffix_path: NdjsonPathCache::default(),
            predicate_path: NdjsonPathCache::default(),
            object_paths: Vec::new(),
        }
    }

    /// Evaluates the plan against one parsed row and writes the JSON result,
    /// without a trailing newline, to `writer`.
    ///
    /// Paths that do not resolve are written as `null`.
    ///
    /// # Errors
    ///
    /// Returns write errors, view-pipeline evaluation errors, and an error when
    /// a `ViewPipeline` plan runs without VM state.
    pub(super) fn write_row<W: Write>(
        &mut self,
        scratch: &crate::data::tape::TapeScratch,
        writer: &mut W,
    ) -> Result<(), JetroEngineError> {
        match self.plan {
            NdjsonDirectTapePlan::RootPath(steps) => {
                let found = self.root_path.index(scratch, 0, steps);
                Self::write_node_or_null(writer, scratch, found)?;
            }
            NdjsonDirectTapePlan::ViewScalarCall {
                steps,
                call,
                optional,
            } => {
                let found = self.root_path.index(scratch, 0, steps);
                let view = match found {
                    Some(node) => json_tape_scalar(scratch, node),
                    None => crate::util::JsonView::Null,
                };
                let view_is_null = matches!(view, crate::util::JsonView::Null);
                if *optional && view_is_null {
                    writer.write_all(b"null")?;
                } else if let Some(applied) = call.try_apply_json_view(view) {
                    write_val_json(writer, &applied)?;
                } else {
                    // The call did not apply: fall back to the raw node.
                    Self::write_node_or_null(writer, scratch, found)?;
                }
            }
            NdjsonDirectTapePlan::ArrayElementViewScalarCall {
                source_steps,
                element,
                suffix_steps,
                call,
            } => {
                let found = self
                    .source_path
                    .index(scratch, 0, source_steps)
                    .and_then(|array| json_tape_array_element(scratch, array, *element))
                    .and_then(|item| self.suffix_path.index(scratch, item, suffix_steps));
                let applied = found
                    .map(|node| json_tape_scalar(scratch, node))
                    .and_then(|view| call.try_apply_json_view(view));
                if let Some(applied) = applied {
                    write_val_json(writer, &applied)?;
                } else {
                    Self::write_node_or_null(writer, scratch, found)?;
                }
            }
            NdjsonDirectTapePlan::ObjectItems { steps, method } => {
                let found = self.root_path.index(scratch, 0, steps);
                write_json_tape_object_items(writer, scratch, found, *method)?;
            }
            NdjsonDirectTapePlan::ArrayElementPath {
                source_steps,
                element,
                suffix_steps,
            } => {
                let found = self
                    .source_path
                    .index(scratch, 0, source_steps)
                    .and_then(|array| json_tape_array_element(scratch, array, *element))
                    .and_then(|item| self.suffix_path.index(scratch, item, suffix_steps));
                Self::write_node_or_null(writer, scratch, found)?;
            }
            NdjsonDirectTapePlan::Stream(stream) => {
                write_json_tape_stream(
                    writer,
                    scratch,
                    stream,
                    &mut self.source_path,
                    &mut self.suffix_path,
                    &mut self.predicate_path,
                    &mut self.object_paths,
                )?;
            }
            NdjsonDirectTapePlan::Object(fields) => {
                write_json_tape_object_projection(writer, scratch, fields, &mut self.object_paths)?;
            }
            NdjsonDirectTapePlan::Array(items) => {
                write_json_tape_array_projection(writer, scratch, items, &mut self.object_paths)?;
            }
            NdjsonDirectTapePlan::ViewPipeline { source_steps, body } => {
                let (Some(vm), Some(env)) = (self.vm.as_deref_mut(), self.env.as_ref()) else {
                    return Err(JetroEngineError::Eval(crate::EvalError(
                        "NDJSON view pipeline requires VM state".to_string(),
                    )));
                };
                let source = match json_tape_path_index(scratch, source_steps) {
                    Some(idx) => crate::data::view::TapeScratchView::Node { tape: scratch, idx },
                    None => crate::data::view::TapeScratchView::Missing,
                };
                match crate::exec::view::run_with_env_and_vm(source, body, None, env, vm) {
                    Some(outcome) => {
                        let value = outcome.map_err(JetroEngineError::Eval)?;
                        write_val_json(writer, &value)?;
                    }
                    None => {
                        writer.write_all(b"null")?;
                    }
                }
            }
        }
        Ok(())
    }

    /// Writes the JSON subtree at `node`, or `null` when the path did not
    /// resolve.
    fn write_node_or_null<W: Write>(
        writer: &mut W,
        scratch: &crate::data::tape::TapeScratch,
        node: Option<usize>,
    ) -> Result<(), JetroEngineError> {
        if let Some(idx) = node {
            write_json_tape_at(writer, scratch, idx)?;
        } else {
            writer.write_all(b"null")?;
        }
        Ok(())
    }
}
1303
/// Remembers where object fields were found on earlier rows, so that rows with
/// the same layout can resolve a path without rescanning object keys.
#[cfg(feature = "simd-json")]
#[derive(Default)]
pub(super) struct NdjsonPathCache {
    // Entry `i` holds the key/value offsets last found for the `Field` step at
    // path position `i` (`None` until that step has been resolved once). A
    // cached slot is used only when `json_tape_object_cached_field` accepts it
    // for the current row. Otherwise the object is scanned and the entry is
    // refreshed.
    fields: Vec<Option<NdjsonFieldCache>>,
}
1312
/// Position of a field inside an object node, relative to the object's own
/// tape index.
#[cfg(feature = "simd-json")]
#[derive(Clone, Copy)]
struct NdjsonFieldCache {
    // Offset from the object node to the field's key string node.
    key_delta: usize,
    // Offset from the object node to the field's value node
    // (`key_delta + 1` as produced by `json_tape_object_field_index_and_cache`).
    value_delta: usize,
}
1319
/// Bundle of mutable borrows of the three path caches used when evaluating an
/// array source, its element suffix, and a predicate.
///
/// NOTE(review): not constructed anywhere in this excerpt. Confirm it is used
/// elsewhere in the module, or remove it.
#[cfg(feature = "simd-json")]
struct NdjsonPathCaches<'a> {
    source: &'a mut NdjsonPathCache,
    suffix: &'a mut NdjsonPathCache,
    predicate: &'a mut NdjsonPathCache,
}
1326
#[cfg(feature = "simd-json")]
impl NdjsonPathCache {
    /// Resolves `steps` starting at node `start`.
    ///
    /// Tries the single-lookup fast path first. If that fails, it falls back to
    /// a step-by-step walk that refreshes the per-depth field caches.
    fn index<T: JsonTape>(
        &mut self,
        tape: &T,
        start: usize,
        steps: &[crate::ir::physical::PhysicalPathStep],
    ) -> Option<usize> {
        self.index_cached(tape, start, steps)
            .or_else(|| self.index_uncached(tape, start, steps))
    }

    /// Fast path for a `Field(key)` followed only by non-`Field` steps.
    ///
    /// It reuses the depth-0 cached slot and applies the remaining steps
    /// directly. The cache is never updated here.
    fn index_cached<T: JsonTape>(
        &self,
        tape: &T,
        start: usize,
        steps: &[crate::ir::physical::PhysicalPathStep],
    ) -> Option<usize> {
        use crate::ir::physical::PhysicalPathStep;

        let (first, rest) = steps.split_first()?;
        let PhysicalPathStep::Field(key) = first else {
            return None;
        };
        let nested_field = rest
            .iter()
            .any(|step| matches!(step, PhysicalPathStep::Field(_)));
        if nested_field {
            return None;
        }
        // A delta of 1 means the object's first key, which a plain scan
        // finds immediately, so such slots are not used here.
        let slot = self.fields.first().copied().flatten()?;
        if slot.key_delta <= 1 {
            return None;
        }
        let value_idx = json_tape_object_cached_field(tape, start, slot, key.as_ref())?;
        rest.iter()
            .try_fold(value_idx, |node, step| json_tape_step_index(tape, node, step))
    }

    /// Full walk from `start`, refreshing the per-depth caches along the way.
    fn index_uncached<T: JsonTape>(
        &mut self,
        tape: &T,
        start: usize,
        steps: &[crate::ir::physical::PhysicalPathStep],
    ) -> Option<usize> {
        self.index_from_depth(tape, start, steps, 0)
    }

    /// Walks `steps` from `start`. Cache slot `depth + i` belongs to `steps[i]`.
    fn index_from_depth<T: JsonTape>(
        &mut self,
        tape: &T,
        start: usize,
        steps: &[crate::ir::physical::PhysicalPathStep],
        depth: usize,
    ) -> Option<usize> {
        use crate::ir::physical::PhysicalPathStep;

        let mut node = start;
        for (offset, step) in steps.iter().enumerate() {
            node = match step {
                PhysicalPathStep::Field(key) => {
                    self.resolve_field(tape, node, key.as_ref(), depth + offset)?
                }
                other => json_tape_step_index(tape, node, other)?,
            };
        }
        Some(node)
    }

    /// Resolves `key` in the object at `obj`.
    ///
    /// Cache slot `depth` is tried first. On a miss the object is rescanned
    /// and the slot is refreshed.
    fn resolve_field<T: JsonTape>(
        &mut self,
        tape: &T,
        obj: usize,
        key: &str,
        depth: usize,
    ) -> Option<usize> {
        if self.fields.len() <= depth {
            self.fields.resize(depth + 1, None);
        }
        let remembered = self.fields[depth].filter(|slot| slot.key_delta > 1);
        if let Some(hit) =
            remembered.and_then(|slot| json_tape_object_cached_field(tape, obj, slot, key))
        {
            return Some(hit);
        }
        let (value_idx, slot) = json_tape_object_field_index_and_cache(tape, obj, key)?;
        self.fields[depth] = Some(slot);
        Some(value_idx)
    }
}
1420
/// Copies up to `limit` raw NDJSON rows that satisfy a direct tape predicate
/// from `reader` to `writer`, one row per line. Returns how many rows were
/// written.
///
/// Each row is first given to the byte-level evaluator
/// (`eval_ndjson_byte_predicate_row`). When that cannot decide (`None`), the row
/// is parsed into a reused tape scratch and checked with `eval_tape_predicate`.
/// The limit is checked only after a row has been written, so callers handle
/// `limit == 0` themselves.
///
/// # Errors
///
/// Propagates read errors, byte-predicate errors, a line-tagged parse error for
/// invalid JSON, predicate evaluation errors, and write/flush errors.
#[cfg(feature = "simd-json")]
fn drive_ndjson_tape_matches_writer<R, W>(
    engine: &JetroEngine,
    reader: R,
    predicate: &NdjsonDirectPredicate,
    limit: usize,
    options: NdjsonOptions,
    writer: W,
) -> Result<usize, JetroEngineError>
where
    R: BufRead,
    W: Write,
{
    let mut driver = NdjsonPerRowDriver::new(reader).with_max_line_len(options.max_line_len);
    let mut out = ndjson_writer_with_options(writer, options);
    let mut line_buf = Vec::with_capacity(options.initial_buffer_capacity);
    let mut scratch =
        crate::data::tape::TapeScratch::with_capacity(options.initial_buffer_capacity);
    let mut written = 0usize;
    let wants_vm = predicate_needs_vm(predicate);
    let mut vm = wants_vm.then(|| engine.lock_vm());
    let env = wants_vm.then(|| crate::data::context::Env::new(Val::Null));
    let mut predicate_path = NdjsonPathCache::default();

    while let Some((line_no, row)) = driver.read_next_owned(&mut line_buf)? {
        let matched = match eval_ndjson_byte_predicate_row(&row, predicate)? {
            Some(decided) => decided,
            None => {
                // The byte evaluator could not decide, so parse the row and
                // evaluate the predicate on the tape.
                scratch.parse_slice(&row).map_err(|message| {
                    row_parse_error(
                        line_no,
                        JetroEngineError::Eval(crate::EvalError(format!(
                            "Invalid JSON: {message}"
                        ))),
                    )
                })?;
                eval_tape_predicate(
                    &scratch,
                    predicate,
                    env.as_ref(),
                    &mut vm,
                    &mut predicate_path,
                )
                .map_err(JetroEngineError::Eval)?
            }
        };
        if !matched {
            continue;
        }

        out.write_all(&row)?;
        out.write_all(b"\n")?;
        written += 1;
        if written >= limit {
            break;
        }
    }

    out.flush()?;
    Ok(written)
}
1487
1488fn drive_ndjson_matches<R, F>(
1489    engine: &JetroEngine,
1490    reader: R,
1491    predicate: &str,
1492    limit: usize,
1493    options: NdjsonOptions,
1494    mut emit: F,
1495) -> Result<usize, JetroEngineError>
1496where
1497    R: BufRead,
1498    F: FnMut(Val) -> Result<NdjsonControl, JetroEngineError>,
1499{
1500    if limit == 0 {
1501        return Ok(0);
1502    }
1503
1504    let mut driver = NdjsonPerRowDriver::new(reader).with_max_line_len(options.max_line_len);
1505    #[cfg(feature = "simd-json")]
1506    let direct_predicate = direct_tape_predicate(engine, predicate);
1507    let mut executor = NdjsonRowExecutor::new(engine, predicate);
1508    let mut buf = Vec::with_capacity(options.initial_buffer_capacity);
1509    let mut emitted = 0usize;
1510
1511    while let Some((line_no, row)) = driver.read_next_owned(&mut buf)? {
1512        #[cfg(feature = "simd-json")]
1513        if let Some(predicate) = direct_predicate.as_ref() {
1514            if let Some(false) = eval_ndjson_byte_predicate_row(&row, predicate)? {
1515                continue;
1516            }
1517        }
1518
1519        let document = executor.parse_owned_row(line_no, row)?;
1520        let matched = executor.eval_document(line_no, &document)?;
1521        if !is_truthy(&matched) {
1522            continue;
1523        }
1524
1525        let root = document
1526            .root_val_with(engine.keys())
1527            .map_err(|err| row_eval_error(line_no, err))?;
1528        emitted += 1;
1529        if matches!(emit(root)?, NdjsonControl::Stop) || emitted >= limit {
1530            break;
1531        }
1532    }
1533
1534    Ok(emitted)
1535}
1536
1537fn drive_ndjson_matches_writer<R, W>(
1538    engine: &JetroEngine,
1539    reader: R,
1540    predicate: &str,
1541    limit: usize,
1542    options: NdjsonOptions,
1543    writer: W,
1544) -> Result<usize, JetroEngineError>
1545where
1546    R: BufRead,
1547    W: Write,
1548{
1549    if limit == 0 {
1550        return Ok(0);
1551    }
1552
1553    #[cfg(feature = "simd-json")]
1554    if let Some(predicate) = direct_tape_predicate(engine, predicate) {
1555        return drive_ndjson_tape_matches_writer(
1556            engine, reader, &predicate, limit, options, writer,
1557        );
1558    }
1559
1560    let mut driver = NdjsonPerRowDriver::new(reader).with_max_line_len(options.max_line_len);
1561    let mut executor = NdjsonRowExecutor::new(engine, predicate);
1562    let mut writer = ndjson_writer_with_options(writer, options);
1563    let mut buf = Vec::with_capacity(options.initial_buffer_capacity);
1564    let mut emitted = 0usize;
1565
1566    while let Some((line_no, row)) = driver.read_next_owned(&mut buf)? {
1567        let document = executor.parse_owned_row(line_no, row)?;
1568        let matched = executor.eval_document(line_no, &document)?;
1569        if !is_truthy(&matched) {
1570            continue;
1571        }
1572
1573        write_document_line(&mut writer, &document, line_no, executor.engine())?;
1574        emitted += 1;
1575        if emitted >= limit {
1576            break;
1577        }
1578    }
1579
1580    writer.flush()?;
1581    Ok(emitted)
1582}
1583
/// General per-row executor. It evaluates one planned query against each
/// parsed NDJSON row, using the engine's VM.
pub(super) struct NdjsonRowExecutor<'a> {
    // Engine used for planning, row parsing and VM access.
    engine: &'a JetroEngine,
    // Query planned once in `new` with `PlanningContext::bytes()`.
    plan: crate::ir::physical::QueryPlan,
    // Engine VM lock, held for the executor's whole lifetime.
    vm: MutexGuard<'a, VM>,
}
1589
1590impl<'a> NdjsonRowExecutor<'a> {
1591    pub(super) fn new(engine: &'a JetroEngine, query: &str) -> Self {
1592        Self {
1593            engine,
1594            plan: engine.cached_plan(query, PlanningContext::bytes()),
1595            vm: engine.lock_vm(),
1596        }
1597    }
1598
1599    pub(super) fn eval_owned_row(
1600        &mut self,
1601        line_no: u64,
1602        row: Vec<u8>,
1603    ) -> Result<Val, JetroEngineError> {
1604        let document = self.parse_owned_row(line_no, row)?;
1605        self.eval_document(line_no, &document)
1606    }
1607
1608    pub(super) fn write_owned_row<W: Write>(
1609        &mut self,
1610        line_no: u64,
1611        row: Vec<u8>,
1612        writer: &mut W,
1613    ) -> Result<(), JetroEngineError> {
1614        let document = self.parse_owned_result_row(line_no, row)?;
1615        self.write_document_result(line_no, &document, writer)
1616    }
1617
1618    fn parse_owned_result_row(
1619        &self,
1620        line_no: u64,
1621        row: Vec<u8>,
1622    ) -> Result<Jetro, JetroEngineError> {
1623        #[cfg(feature = "simd-json")]
1624        {
1625            crate::data::tape::TapeData::parse(row)
1626                .map(Jetro::from_tape_data)
1627                .map_err(|message| {
1628                    row_parse_error(
1629                        line_no,
1630                        JetroEngineError::Eval(crate::EvalError(format!(
1631                            "Invalid JSON: {message}"
1632                        ))),
1633                    )
1634                })
1635        }
1636        #[cfg(not(feature = "simd-json"))]
1637        {
1638            self.parse_owned_row(line_no, row)
1639        }
1640    }
1641
1642    pub(super) fn parse_owned_row(
1643        &self,
1644        line_no: u64,
1645        row: Vec<u8>,
1646    ) -> Result<Jetro, JetroEngineError> {
1647        parse_row(self.engine, line_no, row)
1648    }
1649
1650    pub(super) fn eval_document(
1651        &mut self,
1652        line_no: u64,
1653        document: &Jetro,
1654    ) -> Result<Val, JetroEngineError> {
1655        crate::exec::router::collect_plan_val_with_vm(document, &self.plan, &mut self.vm)
1656            .map_err(|err| row_eval_error(line_no, err))
1657    }
1658
1659    pub(super) fn write_document_result<W: Write>(
1660        &mut self,
1661        line_no: u64,
1662        document: &Jetro,
1663        writer: &mut W,
1664    ) -> Result<(), JetroEngineError> {
1665        if self.try_write_tape_result(line_no, document, writer)? {
1666            return Ok(());
1667        }
1668        let value = self.eval_document(line_no, document)?;
1669        write_val_line(writer, &value)
1670    }
1671
1672    fn try_write_tape_result<W: Write>(
1673        &self,
1674        line_no: u64,
1675        document: &Jetro,
1676        writer: &mut W,
1677    ) -> Result<bool, JetroEngineError> {
1678        #[cfg(feature = "simd-json")]
1679        {
1680            use crate::ir::physical::{PlanNode, QueryRoot};
1681
1682            let QueryRoot::Node(root) = self.plan.root() else {
1683                return Ok(false);
1684            };
1685            let PlanNode::RootPath(steps) = self.plan.node(*root) else {
1686                return Ok(false);
1687            };
1688            let Some(tape) = document
1689                .lazy_tape()
1690                .map_err(|err| row_eval_error(line_no, err))?
1691            else {
1692                return Ok(false);
1693            };
1694            if let Some(idx) = json_tape_path_index(tape.as_ref(), steps) {
1695                write_json_tape_at(writer, tape.as_ref(), idx)?;
1696            } else {
1697                writer.write_all(b"null")?;
1698            }
1699            writer.write_all(b"\n")?;
1700            Ok(true)
1701        }
1702        #[cfg(not(feature = "simd-json"))]
1703        {
1704            let _ = (line_no, document, writer);
1705            Ok(false)
1706        }
1707    }
1708
1709    pub(super) fn engine(&self) -> &'a JetroEngine {
1710        self.engine
1711    }
1712}
1713
/// Minimal read-only view of a parsed JSON tape, implemented for both
/// `TapeData` and `TapeScratch` so the path helpers work on either.
#[cfg(feature = "simd-json")]
trait JsonTape {
    /// Flat node array. The path helpers treat index 0 as the document root.
    fn nodes(&self) -> &[crate::data::tape::TapeNode];
    /// String content of the node at `idx`. The helpers use it to read object keys.
    fn str_at(&self, idx: usize) -> &str;
    /// Number of nodes occupied by the value at `idx`, including itself; adding
    /// it to `idx` skips over the whole value.
    fn span(&self, idx: usize) -> usize;
}
1720
/// `JsonTape` for an owned `TapeData`.
#[cfg(feature = "simd-json")]
impl JsonTape for crate::data::tape::TapeData {
    #[inline]
    fn nodes(&self) -> &[crate::data::tape::TapeNode] {
        &self.nodes
    }

    // Method resolution prefers the inherent `TapeData::str_at` over this trait
    // method, so this delegates rather than recursing.
    // NOTE(review): relies on that inherent method existing.
    #[inline]
    fn str_at(&self, idx: usize) -> &str {
        self.str_at(idx)
    }

    // Delegates to the inherent `TapeData::span` (same resolution rule as above).
    #[inline]
    fn span(&self, idx: usize) -> usize {
        self.span(idx)
    }
}
1738
/// `JsonTape` for the reusable `TapeScratch` buffer that the NDJSON drivers
/// re-parse each row into.
#[cfg(feature = "simd-json")]
impl JsonTape for crate::data::tape::TapeScratch {
    #[inline]
    fn nodes(&self) -> &[crate::data::tape::TapeNode] {
        &self.nodes
    }

    // Method resolution prefers the inherent `TapeScratch::str_at` over this
    // trait method, so this delegates rather than recursing.
    // NOTE(review): relies on that inherent method existing.
    #[inline]
    fn str_at(&self, idx: usize) -> &str {
        self.str_at(idx)
    }

    // Delegates to the inherent `TapeScratch::span` (same resolution rule).
    #[inline]
    fn span(&self, idx: usize) -> usize {
        self.span(idx)
    }
}
1756
/// Resolves `steps` starting from the document root, which is tape node 0.
#[cfg(feature = "simd-json")]
fn json_tape_path_index<T: JsonTape>(
    tape: &T,
    steps: &[crate::ir::physical::PhysicalPathStep],
) -> Option<usize> {
    const ROOT_NODE: usize = 0;
    json_tape_path_index_from(tape, ROOT_NODE, steps)
}
1764
/// Resolves `steps` against `tape`, starting from node `start`.
///
/// Returns `None` when `start` is not a node of `tape` (this includes an empty
/// tape), or when any step fails to resolve. One- and two-step paths are
/// unrolled. Longer paths go through `json_tape_path_index_slow`.
#[cfg(feature = "simd-json")]
fn json_tape_path_index_from<T: JsonTape>(
    tape: &T,
    start: usize,
    steps: &[crate::ir::physical::PhysicalPathStep],
) -> Option<usize> {
    // Validate any caller-supplied start, not just the root of an empty tape.
    // Otherwise an out-of-range `start` would be returned as a dangling index
    // for an empty path, or handed on to the step helper.
    if start >= tape.nodes().len() {
        return None;
    }

    match steps {
        [] => Some(start),
        [step] => json_tape_step_index(tape, start, step),
        [first, second] => json_tape_step_index(tape, start, first)
            .and_then(|idx| json_tape_step_index(tape, idx, second)),
        _ => json_tape_path_index_slow(tape, start, steps),
    }
}
1783
/// Applies each step in turn, starting at `start`. Returns `None` as soon as
/// one step fails to resolve.
#[cfg(feature = "simd-json")]
fn json_tape_path_index_slow<T: JsonTape>(
    tape: &T,
    start: usize,
    steps: &[crate::ir::physical::PhysicalPathStep],
) -> Option<usize> {
    steps
        .iter()
        .try_fold(start, |node, step| json_tape_step_index(tape, node, step))
}
1796
/// Applies a single path step to the node at `start`.
///
/// A `Field(key)` step looks `key` up among the direct keys of an object node.
/// An `Index(n)` step selects an array element, and negative `n` counts from
/// the end. Returns `None` when `start` is out of range, the node has the wrong
/// kind, or the key or element is absent.
#[cfg(feature = "simd-json")]
fn json_tape_step_index<T: JsonTape>(
    tape: &T,
    start: usize,
    step: &crate::ir::physical::PhysicalPathStep,
) -> Option<usize> {
    use crate::data::tape::TapeNode;
    use crate::ir::physical::PhysicalPathStep;

    // Checked lookup: an out-of-range `start` yields `None` instead of
    // panicking. This matches `json_tape_array_element` and
    // `json_tape_object_cached_field`.
    let node = tape.nodes().get(start).copied()?;
    match step {
        PhysicalPathStep::Field(key) => {
            let TapeNode::Object { len, .. } = node else {
                return None;
            };
            let mut cur = start + 1;
            for _ in 0..len {
                if tape.str_at(cur) == key.as_ref() {
                    return Some(cur + 1);
                }
                // Step over the key node, then the whole value subtree.
                cur += 1;
                cur += tape.span(cur);
            }
            None
        }
        PhysicalPathStep::Index(wanted) => {
            let TapeNode::Array { len, .. } = node else {
                return None;
            };
            let wanted = if *wanted < 0 {
                len.checked_sub(wanted.unsigned_abs() as usize)?
            } else {
                *wanted as usize
            };
            if wanted >= len {
                return None;
            }
            let mut cur = start + 1;
            for _ in 0..wanted {
                cur += tape.span(cur);
            }
            Some(cur)
        }
    }
}
1841
/// Fast-path lookup of `key` in the object at `obj_idx`, using the slot where
/// the key was found on an earlier row.
///
/// `cache` holds the key/value offsets relative to `obj_idx`. The slot is
/// accepted only when all of these hold:
/// - `obj_idx` is an object;
/// - the slot is one of that object's *direct* key positions;
/// - the string stored there equals `key`.
///
/// Otherwise `None` is returned and the caller falls back to a full scan.
#[cfg(feature = "simd-json")]
fn json_tape_object_cached_field<T: JsonTape>(
    tape: &T,
    obj_idx: usize,
    cache: NdjsonFieldCache,
    key: &str,
) -> Option<usize> {
    use crate::data::tape::TapeNode;

    let nodes = tape.nodes();
    let TapeNode::Object { len, .. } = nodes.get(obj_idx).copied()? else {
        return None;
    };
    let key_idx = obj_idx.checked_add(cache.key_delta)?;
    let value_idx = obj_idx.checked_add(cache.value_delta)?;
    if value_idx >= nodes.len() || !matches!(nodes.get(key_idx), Some(TapeNode::String(_))) {
        return None;
    }

    // A matching String node at the cached offset is not enough on its own.
    // When the row's layout differs, the same offset can land on:
    // - a string *value* nested inside an earlier field, e.g. `{"p":["id",1]}`;
    // - a key that follows this object in its parent, e.g. `user.id` against
    //   `{"user":{"name":"x"},"id":9}`.
    // Hop over the preceding fields by span, without comparing keys, to confirm
    // `key_idx` is a direct key slot of this object.
    let mut slot = obj_idx + 1;
    let mut fields_skipped = 0;
    while slot < key_idx && fields_skipped < len {
        slot += 1;
        slot += tape.span(slot);
        fields_skipped += 1;
    }
    if slot != key_idx || fields_skipped >= len {
        return None;
    }

    (tape.str_at(key_idx) == key).then_some(value_idx)
}
1865
/// Scans the direct keys of the object at `obj_idx` for `key`.
///
/// On a hit, returns the value's tape index together with the key/value
/// offsets relative to `obj_idx`, for reuse on later rows. Returns `None` when
/// `obj_idx` is out of range, is not an object, or lacks `key`.
#[cfg(feature = "simd-json")]
fn json_tape_object_field_index_and_cache<T: JsonTape>(
    tape: &T,
    obj_idx: usize,
    key: &str,
) -> Option<(usize, NdjsonFieldCache)> {
    // Checked lookup, consistent with `json_tape_object_cached_field`.
    let crate::data::tape::TapeNode::Object { len, .. } = tape.nodes().get(obj_idx).copied()?
    else {
        return None;
    };
    let mut key_idx = obj_idx + 1;
    for _ in 0..len {
        let value_idx = key_idx + 1;
        if tape.str_at(key_idx) == key {
            return Some((
                value_idx,
                NdjsonFieldCache {
                    key_delta: key_idx - obj_idx,
                    value_delta: value_idx - obj_idx,
                },
            ));
        }
        // The next key follows the whole value subtree.
        key_idx = value_idx + tape.span(value_idx);
    }
    None
}
1891
/// Selects the first, last, or `n`-th element of the array node at `idx`.
/// Returns `None` when the node is missing, is not an array, or is too short.
#[cfg(feature = "simd-json")]
fn json_tape_array_element<T: JsonTape>(
    tape: &T,
    idx: usize,
    element: NdjsonDirectElement,
) -> Option<usize> {
    use crate::data::tape::TapeNode;

    let len = match tape.nodes().get(idx).copied()? {
        TapeNode::Array { len, .. } => len,
        _ => return None,
    };
    let position = match element {
        NdjsonDirectElement::First => 0,
        NdjsonDirectElement::Last => len.checked_sub(1)?,
        NdjsonDirectElement::Nth(n) => n,
    };
    // Elements are laid out back to back, so hop over `position` subtrees.
    (position < len).then(|| (0..position).fold(idx + 1, |node, _| node + tape.span(node)))
}
1915
/// Evaluates a direct NDJSON predicate against one parsed row.
///
/// Rules by predicate form:
/// - Paths resolve through `cache` and are truthy per `json_view_truthy`. A
///   missing path is false.
/// - `And` and `Or` short-circuit.
/// - Any other binary operator compares two scalar operands. It is false when
///   either operand cannot be resolved.
/// - `ViewPipeline` predicates run through the VM.
///
/// # Errors
///
/// Returns an error when a `ViewPipeline` predicate is evaluated without VM
/// state, and propagates errors from the view pipeline.
#[cfg(feature = "simd-json")]
pub(super) fn eval_tape_predicate(
    tape: &crate::data::tape::TapeScratch,
    predicate: &NdjsonDirectPredicate,
    env: Option<&crate::data::context::Env>,
    vm: &mut Option<std::sync::MutexGuard<'_, crate::vm::exec::VM>>,
    cache: &mut NdjsonPathCache,
) -> Result<bool, crate::EvalError> {
    use crate::parse::ast::BinOp;

    let verdict = match predicate {
        NdjsonDirectPredicate::Literal(value) => crate::util::is_truthy(value),
        NdjsonDirectPredicate::Path(steps) => match cache.index(tape, 0, steps) {
            Some(node) => json_view_truthy(json_tape_scalar(tape, node)),
            None => false,
        },
        NdjsonDirectPredicate::Not(inner) => !eval_tape_predicate(tape, inner, env, vm, cache)?,
        NdjsonDirectPredicate::Binary { lhs, op, rhs } => {
            if *op == BinOp::And {
                eval_tape_predicate(tape, lhs, env, vm, cache)?
                    && eval_tape_predicate(tape, rhs, env, vm, cache)?
            } else if *op == BinOp::Or {
                eval_tape_predicate(tape, lhs, env, vm, cache)?
                    || eval_tape_predicate(tape, rhs, env, vm, cache)?
            } else {
                let Some(left) = eval_tape_scalar(tape, lhs, cache) else {
                    return Ok(false);
                };
                let Some(right) = eval_tape_scalar(tape, rhs, cache) else {
                    return Ok(false);
                };
                crate::util::json_cmp_binop(left, *op, right)
            }
        }
        NdjsonDirectPredicate::ViewScalarCall { steps, call } => cache
            .index(tape, 0, steps)
            .map(|node| json_tape_scalar(tape, node))
            .and_then(|view| call.try_apply_json_view(view))
            .is_some_and(|value| crate::util::is_truthy(&value)),
        NdjsonDirectPredicate::ArrayElementViewScalarCall {
            source_steps,
            element,
            suffix_steps,
            call,
        } => json_tape_path_index(tape, source_steps)
            .and_then(|array| json_tape_array_element(tape, array, *element))
            .and_then(|item| json_tape_path_index_from(tape, item, suffix_steps))
            .map(|node| json_tape_scalar(tape, node))
            .and_then(|view| call.try_apply_json_view(view))
            .is_some_and(|value| crate::util::is_truthy(&value)),
        NdjsonDirectPredicate::ViewPipeline { source_steps, body } => {
            let (Some(vm), Some(env)) = (vm.as_deref_mut(), env) else {
                return Err(crate::EvalError(
                    "view pipeline predicate requires VM state".to_string(),
                ));
            };
            let source = match json_tape_path_index(tape, source_steps) {
                Some(idx) => crate::data::view::TapeScratchView::Node { tape, idx },
                None => crate::data::view::TapeScratchView::Missing,
            };
            match crate::exec::view::run_with_env_and_vm(source, body, None, env, vm) {
                Some(outcome) => crate::util::is_truthy(&outcome?),
                None => false,
            }
        }
    };
    Ok(verdict)
}
1981
/// Reports whether evaluating `predicate` requires VM state, which is the case
/// when it contains a `ViewPipeline` anywhere in its tree.
#[cfg(feature = "simd-json")]
pub(super) fn predicate_needs_vm(predicate: &NdjsonDirectPredicate) -> bool {
    match predicate {
        NdjsonDirectPredicate::ViewPipeline { .. } => true,
        NdjsonDirectPredicate::Not(inner) => predicate_needs_vm(inner),
        NdjsonDirectPredicate::Binary { lhs, rhs, .. } => {
            [lhs, rhs].into_iter().any(|side| predicate_needs_vm(side))
        }
        NdjsonDirectPredicate::Path(_)
        | NdjsonDirectPredicate::Literal(_)
        | NdjsonDirectPredicate::ViewScalarCall { .. }
        | NdjsonDirectPredicate::ArrayElementViewScalarCall { .. } => false,
    }
}
1996
/// Resolves a leaf predicate to a scalar JSON view.
///
/// A `Literal` is viewed directly. A `Path` is resolved from the tape root
/// (index 0) through `cache`. Any other predicate kind, or a missing path,
/// yields `None`.
#[cfg(feature = "simd-json")]
fn eval_tape_scalar<'a>(
    tape: &'a crate::data::tape::TapeScratch,
    predicate: &'a NdjsonDirectPredicate,
    cache: &mut NdjsonPathCache,
) -> Option<crate::util::JsonView<'a>> {
    if let NdjsonDirectPredicate::Literal(value) = predicate {
        return Some(crate::util::JsonView::from_val(value));
    }
    let NdjsonDirectPredicate::Path(steps) = predicate else {
        return None;
    };
    let node_idx = cache.index(tape, 0, steps)?;
    Some(json_tape_scalar(tape, node_idx))
}
2011
/// Truthiness of a borrowed scalar view.
///
/// `null`, `false`, numeric zero, the empty string, and empty arrays or objects
/// are falsy. Everything else is truthy.
#[cfg(feature = "simd-json")]
fn json_view_truthy(value: crate::util::JsonView<'_>) -> bool {
    use crate::util::JsonView as V;

    match value {
        V::Bool(flag) => flag,
        V::Int(n) => n != 0,
        V::UInt(n) => n != 0,
        V::Float(x) => x != 0.0,
        V::Str(text) => !text.is_empty(),
        V::ArrayLen(count) | V::ObjectLen(count) => count > 0,
        V::Null => false,
    }
}
2024
/// Views the tape node at `idx` as a scalar.
///
/// Strings borrow from the tape, and arrays and objects are summarised by their
/// element count. An index past the end of the tape is viewed as `Null`.
#[cfg(feature = "simd-json")]
fn json_tape_scalar<T: JsonTape>(tape: &T, idx: usize) -> crate::util::JsonView<'_> {
    use crate::data::tape::TapeNode;
    use crate::util::JsonView;
    use simd_json::StaticNode;

    match tape.nodes().get(idx).copied() {
        None | Some(TapeNode::Static(StaticNode::Null)) => JsonView::Null,
        Some(TapeNode::Static(StaticNode::Bool(flag))) => JsonView::Bool(flag),
        Some(TapeNode::Static(StaticNode::I64(n))) => JsonView::Int(n),
        Some(TapeNode::Static(StaticNode::U64(n))) => JsonView::UInt(n),
        Some(TapeNode::Static(StaticNode::F64(x))) => JsonView::Float(x),
        Some(TapeNode::String(_)) => JsonView::Str(tape.str_at(idx)),
        Some(TapeNode::Array { len, .. }) => JsonView::ArrayLen(len),
        Some(TapeNode::Object { len, .. }) => JsonView::ObjectLen(len),
    }
}
2044
2045pub(super) fn write_val_line<W: Write>(
2046    writer: &mut W,
2047    value: &Val,
2048) -> Result<(), JetroEngineError> {
2049    write_val_json(writer, value)?;
2050    writer.write_all(b"\n")?;
2051    Ok(())
2052}
2053
2054pub(super) fn write_document_line<W: Write>(
2055    writer: &mut W,
2056    document: &Jetro,
2057    line_no: u64,
2058    engine: &JetroEngine,
2059) -> Result<(), JetroEngineError> {
2060    if let Some(bytes) = document.raw_bytes() {
2061        writer.write_all(bytes)?;
2062        writer.write_all(b"\n")?;
2063        return Ok(());
2064    }
2065
2066    let root = document
2067        .root_val_with(engine.keys())
2068        .map_err(|err| row_eval_error(line_no, err))?;
2069    write_val_line(writer, &root)
2070}
2071
2072pub(super) fn ndjson_writer_with_options<W: Write>(
2073    writer: W,
2074    options: NdjsonOptions,
2075) -> BufWriter<W> {
2076    let capacity = options
2077        .reader_buffer_capacity
2078        .max(DEFAULT_READER_BUFFER_CAPACITY);
2079    BufWriter::with_capacity(capacity, writer)
2080}
2081
2082pub(super) fn write_val_json<W: Write>(
2083    writer: &mut W,
2084    value: &Val,
2085) -> Result<(), JetroEngineError> {
2086    match value {
2087        Val::Null => writer.write_all(b"null")?,
2088        Val::Bool(true) => writer.write_all(b"true")?,
2089        Val::Bool(false) => writer.write_all(b"false")?,
2090        Val::Int(n) => write_i64(writer, *n)?,
2091        Val::Float(n) => write_f64(writer, *n)?,
2092        Val::Str(s) => write_json_str(writer, s.as_ref())?,
2093        Val::StrSlice(s) => write_json_str(writer, s.as_str())?,
2094        Val::Arr(items) => write_json_array(writer, items.iter())?,
2095        Val::IntVec(items) => write_json_int_array(writer, items.iter().copied())?,
2096        Val::FloatVec(items) => write_json_float_array(writer, items.iter().copied())?,
2097        Val::StrVec(items) => write_json_str_array(writer, items.iter().map(|s| s.as_ref()))?,
2098        Val::StrSliceVec(items) => write_json_str_array(writer, items.iter().map(|s| s.as_str()))?,
2099        Val::Obj(entries) => write_json_object(
2100            writer,
2101            entries.iter().map(|(key, value)| (key.as_ref(), value)),
2102        )?,
2103        Val::ObjSmall(entries) => write_json_object(
2104            writer,
2105            entries.iter().map(|(key, value)| (key.as_ref(), value)),
2106        )?,
2107        Val::ObjVec(data) => write_json_objvec(writer, data)?,
2108    }
2109    Ok(())
2110}
2111
/// Serialises the tape node at `idx`, and its whole subtree, as compact JSON.
///
/// Returns the index of the first node after the written subtree, so callers can
/// step through sibling nodes. An out-of-range index writes `null` and returns
/// `idx` unchanged. Object keys are read with `str_at` at the key position; the
/// value follows immediately after.
#[cfg(feature = "simd-json")]
fn write_json_tape_at<W: Write, T: JsonTape>(
    writer: &mut W,
    tape: &T,
    idx: usize,
) -> Result<usize, JetroEngineError> {
    use crate::data::tape::TapeNode;
    use simd_json::StaticNode;

    let node = match tape.nodes().get(idx) {
        Some(node) => *node,
        None => {
            writer.write_all(b"null")?;
            return Ok(idx);
        }
    };

    match node {
        TapeNode::Array { len, .. } => {
            writer.write_all(b"[")?;
            let mut next = idx + 1;
            for position in 0..len {
                if position != 0 {
                    writer.write_all(b",")?;
                }
                next = write_json_tape_at(writer, tape, next)?;
            }
            writer.write_all(b"]")?;
            Ok(next)
        }
        TapeNode::Object { len, .. } => {
            writer.write_all(b"{")?;
            let mut next = idx + 1;
            for position in 0..len {
                if position != 0 {
                    writer.write_all(b",")?;
                }
                write_json_str(writer, tape.str_at(next))?;
                writer.write_all(b":")?;
                next = write_json_tape_at(writer, tape, next + 1)?;
            }
            writer.write_all(b"}")?;
            Ok(next)
        }
        TapeNode::String(_) => {
            write_json_str(writer, tape.str_at(idx))?;
            Ok(idx + 1)
        }
        TapeNode::Static(scalar) => {
            match scalar {
                StaticNode::Null => writer.write_all(b"null")?,
                StaticNode::Bool(true) => writer.write_all(b"true")?,
                StaticNode::Bool(false) => writer.write_all(b"false")?,
                StaticNode::I64(n) => write_i64(writer, n)?,
                StaticNode::U64(n) => write_u64(writer, n)?,
                StaticNode::F64(x) => write_f64(writer, x)?,
            }
            Ok(idx + 1)
        }
    }
}
2183
/// Calls `visit` with the tape index of every item produced by the node at `source_idx`.
///
/// An array visits each direct element in order, stepping over each element by
/// its tape span. Any other node is visited once as a single item. A missing node
/// visits nothing. The first error returned by `visit` stops the walk and is
/// propagated.
#[cfg(feature = "simd-json")]
fn visit_json_tape_source_items<T, E, F>(tape: &T, source_idx: usize, mut visit: F) -> Result<(), E>
where
    T: JsonTape,
    F: FnMut(usize) -> Result<(), E>,
{
    use crate::data::tape::TapeNode;

    let Some(node) = tape.nodes().get(source_idx).copied() else {
        return Ok(());
    };
    let TapeNode::Array { len, .. } = node else {
        return visit(source_idx);
    };
    (0..len)
        .try_fold(source_idx + 1, |item_idx, _| -> Result<usize, E> {
            visit(item_idx)?;
            Ok(item_idx + tape.span(item_idx))
        })
        .map(|_| ())
}
2205
/// Executes a direct stream plan against one parsed row and writes its JSON result.
///
/// `plan.source_steps` is resolved from the row root. If the path is missing, the
/// sink's empty result from `write_json_tape_empty_stream_result` is written.
/// Otherwise each source item that passes the optional item predicate is handled
/// by the sink:
/// - `Collect`: each item is mapped and written as an element of a JSON array.
/// - `Count`: the number of matching items is written.
/// - `Numeric`: items are folded by `reduce_json_tape_numeric_path`.
///
/// `projection_caches` is grown on demand by the projection writers.
#[cfg(feature = "simd-json")]
fn write_json_tape_stream<W: Write, T: JsonTape>(
    writer: &mut W,
    tape: &T,
    plan: &NdjsonDirectStreamPlan,
    source_cache: &mut NdjsonPathCache,
    suffix_cache: &mut NdjsonPathCache,
    predicate_cache: &mut NdjsonPathCache,
    projection_caches: &mut Vec<NdjsonPathCache>,
) -> Result<(), JetroEngineError> {
    let Some(source_idx) = source_cache.index(tape, 0, &plan.source_steps) else {
        write_json_tape_empty_stream_result(writer, &plan.sink)?;
        return Ok(());
    };

    match &plan.sink {
        NdjsonDirectStreamSink::Collect(map) => {
            writer.write_all(b"[")?;
            // Commas go only between items that actually pass the predicate.
            let mut wrote_row = false;
            visit_json_tape_source_items(tape, source_idx, |item_idx| {
                if !plan.predicate.as_ref().is_none_or(|predicate| {
                    eval_json_tape_item_predicate_cached(tape, item_idx, predicate, predicate_cache)
                }) {
                    return Ok::<(), JetroEngineError>(());
                }
                if wrote_row {
                    writer.write_all(b",")?;
                }
                write_json_tape_stream_map(
                    writer,
                    tape,
                    item_idx,
                    map,
                    suffix_cache,
                    projection_caches,
                )?;
                wrote_row = true;
                Ok(())
            })?;
            writer.write_all(b"]")?;
        }
        NdjsonDirectStreamSink::Count => {
            // Count in u64 and write it as such. This avoids a lossy
            // `usize as i64` cast; the output text is the same decimal number.
            let mut count = 0u64;
            // The visitor never fails, so the returned `Result` carries no information.
            let _: Result<(), ()> = visit_json_tape_source_items(tape, source_idx, |item_idx| {
                if plan.predicate.as_ref().is_none_or(|predicate| {
                    eval_json_tape_item_predicate_cached(tape, item_idx, predicate, predicate_cache)
                }) {
                    count += 1;
                }
                Ok(())
            });
            write_u64(writer, count)?;
        }
        NdjsonDirectStreamSink::Numeric { suffix_steps, op } => {
            let caches = NdjsonPathCaches {
                source: source_cache,
                suffix: suffix_cache,
                predicate: predicate_cache,
            };
            let value = reduce_json_tape_numeric_path(
                tape,
                &plan.source_steps,
                plan.predicate.as_ref(),
                suffix_steps,
                *op,
                caches,
            );
            write_val_json(writer, &value)?;
        }
    }

    Ok(())
}
2279
/// Writes what a stream sink produces when its source path is missing.
///
/// `Collect` writes `[]` and `Count` writes `0`. `Numeric` writes the finalised
/// value of a fold that saw no observations, using the same initial accumulators
/// as `reduce_json_tape_numeric_path`.
#[cfg(feature = "simd-json")]
fn write_json_tape_empty_stream_result<W: Write>(
    writer: &mut W,
    sink: &NdjsonDirectStreamSink,
) -> Result<(), JetroEngineError> {
    let op = match sink {
        NdjsonDirectStreamSink::Collect(_) => {
            writer.write_all(b"[]")?;
            return Ok(());
        }
        NdjsonDirectStreamSink::Count => {
            writer.write_all(b"0")?;
            return Ok(());
        }
        NdjsonDirectStreamSink::Numeric { op, .. } => *op,
    };
    // Arguments: op, acc_i, acc_f, floated, min_f, max_f, n_obs.
    let empty = crate::exec::pipeline::num_finalise(
        op,
        0,
        0.0,
        false,
        f64::INFINITY,
        f64::NEG_INFINITY,
        0,
    );
    write_val_json(writer, &empty)
}
2303
/// Writes the mapped form of one stream item located at `item_idx`.
///
/// A `Value` map resolves its path, if it has one, relative to the item through
/// `suffix_cache`. `Array` and `Object` maps delegate to the projection writers,
/// which keep one cache per projected element in `projection_caches`.
#[cfg(feature = "simd-json")]
fn write_json_tape_stream_map<W: Write, T: JsonTape>(
    writer: &mut W,
    tape: &T,
    item_idx: usize,
    map: &NdjsonDirectStreamMap,
    suffix_cache: &mut NdjsonPathCache,
    projection_caches: &mut Vec<NdjsonPathCache>,
) -> Result<(), JetroEngineError> {
    match map {
        NdjsonDirectStreamMap::Array(items) => write_json_tape_array_projection_from(
            writer,
            tape,
            item_idx,
            items,
            projection_caches,
        ),
        NdjsonDirectStreamMap::Object(fields) => write_json_tape_object_projection_from(
            writer,
            tape,
            item_idx,
            fields,
            projection_caches,
        ),
        NdjsonDirectStreamMap::Value(value) => {
            let steps = match value {
                NdjsonDirectProjectionValue::Path(steps)
                | NdjsonDirectProjectionValue::ViewScalarCall { steps, .. } => Some(steps),
                NdjsonDirectProjectionValue::Literal(_) => None,
            };
            let resolved = steps.and_then(|steps| suffix_cache.index(tape, item_idx, steps));
            write_json_tape_direct_value(writer, tape, value, resolved)
        }
    }
}
2345
/// Writes an object projection with every field path resolved from the tape root
/// (index 0).
///
/// See `write_json_tape_object_projection_from` for how fields are resolved and skipped.
#[cfg(feature = "simd-json")]
fn write_json_tape_object_projection<W: Write, T: JsonTape>(
    writer: &mut W,
    tape: &T,
    fields: &[super::ndjson_direct::NdjsonDirectObjectField],
    path_caches: &mut Vec<NdjsonPathCache>,
) -> Result<(), JetroEngineError> {
    const ROOT_IDX: usize = 0;
    write_json_tape_object_projection_from(writer, tape, ROOT_IDX, fields, path_caches)
}
2355
/// Writes a JSON object whose fields are projected from the tape node at `start`.
///
/// `path_caches` supplies one path cache per field, matched by position. It is
/// grown with default caches when it is shorter than `fields`.
///
/// A field is skipped entirely (no key, value, or separator) when:
/// - it is a `Path` on an optional field, and the path is missing or resolves to null;
/// - it is a `ViewScalarCall` whose own `optional` flag or the field's `optional`
///   flag is set, and the path is missing or resolves to null;
/// - it is a `ViewScalarCall` on an optional field, and the call yields `Val::Null`;
/// - it is `Literal(Val::Null)` on an optional field.
///
/// Every other field is written as `"key":value` via `write_json_tape_direct_value`.
#[cfg(feature = "simd-json")]
fn write_json_tape_object_projection_from<W: Write, T: JsonTape>(
    writer: &mut W,
    tape: &T,
    start: usize,
    fields: &[super::ndjson_direct::NdjsonDirectObjectField],
    path_caches: &mut Vec<NdjsonPathCache>,
) -> Result<(), JetroEngineError> {
    // Ensure every field has its own cache slot.
    if path_caches.len() < fields.len() {
        path_caches.resize_with(fields.len(), NdjsonPathCache::default);
    }
    writer.write_all(b"{")?;
    // Set once a field has been emitted, so commas appear only between written fields.
    let mut wrote = false;
    for (field_idx, field) in fields.iter().enumerate() {
        let path_cache = &mut path_caches[field_idx];
        // Resolved tape index of the field's path; stays `None` for literals.
        let mut path_idx = None;
        match &field.value {
            NdjsonDirectProjectionValue::Path(steps) => {
                let idx = path_cache.index(tape, start, steps);
                path_idx = idx;
                // Optional field: drop it when the path is missing (`unwrap_or(true)`) or null.
                if field.optional
                    && idx
                        .map(|idx| {
                            matches!(json_tape_scalar(tape, idx), crate::util::JsonView::Null)
                        })
                        .unwrap_or(true)
                {
                    continue;
                }
            }
            NdjsonDirectProjectionValue::ViewScalarCall {
                steps,
                call,
                optional,
            } => {
                let idx = path_cache.index(tape, start, steps);
                path_idx = idx;
                // Either optional flag drops the field when its input is missing or null.
                if (*optional || field.optional)
                    && idx
                        .map(|idx| {
                            matches!(json_tape_scalar(tape, idx), crate::util::JsonView::Null)
                        })
                        .unwrap_or(true)
                {
                    continue;
                }
                // Only the field-level flag drops the field when the call's result is null.
                // NOTE(review): the call is evaluated here and again in
                // write_json_tape_direct_value when the field is written.
                if field.optional
                    && idx
                        .map(|idx| json_tape_scalar(tape, idx))
                        .and_then(|value| call.try_apply_json_view(value))
                        .is_some_and(|value| matches!(value, Val::Null))
                {
                    continue;
                }
            }
            NdjsonDirectProjectionValue::Literal(Val::Null) if field.optional => {
                continue;
            }
            NdjsonDirectProjectionValue::Literal(_) => {}
        }
        if wrote {
            writer.write_all(b",")?;
        }
        write_json_str(writer, field.key.as_ref())?;
        writer.write_all(b":")?;
        write_json_tape_direct_value(writer, tape, &field.value, path_idx)?;
        wrote = true;
    }
    writer.write_all(b"}")?;
    Ok(())
}
2427
/// Writes an array projection with every item path resolved from the tape root
/// (index 0).
#[cfg(feature = "simd-json")]
fn write_json_tape_array_projection<W: Write, T: JsonTape>(
    writer: &mut W,
    tape: &T,
    items: &[NdjsonDirectProjectionValue],
    path_caches: &mut Vec<NdjsonPathCache>,
) -> Result<(), JetroEngineError> {
    const ROOT_IDX: usize = 0;
    write_json_tape_array_projection_from(writer, tape, ROOT_IDX, items, path_caches)
}
2437
/// Writes a JSON array with one element per entry in `items`.
///
/// Each item's path is resolved relative to the tape node at `start`.
/// `path_caches` holds one cache per item, matched by position, and is grown to
/// `items.len()` when shorter. Unlike object projections, no item is ever skipped.
#[cfg(feature = "simd-json")]
fn write_json_tape_array_projection_from<W: Write, T: JsonTape>(
    writer: &mut W,
    tape: &T,
    start: usize,
    items: &[NdjsonDirectProjectionValue],
    path_caches: &mut Vec<NdjsonPathCache>,
) -> Result<(), JetroEngineError> {
    if items.len() > path_caches.len() {
        path_caches.resize_with(items.len(), NdjsonPathCache::default);
    }
    writer.write_all(b"[")?;
    for (position, (item, cache)) in items.iter().zip(path_caches.iter_mut()).enumerate() {
        if position != 0 {
            writer.write_all(b",")?;
        }
        let resolved = match item {
            NdjsonDirectProjectionValue::Literal(_) => None,
            NdjsonDirectProjectionValue::Path(steps)
            | NdjsonDirectProjectionValue::ViewScalarCall { steps, .. } => {
                cache.index(tape, start, steps)
            }
        };
        write_json_tape_direct_value(writer, tape, item, resolved)?;
    }
    writer.write_all(b"]")?;
    Ok(())
}
2466
/// Writes one projected value.
///
/// A literal is serialised as-is. For `Path` and `ViewScalarCall`, `path_idx` is
/// the already-resolved tape index, and `None` (missing path) writes `null`. A
/// `ViewScalarCall` writes the call's result when the call applies to the scalar
/// at `path_idx`; otherwise it falls back to the raw tape value.
#[cfg(feature = "simd-json")]
fn write_json_tape_direct_value<W: Write, T: JsonTape>(
    writer: &mut W,
    tape: &T,
    value: &NdjsonDirectProjectionValue,
    path_idx: Option<usize>,
) -> Result<(), JetroEngineError> {
    if let NdjsonDirectProjectionValue::Literal(literal) = value {
        return write_val_json(writer, literal);
    }
    let Some(idx) = path_idx else {
        writer.write_all(b"null")?;
        return Ok(());
    };
    if let NdjsonDirectProjectionValue::ViewScalarCall { call, .. } = value {
        if let Some(computed) = call.try_apply_json_view(json_tape_scalar(tape, idx)) {
            return write_val_json(writer, &computed);
        }
    }
    write_json_tape_at(writer, tape, idx)?;
    Ok(())
}
2498
/// Writes the `keys`, `values` or `entries` of the object at `obj_idx` as a JSON array.
///
/// A missing index or a non-object node writes `[]`. `Keys` steps over each value
/// by its tape span; `Values` and `Entries` write the value and continue after it.
/// Any other `method` hits `unreachable!` as soon as the object has a field.
#[cfg(feature = "simd-json")]
fn write_json_tape_object_items<W: Write, T: JsonTape>(
    writer: &mut W,
    tape: &T,
    obj_idx: Option<usize>,
    method: crate::builtins::BuiltinMethod,
) -> Result<(), JetroEngineError> {
    use crate::builtins::BuiltinMethod;
    use crate::data::tape::TapeNode;

    let object = obj_idx.and_then(|idx| match tape.nodes().get(idx).copied() {
        Some(TapeNode::Object { len, .. }) => Some((idx, len)),
        _ => None,
    });
    let Some((obj_idx, len)) = object else {
        writer.write_all(b"[]")?;
        return Ok(());
    };

    writer.write_all(b"[")?;
    let mut key_idx = obj_idx + 1;
    for field in 0..len {
        if field != 0 {
            writer.write_all(b",")?;
        }
        let value_idx = key_idx + 1;
        key_idx = match method {
            BuiltinMethod::Keys => {
                write_json_str(writer, tape.str_at(key_idx))?;
                value_idx + tape.span(value_idx)
            }
            BuiltinMethod::Values => write_json_tape_at(writer, tape, value_idx)?,
            BuiltinMethod::Entries => {
                writer.write_all(b"[")?;
                write_json_str(writer, tape.str_at(key_idx))?;
                writer.write_all(b",")?;
                let next = write_json_tape_at(writer, tape, value_idx)?;
                writer.write_all(b"]")?;
                next
            }
            _ => unreachable!("non-object-items builtin"),
        };
    }
    writer.write_all(b"]")?;
    Ok(())
}
2544
/// Folds a numeric reduction (`op`) over the items of a source path.
///
/// `source_steps` is resolved from the row root. Each item that passes the
/// optional `predicate` contributes the scalar at `suffix_steps`, resolved
/// relative to the item; non-numeric scalars are ignored by the fold. A missing
/// source path produces the finalised value of an empty fold.
#[cfg(feature = "simd-json")]
fn reduce_json_tape_numeric_path<T: JsonTape>(
    tape: &T,
    source_steps: &[crate::ir::physical::PhysicalPathStep],
    predicate: Option<&NdjsonDirectItemPredicate>,
    suffix_steps: &[crate::ir::physical::PhysicalPathStep],
    op: crate::exec::pipeline::NumOp,
    caches: NdjsonPathCaches<'_>,
) -> Val {
    let NdjsonPathCaches {
        source,
        suffix,
        predicate: predicate_cache,
    } = caches;

    let (mut acc_i, mut acc_f, mut floated) = (0i64, 0.0f64, false);
    let (mut min_f, mut max_f) = (f64::INFINITY, f64::NEG_INFINITY);
    let mut n_obs = 0usize;

    if let Some(source_idx) = source.index(tape, 0, source_steps) {
        // The visitor never fails, so the returned `Result` carries no information.
        let _: Result<(), ()> = visit_json_tape_source_items(tape, source_idx, |item_idx| {
            let keep = match predicate {
                Some(filter) => {
                    eval_json_tape_item_predicate_cached(tape, item_idx, filter, predicate_cache)
                }
                None => true,
            };
            if !keep {
                return Ok(());
            }
            if let Some(idx) = suffix.index(tape, item_idx, suffix_steps) {
                fold_json_tape_numeric(
                    json_tape_scalar(tape, idx),
                    op,
                    &mut acc_i,
                    &mut acc_f,
                    &mut floated,
                    &mut min_f,
                    &mut max_f,
                    &mut n_obs,
                );
            }
            Ok(())
        });
    }

    crate::exec::pipeline::num_finalise(op, acc_i, acc_f, floated, min_f, max_f, n_obs)
}
2590
/// Feeds one scalar into a running numeric fold.
///
/// Signed integers, and unsigned integers that fit in `i64`, go through the
/// integer fold. Larger unsigned integers and floats go through the float fold.
/// Every other kind of scalar is ignored.
#[cfg(feature = "simd-json")]
// The accumulator state is passed as separate `&mut` parameters to match
// the `num_fold_*` helpers' signatures.
#[allow(clippy::too_many_arguments)]
fn fold_json_tape_numeric(
    value: crate::util::JsonView<'_>,
    op: crate::exec::pipeline::NumOp,
    acc_i: &mut i64,
    acc_f: &mut f64,
    floated: &mut bool,
    min_f: &mut f64,
    max_f: &mut f64,
    n_obs: &mut usize,
) {
    use crate::exec::pipeline::{num_fold_f64, num_fold_i64};
    use crate::util::JsonView;

    match value {
        JsonView::Int(n) => num_fold_i64(acc_i, acc_f, floated, min_f, max_f, n_obs, op, n),
        JsonView::UInt(n) => match i64::try_from(n) {
            Ok(signed) => num_fold_i64(acc_i, acc_f, floated, min_f, max_f, n_obs, op, signed),
            // Too large for i64: fold it as a float instead.
            Err(_) => num_fold_f64(acc_i, acc_f, floated, min_f, max_f, n_obs, op, n as f64),
        },
        JsonView::Float(x) => num_fold_f64(acc_i, acc_f, floated, min_f, max_f, n_obs, op, x),
        _ => {}
    }
}
2635
/// Evaluates an item-level predicate against the tape item at `item_idx`.
///
/// Paths are resolved relative to the item through `cache` and judged by
/// `json_view_truthy`; a missing path is false. `And`/`Or` short-circuit. Any
/// other binary operator compares two leaf operands with `json_cmp_binop`, and is
/// false when either operand cannot be resolved to a scalar.
#[cfg(feature = "simd-json")]
fn eval_json_tape_item_predicate_cached<T: JsonTape>(
    tape: &T,
    item_idx: usize,
    predicate: &NdjsonDirectItemPredicate,
    cache: &mut NdjsonPathCache,
) -> bool {
    use crate::parse::ast::BinOp;
    use crate::util::{json_cmp_binop, JsonView};

    match predicate {
        NdjsonDirectItemPredicate::Literal(value) => crate::util::is_truthy(value),
        NdjsonDirectItemPredicate::Path(steps) => match cache.index(tape, item_idx, steps) {
            Some(idx) => json_view_truthy(json_tape_scalar(tape, idx)),
            None => false,
        },
        NdjsonDirectItemPredicate::CmpLit { lhs, op, lit } => {
            let Some(idx) = cache.index(tape, item_idx, lhs) else {
                return false;
            };
            json_cmp_binop(json_tape_scalar(tape, idx), *op, JsonView::from_val(lit))
        }
        NdjsonDirectItemPredicate::ViewScalarCall { suffix_steps, call } => cache
            .index(tape, item_idx, suffix_steps)
            .and_then(|idx| call.try_apply_json_view(json_tape_scalar(tape, idx)))
            .is_some_and(|result| crate::util::is_truthy(&result)),
        NdjsonDirectItemPredicate::Binary { lhs, op, rhs } => match *op {
            BinOp::And => {
                eval_json_tape_item_predicate_cached(tape, item_idx, lhs, cache)
                    && eval_json_tape_item_predicate_cached(tape, item_idx, rhs, cache)
            }
            BinOp::Or => {
                eval_json_tape_item_predicate_cached(tape, item_idx, lhs, cache)
                    || eval_json_tape_item_predicate_cached(tape, item_idx, rhs, cache)
            }
            comparison => {
                let Some(left) = eval_json_tape_item_scalar_cached(tape, item_idx, lhs, cache)
                else {
                    return false;
                };
                let Some(right) = eval_json_tape_item_scalar_cached(tape, item_idx, rhs, cache)
                else {
                    return false;
                };
                json_cmp_binop(left, comparison, right)
            }
        },
    }
}
2681
/// Resolves a leaf item predicate to a scalar view.
///
/// A `Path` is resolved relative to `item_idx` through `cache`, and a `Literal` is
/// viewed directly. Compound predicates and missing paths yield `None`.
#[cfg(feature = "simd-json")]
fn eval_json_tape_item_scalar_cached<'a, T: JsonTape>(
    tape: &'a T,
    item_idx: usize,
    predicate: &'a NdjsonDirectItemPredicate,
    cache: &mut NdjsonPathCache,
) -> Option<crate::util::JsonView<'a>> {
    match predicate {
        NdjsonDirectItemPredicate::Literal(literal) => {
            Some(crate::util::JsonView::from_val(literal))
        }
        NdjsonDirectItemPredicate::Path(steps) => {
            let node_idx = cache.index(tape, item_idx, steps)?;
            Some(json_tape_scalar(tape, node_idx))
        }
        _ => None,
    }
}
2697
2698fn write_json_array<'a, W, I>(writer: &mut W, items: I) -> Result<(), JetroEngineError>
2699where
2700    W: Write,
2701    I: IntoIterator<Item = &'a Val>,
2702{
2703    writer.write_all(b"[")?;
2704    let mut first = true;
2705    for item in items {
2706        if first {
2707            first = false;
2708        } else {
2709            writer.write_all(b",")?;
2710        }
2711        write_val_json(writer, item)?;
2712    }
2713    writer.write_all(b"]")?;
2714    Ok(())
2715}
2716
2717fn write_json_int_array<W, I>(writer: &mut W, items: I) -> Result<(), JetroEngineError>
2718where
2719    W: Write,
2720    I: IntoIterator<Item = i64>,
2721{
2722    writer.write_all(b"[")?;
2723    let mut first = true;
2724    let mut buf = itoa::Buffer::new();
2725    for item in items {
2726        if first {
2727            first = false;
2728        } else {
2729            writer.write_all(b",")?;
2730        }
2731        writer.write_all(buf.format(item).as_bytes())?;
2732    }
2733    writer.write_all(b"]")?;
2734    Ok(())
2735}
2736
2737fn write_json_float_array<W, I>(writer: &mut W, items: I) -> Result<(), JetroEngineError>
2738where
2739    W: Write,
2740    I: IntoIterator<Item = f64>,
2741{
2742    writer.write_all(b"[")?;
2743    let mut first = true;
2744    let mut buf = ryu::Buffer::new();
2745    for item in items {
2746        if first {
2747            first = false;
2748        } else {
2749            writer.write_all(b",")?;
2750        }
2751        if item.is_finite() {
2752            writer.write_all(buf.format(item).as_bytes())?;
2753        } else {
2754            writer.write_all(b"0")?;
2755        }
2756    }
2757    writer.write_all(b"]")?;
2758    Ok(())
2759}
2760
2761fn write_json_str_array<'a, W, I>(writer: &mut W, items: I) -> Result<(), JetroEngineError>
2762where
2763    W: Write,
2764    I: IntoIterator<Item = &'a str>,
2765{
2766    writer.write_all(b"[")?;
2767    let mut first = true;
2768    for item in items {
2769        if first {
2770            first = false;
2771        } else {
2772            writer.write_all(b",")?;
2773        }
2774        write_json_str(writer, item)?;
2775    }
2776    writer.write_all(b"]")?;
2777    Ok(())
2778}
2779
2780fn write_json_object<'a, W, I>(writer: &mut W, entries: I) -> Result<(), JetroEngineError>
2781where
2782    W: Write,
2783    I: IntoIterator<Item = (&'a str, &'a Val)>,
2784{
2785    writer.write_all(b"{")?;
2786    let mut first = true;
2787    for (key, value) in entries {
2788        if first {
2789            first = false;
2790        } else {
2791            writer.write_all(b",")?;
2792        }
2793        write_json_str(writer, key)?;
2794        writer.write_all(b":")?;
2795        write_val_json(writer, value)?;
2796    }
2797    writer.write_all(b"}")?;
2798    Ok(())
2799}
2800
2801fn write_json_objvec<W: Write>(
2802    writer: &mut W,
2803    data: &crate::data::value::ObjVecData,
2804) -> Result<(), JetroEngineError> {
2805    writer.write_all(b"[")?;
2806    for row in 0..data.nrows() {
2807        if row > 0 {
2808            writer.write_all(b",")?;
2809        }
2810        writer.write_all(b"{")?;
2811        for slot in 0..data.stride() {
2812            if slot > 0 {
2813                writer.write_all(b",")?;
2814            }
2815            write_json_str(writer, data.keys[slot].as_ref())?;
2816            writer.write_all(b":")?;
2817            write_val_json(writer, data.cell(row, slot))?;
2818        }
2819        writer.write_all(b"}")?;
2820    }
2821    writer.write_all(b"]")?;
2822    Ok(())
2823}
2824
2825fn write_json_str<W: Write>(writer: &mut W, value: &str) -> Result<(), JetroEngineError> {
2826    writer.write_all(b"\"")?;
2827    let bytes = value.as_bytes();
2828    if !needs_json_escape(bytes) {
2829        writer.write_all(bytes)?;
2830        writer.write_all(b"\"")?;
2831        return Ok(());
2832    }
2833
2834    let mut start = 0usize;
2835
2836    for (idx, &byte) in bytes.iter().enumerate() {
2837        let escaped = match byte {
2838            b'"' => Some(br#"\""#.as_slice()),
2839            b'\\' => Some(br#"\\"#.as_slice()),
2840            b'\n' => Some(br#"\n"#.as_slice()),
2841            b'\r' => Some(br#"\r"#.as_slice()),
2842            b'\t' => Some(br#"\t"#.as_slice()),
2843            0x08 => Some(br#"\b"#.as_slice()),
2844            0x0c => Some(br#"\f"#.as_slice()),
2845            0x00..=0x1f => None,
2846            _ => continue,
2847        };
2848
2849        if start < idx {
2850            writer.write_all(&bytes[start..idx])?;
2851        }
2852        match escaped {
2853            Some(seq) => writer.write_all(seq)?,
2854            None => write_control_escape(writer, byte)?,
2855        }
2856        start = idx + 1;
2857    }
2858
2859    if start < bytes.len() {
2860        writer.write_all(&bytes[start..])?;
2861    }
2862    writer.write_all(b"\"")?;
2863    Ok(())
2864}
2865
2866#[inline]
2867pub(super) fn write_i64<W: Write>(writer: &mut W, value: i64) -> Result<(), JetroEngineError> {
2868    let mut buf = itoa::Buffer::new();
2869    writer.write_all(buf.format(value).as_bytes())?;
2870    Ok(())
2871}
2872
2873#[inline]
2874fn write_u64<W: Write>(writer: &mut W, value: u64) -> Result<(), JetroEngineError> {
2875    let mut buf = itoa::Buffer::new();
2876    writer.write_all(buf.format(value).as_bytes())?;
2877    Ok(())
2878}
2879
2880#[inline]
2881fn write_f64<W: Write>(writer: &mut W, value: f64) -> Result<(), JetroEngineError> {
2882    if value.is_finite() {
2883        let mut buf = ryu::Buffer::new();
2884        writer.write_all(buf.format(value).as_bytes())?;
2885    } else {
2886        writer.write_all(b"0")?;
2887    }
2888    Ok(())
2889}
2890
/// Reports whether `bytes` contains anything that must be escaped inside a
/// JSON string: a double quote, a backslash, or an ASCII control character
/// (`0x00..=0x1f`).
#[inline]
fn needs_json_escape(bytes: &[u8]) -> bool {
    bytes
        .iter()
        .copied()
        .any(|b| b < 0x20 || b == b'"' || b == b'\\')
}
2897
2898fn write_control_escape<W: Write>(writer: &mut W, byte: u8) -> Result<(), JetroEngineError> {
2899    const HEX: &[u8; 16] = b"0123456789abcdef";
2900    writer.write_all(&[
2901        b'\\',
2902        b'u',
2903        b'0',
2904        b'0',
2905        HEX[(byte >> 4) as usize],
2906        HEX[(byte & 0x0f) as usize],
2907    ])?;
2908    Ok(())
2909}
2910
2911pub(super) fn collect_row_val(
2912    engine: &JetroEngine,
2913    document: &Jetro,
2914    plan: &crate::ir::physical::QueryPlan,
2915    line_no: u64,
2916) -> Result<Val, JetroEngineError> {
2917    engine
2918        .collect_prepared_val(document, plan)
2919        .map_err(|err| row_eval_error(line_no, err))
2920}
2921
2922pub(super) fn parse_row(
2923    engine: &JetroEngine,
2924    line_no: u64,
2925    row: Vec<u8>,
2926) -> Result<Jetro, JetroEngineError> {
2927    engine
2928        .parse_bytes_lazy(row)
2929        .map_err(|err| row_parse_error(line_no, err))
2930}
2931
2932pub(super) fn row_parse_error(line_no: u64, err: JetroEngineError) -> JetroEngineError {
2933    match err {
2934        JetroEngineError::Json(source) => RowError::InvalidJson { line_no, source }.into(),
2935        JetroEngineError::Eval(eval) => RowError::InvalidJsonMessage {
2936            line_no,
2937            message: eval.to_string(),
2938        }
2939        .into(),
2940        other => other,
2941    }
2942}
2943
2944pub(super) fn row_eval_error(line_no: u64, err: crate::EvalError) -> JetroEngineError {
2945    let message = err.0;
2946    if message.starts_with("Invalid JSON:") {
2947        RowError::InvalidJsonMessage { line_no, message }.into()
2948    } else {
2949        crate::EvalError(message).into()
2950    }
2951}
2952
/// Removes every trailing `\n` and `\r` byte from `buf`.
///
/// Any run of them is stripped (`\r\n`, `\n\n`, `\n\r`, ...). Other trailing
/// whitespace is left in place. The buffer's capacity is unchanged.
fn trim_line_ending(buf: &mut Vec<u8>) {
    let keep = buf
        .iter()
        .rposition(|&b| b != b'\n' && b != b'\r')
        .map_or(0, |last| last + 1);
    buf.truncate(keep);
}
2958
/// Drops a leading UTF-8 byte-order mark (`EF BB BF`) from `buf`, but only when
/// `buf` holds line 1. A BOM on any later line is left untouched.
fn strip_initial_bom(line_no: u64, buf: &mut Vec<u8>) {
    const UTF8_BOM: [u8; 3] = [0xEF, 0xBB, 0xBF];
    if line_no != 1 || !buf.starts_with(&UTF8_BOM) {
        return;
    }
    buf.drain(..UTF8_BOM.len());
}
2964
/// Returns the half-open range `(start, end)` of `buf` that remains after
/// trimming leading and trailing ASCII whitespace.
///
/// For empty or all-whitespace input the result is the empty range
/// `(buf.len(), buf.len())`.
fn non_ws_range(buf: &[u8]) -> (usize, usize) {
    match buf.iter().position(|b| !b.is_ascii_whitespace()) {
        None => (buf.len(), buf.len()),
        Some(start) => {
            // A non-whitespace byte exists, so the trailing run stops before `start`.
            let trailing = buf
                .iter()
                .rev()
                .take_while(|b| b.is_ascii_whitespace())
                .count();
            (start, buf.len() - trailing)
        }
    }
}
2977
// Unit tests for the per-row NDJSON helpers in this file. All tests except
// `owned_row_read_preserves_reusable_buffer_capacity` require the `simd-json`
// feature and exercise the direct tape/byte plans and writers imported from
// `ndjson_direct` and `ndjson_byte`.
// NOTE(review): the `run_ndjson_uses_byte_paths_*` tests assert only on the
// produced output; they do not verify which execution path actually ran.
#[cfg(test)]
mod tests {
    /// `parse_row` must return a document whose root value and tape have both
    /// not been built yet.
    #[test]
    #[cfg(feature = "simd-json")]
    fn parse_row_keeps_simd_document_lazy() {
        let engine = crate::JetroEngine::new();
        let row = br#"{"name":"Ada","age":30}"#.to_vec();

        let document = super::parse_row(&engine, 1, row).expect("row parses lazily");

        assert!(!document.root_val_is_materialized());
        assert!(!document.tape_is_built());
    }

    /// `read_next_owned` must return each row without its trailing newline and
    /// leave the caller's reusable buffer at its original capacity (128).
    #[test]
    fn owned_row_read_preserves_reusable_buffer_capacity() {
        let input = std::io::Cursor::new(b"{\"n\":1}\n{\"n\":2}\n");
        let mut driver = super::NdjsonPerRowDriver::new(input);
        let mut buf = Vec::with_capacity(128);

        // The second tuple field holds the row's bytes.
        let first = driver
            .read_next_owned(&mut buf)
            .expect("row read succeeds")
            .expect("first row exists");
        assert_eq!(first.1, br#"{"n":1}"#);
        assert_eq!(buf.capacity(), 128);

        let second = driver
            .read_next_owned(&mut buf)
            .expect("row read succeeds")
            .expect("second row exists");
        assert_eq!(second.1, br#"{"n":2}"#);
        assert_eq!(buf.capacity(), 128);
    }

    /// The `first()`, `last()` and `nth(1)` array suffixes must each lower to an
    /// `ArrayElementPath` tape plan.
    #[test]
    #[cfg(feature = "simd-json")]
    fn direct_tape_plan_accepts_first_suffix() {
        let engine = crate::JetroEngine::new();
        for query in [
            "attributes.first().value",
            "attributes.last().value",
            "attributes.nth(1).value",
        ] {
            let plan =
                super::direct_tape_plan(&engine, query).expect("array suffix should be direct");
            assert!(matches!(
                plan,
                super::NdjsonDirectTapePlan::ArrayElementPath { .. }
            ));
        }
    }

    /// Every `$`-rooted query shape listed here must get some direct tape plan.
    /// The plan variant is not checked.
    #[test]
    #[cfg(feature = "simd-json")]
    fn direct_tape_plan_accepts_rooted_bench_shapes() {
        let engine = crate::JetroEngine::new();
        for query in [
            "$.id",
            "$.a.b.c",
            "$.meta.id",
            "$.name",
            "$.attributes.len()",
            "$.store.attributes.len()",
            "$.attributes.map(@.key)",
            "$.attributes.first().value",
            "$.store.attributes.first().value",
            "$.attributes.last().value",
            "$.name.upper()",
            "$.store.name.upper()",
            "$.attributes.map([@.key, @.value])",
            r#"$.attributes.filter(@.value.contains("_3")).len()"#,
            "$.keys()",
        ] {
            super::direct_tape_plan(&engine, query)
                .unwrap_or_else(|| panic!("{query} should have a direct NDJSON tape plan"));
        }
    }

    /// Map, filter, len and sum chains over an array must lower to the generic
    /// `NdjsonDirectTapePlan::Stream` variant.
    #[test]
    #[cfg(feature = "simd-json")]
    fn direct_tape_plan_lowers_stream_shapes_generically() {
        let engine = crate::JetroEngine::new();
        for query in [
            "$.attributes.map(@.key)",
            "$.attributes.map(@.key.upper())",
            r#"$.attributes.filter(@.value.contains("_3")).map(@.key)"#,
            r#"$.attributes.filter(@.value.contains("_3")).map(@.key.upper())"#,
            r#"$.attributes.filter(@.value.contains("_3")).len()"#,
            "$.attributes.map(@.weight).sum()",
            r#"$.attributes.filter(@.value.contains("_3")).map(@.weight).sum()"#,
        ] {
            let plan =
                super::direct_tape_plan(&engine, query).expect("query should be direct NDJSON");
            assert!(
                matches!(plan, super::NdjsonDirectTapePlan::Stream(_)),
                "{query} should lower to a generic NDJSON stream plan"
            );
        }
    }

    /// Every root-level query shape listed here must get a direct byte plan
    /// (`direct_byte_plan` is imported for tests only).
    #[test]
    #[cfg(feature = "simd-json")]
    fn direct_byte_plan_accepts_fast_root_shapes() {
        let engine = crate::JetroEngine::new();
        for query in [
            "$.id",
            "$.name",
            "$.name.upper()",
            "$.name.lower()",
            "$.keys()",
            "$.meta.keys()",
            "$.values()",
            "$.entries()",
            "$.attributes.first().value",
            "$.store.attributes.first().value",
            "$.attributes.first().key.upper()",
            "$.attributes.last().value",
            "$.attributes.nth(1).value",
        ] {
            super::direct_byte_plan(&engine, query)
                .unwrap_or_else(|| panic!("{query} should have a direct NDJSON byte plan"));
        }
    }

    /// Each `(predicate, expected)` pair must evaluate directly on the row bytes
    /// to `expected`. An inner `None` from `eval_ndjson_byte_predicate_row` is
    /// treated here as needing a tape fallback and fails the test.
    #[test]
    #[cfg(feature = "simd-json")]
    fn direct_byte_predicates_cover_match_shapes() {
        let engine = crate::JetroEngine::new();
        let row = br#"{"active":true,"score":9910,"attributes":[{"key":"k1","value":"v_1"}]}"#;
        for predicate in [
            ("active", true),
            ("score > 9900", true),
            ("score < 100", false),
            (r#"attributes.first().value.contains("_1")"#, true),
        ] {
            let plan = super::direct_tape_predicate(&engine, predicate.0)
                .unwrap_or_else(|| panic!("{} should have a direct predicate", predicate.0));
            let matched = super::eval_ndjson_byte_predicate_row(row, &plan)
                .expect("byte predicate should evaluate")
                .unwrap_or_else(|| panic!("{} should not need tape fallback", predicate.0));
            assert_eq!(matched, predicate.1, "{}", predicate.0);
        }
    }

    /// A filter-then-len plan must be byte-writable. Two of the three attribute
    /// values contain `_3`, so the written output is `2`.
    #[test]
    #[cfg(feature = "simd-json")]
    fn direct_byte_tape_plan_counts_filtered_rows() {
        let engine = crate::JetroEngine::new();
        let query = r#"attributes.filter(@.value.contains("_3")).len()"#;
        let plan = super::direct_tape_plan(&engine, query).expect("filter count should be direct");
        assert!(super::tape_plan_can_write_byte_row(&plan));

        let row = br#"{"attributes":[{"value":"a_3"},{"value":"b"},{"value":"c_3"}]}"#;
        let mut out = Vec::new();
        let mut scratch = Vec::new();
        let wrote = super::write_ndjson_byte_tape_plan_row(&mut out, row, &plan, &mut scratch)
            .expect("byte count should write");
        assert!(matches!(wrote, super::BytePlanWrite::Done));
        assert_eq!(out, b"2");
    }

    /// The following shapes must be byte-writable and emit the exact compact
    /// JSON arrays listed:
    /// - scalar maps;
    /// - array and object projections inside `map`;
    /// - string transforms;
    /// - filter-then-map.
    #[test]
    #[cfg(feature = "simd-json")]
    fn direct_byte_tape_plan_collects_stream_maps() {
        let engine = crate::JetroEngine::new();
        let row = br#"{"attributes":[{"key":"k1","value":"v1"},{"key":"k2","value":"v2"}]}"#;
        for (query, expected) in [
            ("attributes.map(@.key)", r#"["k1","k2"]"#),
            (
                "attributes.map([@.key, @.value])",
                r#"[["k1","v1"],["k2","v2"]]"#,
            ),
            (
                "attributes.map({key: @.key, value: @.value})",
                r#"[{"key":"k1","value":"v1"},{"key":"k2","value":"v2"}]"#,
            ),
            ("attributes.map(@.key.upper())", r#"["K1","K2"]"#),
            (
                r#"attributes.filter(@.value.contains("2")).map(@.key)"#,
                r#"["k2"]"#,
            ),
        ] {
            let plan = super::direct_tape_plan(&engine, query)
                .unwrap_or_else(|| panic!("{query} should be direct"));
            assert!(
                super::tape_plan_can_write_byte_row(&plan),
                "{query} should be byte-writable"
            );
            let mut out = Vec::new();
            let mut scratch = Vec::new();
            let wrote = super::write_ndjson_byte_tape_plan_row(&mut out, row, &plan, &mut scratch)
                .expect("byte stream should write");
            assert!(matches!(wrote, super::BytePlanWrite::Done), "{query}");
            assert_eq!(std::str::from_utf8(&out).unwrap(), expected, "{query}");
        }
    }

    /// Path, object-literal and array-literal projections must be byte-writable.
    /// Nested sub-objects are copied as compact JSON.
    #[test]
    #[cfg(feature = "simd-json")]
    fn direct_byte_tape_plan_writes_static_projections() {
        let engine = crate::JetroEngine::new();
        let row = br#"{"id":7,"a":{"b":{"c":1}}}"#;
        for (query, expected) in [
            ("$.a.b.c", "1"),
            (r#"{test: $.a.b.c, b: $.a.b}"#, r#"{"test":1,"b":{"c":1}}"#),
            (r#"[$.a.b.c, $.id]"#, r#"[1,7]"#),
        ] {
            let plan = super::direct_tape_plan(&engine, query)
                .unwrap_or_else(|| panic!("{query} should be direct"));
            assert!(
                super::tape_plan_can_write_byte_row(&plan),
                "{query} should be byte-writable"
            );
            let mut out = Vec::new();
            let mut scratch = Vec::new();
            let wrote = super::write_ndjson_byte_tape_plan_row(&mut out, row, &plan, &mut scratch)
                .expect("byte projection should write");
            assert!(matches!(wrote, super::BytePlanWrite::Done), "{query}");
            assert_eq!(std::str::from_utf8(&out).unwrap(), expected, "{query}");
        }
    }

    /// End-to-end `run_ndjson` checks for rooted, nested and nested `keys()`
    /// queries. Each result is written as one newline-terminated output line.
    #[test]
    #[cfg(feature = "simd-json")]
    fn run_ndjson_uses_byte_paths_for_nested_object_items() {
        let engine = crate::JetroEngine::new();
        let rows = std::io::Cursor::new(
            br#"{"id":1}
{"id":2}
"#,
        );
        let mut out = Vec::new();
        engine
            .run_ndjson(rows, "$.id", &mut out)
            .expect("rooted byte path should run");
        assert_eq!(std::str::from_utf8(&out).unwrap(), "1\n2\n");

        let rows = std::io::Cursor::new(
            br#"{"meta":{"id":1,"kind":"a"}}
{"meta":{"id":2,"kind":"b"}}
"#,
        );

        let mut out = Vec::new();
        engine
            .run_ndjson(rows, "$.meta.id", &mut out)
            .expect("nested byte path should run");
        assert_eq!(std::str::from_utf8(&out).unwrap(), "1\n2\n");

        // Input with no trailing newline still yields one newline-terminated result.
        let rows = std::io::Cursor::new(br#"{"meta":{"id":1,"kind":"a"}}"#);
        let mut out = Vec::new();
        engine
            .run_ndjson(rows, "$.meta.keys()", &mut out)
            .expect("nested byte object items should run");
        assert_eq!(std::str::from_utf8(&out).unwrap(), "[\"id\",\"kind\"]\n");
    }

    /// End-to-end `run_ndjson` check: `first()` on an array nested under an
    /// object yields one string per row.
    #[test]
    #[cfg(feature = "simd-json")]
    fn run_ndjson_uses_byte_paths_for_nested_array_demands() {
        let engine = crate::JetroEngine::new();
        let rows = std::io::Cursor::new(
            br#"{"store":{"attributes":[{"value":"a"},{"value":"b"}]}}
{"store":{"attributes":[{"value":"c"},{"value":"d"}]}}
"#,
        );

        let mut out = Vec::new();
        engine
            .run_ndjson(rows, "$.store.attributes.first().value", &mut out)
            .expect("nested byte array demand should run");
        assert_eq!(std::str::from_utf8(&out).unwrap(), "\"a\"\n\"c\"\n");
    }
}