Skip to main content

jetro_core/io/
ndjson.rs

1use super::ndjson_frame::{frame_payload, FramePayload, NdjsonRowFrame};
2use super::stream_exec::CompiledRowStream;
3use super::stream_plan::{
4    lower_root_rows_query, RowStreamDirection, RowStreamPlan, RowStreamSourceKind,
5};
6use super::stream_subquery::{lower_single_rows_subquery, RowStreamSubqueryPlan, STREAM_BINDING};
7use super::stream_types::{RowStreamRowResult, RowStreamStats};
8use super::{NdjsonSource, RowError};
9use crate::compile::compiler::Compiler;
10use crate::data::context::Env;
11use crate::data::value::Val;
12use crate::plan::physical::PlanningContext;
13use crate::util::is_truthy;
14use crate::{EvalError, Jetro, JetroEngine, JetroEngineError, VM};
15use memchr::memchr;
16use serde_json::Value;
17use std::fs::File;
18use std::io::{BufRead, BufWriter, Write};
19use std::path::Path;
20use std::sync::MutexGuard;
21
22#[cfg(feature = "simd-json")]
23use super::ndjson_byte::{
24    eval_ndjson_byte_predicate_row, tape_plan_can_write_byte_row, write_ndjson_byte_plan_row,
25    write_ndjson_byte_tape_plan_row, write_ndjson_hinted_tape_plan_row, BytePlanWrite,
26};
27#[cfg(test)]
28#[cfg(feature = "simd-json")]
29pub(super) use super::ndjson_direct::{
30    direct_byte_plan, direct_writer_plan_kind, NdjsonDirectPlanKind,
31};
32#[cfg(feature = "simd-json")]
33pub(super) use super::ndjson_direct::{
34    direct_tape_plan, direct_tape_predicate, direct_writer_plans, NdjsonDirectBytePlan,
35    NdjsonDirectElement, NdjsonDirectItemPredicate, NdjsonDirectPredicate,
36    NdjsonDirectProjectionValue, NdjsonDirectStreamMap, NdjsonDirectStreamPlan,
37    NdjsonDirectStreamSink, NdjsonDirectTapePlan,
38};
39#[cfg(feature = "simd-json")]
40use super::ndjson_hint::{
41    NdjsonHintAccessPlan, NdjsonHintConfig, NdjsonHintDecision, NdjsonHintState,
42};
43#[cfg(feature = "simd-json")]
44use super::ndjson_stream_cache::NdjsonConstantStreamCache;
45
/// Default upper bound, in bytes, on a row's trimmed payload (64 MiB).
const DEFAULT_MAX_LINE_LEN: usize = 64 * 1024 * 1024;
/// Default initial capacity of the reusable per-row line buffer.
const DEFAULT_LINE_BUFFER_CAPACITY: usize = 8192;
/// Default `BufReader` capacity used when this module opens a file (1 MiB).
const DEFAULT_READER_BUFFER_CAPACITY: usize = 1024 * 1024;
/// Default chunk size for reverse reading (64 KiB).
/// NOTE(review): presumably consumed by the reverse file driver; confirm in `ndjson_rev`.
pub(super) const DEFAULT_REVERSE_CHUNK_SIZE: usize = 64 * 1024;
50
/// Test-only classification of the direct writer path a query would take,
/// as reported by `direct_writer_path_kind`.
#[cfg(test)]
#[cfg(feature = "simd-json")]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(super) enum NdjsonWriterPathKind {
    /// A byte-level expression plan is available.
    ByteExpr,
    /// No byte plan, but the tape plan passes `tape_plan_can_write_byte_row`.
    ByteWritableTape,
    /// Only the general tape plan applies.
    Tape,
}
59
/// Test helper: reports which direct writer path `query` would use, or `None`
/// when `direct_writer_plans` yields no direct plans for it.
///
/// A byte expression plan takes precedence. Otherwise the tape plan is
/// classified by whether it can write byte rows.
#[cfg(test)]
#[cfg(feature = "simd-json")]
pub(super) fn direct_writer_path_kind(
    engine: &JetroEngine,
    query: &str,
) -> Option<NdjsonWriterPathKind> {
    let (byte_plan, tape_plan) = direct_writer_plans(engine, query)?;
    let kind = match byte_plan {
        Some(_) => NdjsonWriterPathKind::ByteExpr,
        None if tape_plan_can_write_byte_row(&tape_plan) => {
            NdjsonWriterPathKind::ByteWritableTape
        }
        None => NdjsonWriterPathKind::Tape,
    };
    Some(kind)
}
75
/// Configuration for per-row NDJSON execution.
///
/// Start from [`NdjsonOptions::default`] and adjust with the `with_*` methods.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct NdjsonOptions {
    /// Maximum length in bytes of a row's payload after line endings and
    /// surrounding whitespace are trimmed; longer rows fail with
    /// `RowError::LineTooLarge`.
    pub max_line_len: usize,
    /// Initial capacity of the reusable line buffer.
    pub initial_buffer_capacity: usize,
    /// Capacity of the `BufReader` created when a file path is opened.
    pub reader_buffer_capacity: usize,
    /// Chunk size for reverse reading.
    /// NOTE(review): presumably used by the reverse file driver; confirm.
    pub reverse_chunk_size: usize,
    /// NOTE(review): presumably the minimum input size in bytes before a
    /// parallel path is attempted; confirm in `ndjson_parallel`.
    pub parallel_min_bytes: u64,
    /// Parallel-execution policy (`Auto` by default).
    pub parallelism: NdjsonParallelism,
    /// Framing applied to each non-empty line before its payload is parsed.
    pub row_frame: NdjsonRowFrame,
    /// Whether `null` results are skipped (default) or emitted.
    /// NOTE(review): confirm which drivers honor this.
    pub null_output: NdjsonNullOutput,
}
88
/// Policy controlling whether NDJSON execution may take a parallel path.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum NdjsonParallelism {
    /// Let the engine decide (the default in `NdjsonOptions::default`).
    Auto,
    /// Parallel execution disabled. NOTE(review): presumably forces the
    /// sequential path; confirm in `ndjson_parallel`.
    Off,
}
94
/// How `null` query results are treated on output.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum NdjsonNullOutput {
    /// Do not write `null` results (the default in `NdjsonOptions::default`).
    Skip,
    /// Write `null` results like any other value.
    Emit,
}
100
101impl Default for NdjsonOptions {
102    fn default() -> Self {
103        Self {
104            max_line_len: DEFAULT_MAX_LINE_LEN,
105            initial_buffer_capacity: DEFAULT_LINE_BUFFER_CAPACITY,
106            reader_buffer_capacity: DEFAULT_READER_BUFFER_CAPACITY,
107            reverse_chunk_size: DEFAULT_REVERSE_CHUNK_SIZE,
108            parallel_min_bytes: 64 * 1024 * 1024,
109            parallelism: NdjsonParallelism::Auto,
110            row_frame: NdjsonRowFrame::JsonLine,
111            null_output: NdjsonNullOutput::Skip,
112        }
113    }
114}
115
116impl NdjsonOptions {
117    pub fn with_max_line_len(mut self, max_line_len: usize) -> Self {
118        self.max_line_len = max_line_len;
119        self
120    }
121
122    pub fn with_initial_buffer_capacity(mut self, capacity: usize) -> Self {
123        self.initial_buffer_capacity = capacity;
124        self
125    }
126
127    pub fn with_reader_buffer_capacity(mut self, capacity: usize) -> Self {
128        self.reader_buffer_capacity = capacity;
129        self
130    }
131
132    pub fn with_reverse_chunk_size(mut self, capacity: usize) -> Self {
133        self.reverse_chunk_size = capacity;
134        self
135    }
136
137    pub fn with_parallel_min_bytes(mut self, bytes: u64) -> Self {
138        self.parallel_min_bytes = bytes;
139        self
140    }
141
142    pub fn with_parallelism(mut self, parallelism: NdjsonParallelism) -> Self {
143        self.parallelism = parallelism;
144        self
145    }
146
147    pub fn with_row_frame(mut self, row_frame: NdjsonRowFrame) -> Self {
148        self.row_frame = row_frame;
149        self
150    }
151
152    pub fn with_null_output(mut self, null_output: NdjsonNullOutput) -> Self {
153        self.null_output = null_output;
154        self
155    }
156}
157
/// Forward-only per-row NDJSON reader.
pub struct NdjsonPerRowDriver<R> {
    /// Underlying buffered byte source.
    reader: R,
    /// Number of physical lines consumed so far (blank and skipped lines included).
    line_no: u64,
    /// Maximum trimmed payload length; also bounds physical line growth.
    max_line_len: usize,
    /// Framing passed to `frame_payload` for every non-empty line.
    row_frame: NdjsonRowFrame,
}
165
166impl<R: BufRead> NdjsonPerRowDriver<R> {
167    pub fn new(reader: R) -> Self {
168        Self {
169            reader,
170            line_no: 0,
171            max_line_len: DEFAULT_MAX_LINE_LEN,
172            row_frame: NdjsonRowFrame::JsonLine,
173        }
174    }
175
176    pub fn with_options(mut self, options: NdjsonOptions) -> Self {
177        self.max_line_len = options.max_line_len;
178        self.row_frame = options.row_frame;
179        self
180    }
181
182    pub fn with_max_line_len(mut self, max_line_len: usize) -> Self {
183        self.max_line_len = max_line_len;
184        self
185    }
186
187    pub fn with_row_frame(mut self, row_frame: NdjsonRowFrame) -> Self {
188        self.row_frame = row_frame;
189        self
190    }
191
192    pub fn line_no(&self) -> u64 {
193        self.line_no
194    }
195
196    /// Read the next non-empty NDJSON row into `buf`, returning its 1-based line
197    /// number. Empty and whitespace-only rows are skipped.
198    pub fn read_next_nonempty<'a>(
199        &mut self,
200        buf: &'a mut Vec<u8>,
201    ) -> Result<Option<(u64, &'a [u8])>, RowError> {
202        loop {
203            buf.clear();
204            let read = self.read_physical_line(buf)?;
205            if read == 0 {
206                return Ok(None);
207            }
208            self.line_no += 1;
209
210            strip_initial_bom(self.line_no, buf);
211            trim_line_ending(buf);
212
213            let (start, end) = non_ws_range(buf);
214            if start == end {
215                continue;
216            }
217
218            let len = end - start;
219            if len > self.max_line_len {
220                return Err(RowError::LineTooLarge {
221                    line_no: self.line_no,
222                    len,
223                    max: self.max_line_len,
224                });
225            }
226
227            match frame_payload(self.row_frame, self.line_no, &buf[start..end])? {
228                FramePayload::Data(range) => {
229                    return Ok(Some((
230                        self.line_no,
231                        &buf[start + range.start..start + range.end],
232                    )));
233                }
234                FramePayload::Skip => continue,
235            }
236        }
237    }
238
239    /// Read the next non-empty row and transfer ownership of `buf` to the
240    /// caller. This is the hot path used by `JetroEngine` NDJSON execution so
241    /// the row can be parsed without an extra bytes copy.
242    pub fn read_next_owned(
243        &mut self,
244        buf: &mut Vec<u8>,
245    ) -> Result<Option<(u64, Vec<u8>)>, RowError> {
246        loop {
247            buf.clear();
248            let read = self.read_physical_line(buf)?;
249            if read == 0 {
250                return Ok(None);
251            }
252            self.line_no += 1;
253
254            strip_initial_bom(self.line_no, buf);
255            trim_line_ending(buf);
256
257            let (start, end) = non_ws_range(buf);
258            if start == end {
259                continue;
260            }
261
262            let len = end - start;
263            if len > self.max_line_len {
264                return Err(RowError::LineTooLarge {
265                    line_no: self.line_no,
266                    len,
267                    max: self.max_line_len,
268                });
269            }
270
271            let payload = match frame_payload(self.row_frame, self.line_no, &buf[start..end])? {
272                FramePayload::Data(range) => start + range.start..start + range.end,
273                FramePayload::Skip => continue,
274            };
275            if payload.start > 0 || payload.end < buf.len() {
276                buf.copy_within(payload.clone(), 0);
277                buf.truncate(payload.end - payload.start);
278            }
279
280            let capacity = buf.capacity();
281            return Ok(Some((
282                self.line_no,
283                std::mem::replace(buf, Vec::with_capacity(capacity)),
284            )));
285        }
286    }
287
288    fn read_physical_line(&mut self, buf: &mut Vec<u8>) -> Result<usize, RowError> {
289        loop {
290            let available = self.reader.fill_buf()?;
291            if available.is_empty() {
292                return Ok(buf.len());
293            }
294
295            if let Some(pos) = memchr(b'\n', available) {
296                buf.extend_from_slice(&available[..=pos]);
297                self.reader.consume(pos + 1);
298                self.check_physical_line_len(buf.len())?;
299                return Ok(buf.len());
300            }
301
302            let len = available.len();
303            buf.extend_from_slice(available);
304            self.reader.consume(len);
305            self.check_physical_line_len(buf.len())?;
306        }
307    }
308
309    fn check_physical_line_len(&self, len: usize) -> Result<(), RowError> {
310        let hard_max = self.max_line_len.saturating_add(2);
311        if len > hard_max {
312            return Err(RowError::LineTooLarge {
313                line_no: self.line_no + 1,
314                len,
315                max: self.max_line_len,
316            });
317        }
318        Ok(())
319    }
320}
321
/// Returned by `*_until` callbacks to steer the row scan.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum NdjsonControl {
    /// Keep reading rows.
    Continue,
    /// Stop after the current row; that row is still included in the count.
    Stop,
}
327
328pub fn for_each_ndjson<R, F>(
329    engine: &JetroEngine,
330    reader: R,
331    query: &str,
332    f: F,
333) -> Result<usize, JetroEngineError>
334where
335    R: BufRead,
336    F: FnMut(Value),
337{
338    for_each_ndjson_with_options(engine, reader, query, NdjsonOptions::default(), f)
339}
340
341pub fn for_each_ndjson_with_options<R, F>(
342    engine: &JetroEngine,
343    reader: R,
344    query: &str,
345    options: NdjsonOptions,
346    mut f: F,
347) -> Result<usize, JetroEngineError>
348where
349    R: BufRead,
350    F: FnMut(Value),
351{
352    drive_ndjson(engine, reader, query, options, |value| {
353        f(value);
354        Ok(NdjsonControl::Continue)
355    })
356}
357
358pub fn for_each_ndjson_until<R, F>(
359    engine: &JetroEngine,
360    reader: R,
361    query: &str,
362    f: F,
363) -> Result<usize, JetroEngineError>
364where
365    R: BufRead,
366    F: FnMut(Value) -> Result<NdjsonControl, JetroEngineError>,
367{
368    for_each_ndjson_until_with_options(engine, reader, query, NdjsonOptions::default(), f)
369}
370
371pub fn for_each_ndjson_until_with_options<R, F>(
372    engine: &JetroEngine,
373    reader: R,
374    query: &str,
375    options: NdjsonOptions,
376    f: F,
377) -> Result<usize, JetroEngineError>
378where
379    R: BufRead,
380    F: FnMut(Value) -> Result<NdjsonControl, JetroEngineError>,
381{
382    drive_ndjson(engine, reader, query, options, f)
383}
384
385pub fn for_each_ndjson_source<F>(
386    engine: &JetroEngine,
387    source: NdjsonSource,
388    query: &str,
389    f: F,
390) -> Result<usize, JetroEngineError>
391where
392    F: FnMut(Value),
393{
394    for_each_ndjson_source_with_options(engine, source, query, NdjsonOptions::default(), f)
395}
396
397pub fn for_each_ndjson_source_with_options<F>(
398    engine: &JetroEngine,
399    source: NdjsonSource,
400    query: &str,
401    options: NdjsonOptions,
402    f: F,
403) -> Result<usize, JetroEngineError>
404where
405    F: FnMut(Value),
406{
407    match source {
408        NdjsonSource::File(path) => {
409            let file = File::open(path)?;
410            for_each_ndjson_with_options(
411                engine,
412                std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
413                query,
414                options,
415                f,
416            )
417        }
418        NdjsonSource::Reader(reader) => {
419            for_each_ndjson_with_options(engine, reader, query, options, f)
420        }
421    }
422}
423
424pub fn for_each_ndjson_source_until<F>(
425    engine: &JetroEngine,
426    source: NdjsonSource,
427    query: &str,
428    f: F,
429) -> Result<usize, JetroEngineError>
430where
431    F: FnMut(Value) -> Result<NdjsonControl, JetroEngineError>,
432{
433    for_each_ndjson_source_until_with_options(engine, source, query, NdjsonOptions::default(), f)
434}
435
436pub fn for_each_ndjson_source_until_with_options<F>(
437    engine: &JetroEngine,
438    source: NdjsonSource,
439    query: &str,
440    options: NdjsonOptions,
441    f: F,
442) -> Result<usize, JetroEngineError>
443where
444    F: FnMut(Value) -> Result<NdjsonControl, JetroEngineError>,
445{
446    match source {
447        NdjsonSource::File(path) => {
448            let file = File::open(path)?;
449            for_each_ndjson_until_with_options(
450                engine,
451                std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
452                query,
453                options,
454                f,
455            )
456        }
457        NdjsonSource::Reader(reader) => {
458            for_each_ndjson_until_with_options(engine, reader, query, options, f)
459        }
460    }
461}
462
463pub fn collect_ndjson<R>(
464    engine: &JetroEngine,
465    reader: R,
466    query: &str,
467) -> Result<Vec<Value>, JetroEngineError>
468where
469    R: BufRead,
470{
471    collect_ndjson_with_options(engine, reader, query, NdjsonOptions::default())
472}
473
474pub fn collect_ndjson_with_options<R>(
475    engine: &JetroEngine,
476    reader: R,
477    query: &str,
478    options: NdjsonOptions,
479) -> Result<Vec<Value>, JetroEngineError>
480where
481    R: BufRead,
482{
483    let mut values = Vec::new();
484    for_each_ndjson_with_options(engine, reader, query, options, |value| values.push(value))?;
485    Ok(values)
486}
487
488pub fn collect_ndjson_file<P>(
489    engine: &JetroEngine,
490    path: P,
491    query: &str,
492) -> Result<Vec<Value>, JetroEngineError>
493where
494    P: AsRef<Path>,
495{
496    let file = File::open(path)?;
497    let options = NdjsonOptions::default();
498    collect_ndjson_with_options(
499        engine,
500        std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
501        query,
502        options,
503    )
504}
505
506pub fn collect_ndjson_file_with_options<P>(
507    engine: &JetroEngine,
508    path: P,
509    query: &str,
510    options: NdjsonOptions,
511) -> Result<Vec<Value>, JetroEngineError>
512where
513    P: AsRef<Path>,
514{
515    let file = File::open(path)?;
516    collect_ndjson_with_options(
517        engine,
518        std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
519        query,
520        options,
521    )
522}
523
524pub fn collect_ndjson_source(
525    engine: &JetroEngine,
526    source: NdjsonSource,
527    query: &str,
528) -> Result<Vec<Value>, JetroEngineError> {
529    collect_ndjson_source_with_options(engine, source, query, NdjsonOptions::default())
530}
531
532pub fn collect_ndjson_source_with_options(
533    engine: &JetroEngine,
534    source: NdjsonSource,
535    query: &str,
536    options: NdjsonOptions,
537) -> Result<Vec<Value>, JetroEngineError> {
538    match source {
539        NdjsonSource::File(path) => collect_ndjson_file_with_options(engine, path, query, options),
540        NdjsonSource::Reader(reader) => collect_ndjson_with_options(engine, reader, query, options),
541    }
542}
543
544pub fn collect_ndjson_matches<R>(
545    engine: &JetroEngine,
546    reader: R,
547    predicate: &str,
548    limit: usize,
549) -> Result<Vec<Value>, JetroEngineError>
550where
551    R: BufRead,
552{
553    collect_ndjson_matches_with_options(engine, reader, predicate, limit, NdjsonOptions::default())
554}
555
556pub fn collect_ndjson_matches_with_options<R>(
557    engine: &JetroEngine,
558    reader: R,
559    predicate: &str,
560    limit: usize,
561    options: NdjsonOptions,
562) -> Result<Vec<Value>, JetroEngineError>
563where
564    R: BufRead,
565{
566    let mut values = Vec::with_capacity(limit);
567    drive_ndjson_matches(engine, reader, predicate, limit, options, |value| {
568        values.push(Value::from(value));
569        Ok(NdjsonControl::Continue)
570    })?;
571    Ok(values)
572}
573
574pub fn collect_ndjson_matches_file<P>(
575    engine: &JetroEngine,
576    path: P,
577    predicate: &str,
578    limit: usize,
579) -> Result<Vec<Value>, JetroEngineError>
580where
581    P: AsRef<Path>,
582{
583    let file = File::open(path)?;
584    let options = NdjsonOptions::default();
585    collect_ndjson_matches_with_options(
586        engine,
587        std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
588        predicate,
589        limit,
590        options,
591    )
592}
593
594pub fn collect_ndjson_matches_file_with_options<P>(
595    engine: &JetroEngine,
596    path: P,
597    predicate: &str,
598    limit: usize,
599    options: NdjsonOptions,
600) -> Result<Vec<Value>, JetroEngineError>
601where
602    P: AsRef<Path>,
603{
604    let file = File::open(path)?;
605    collect_ndjson_matches_with_options(
606        engine,
607        std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
608        predicate,
609        limit,
610        options,
611    )
612}
613
614pub fn collect_ndjson_matches_source(
615    engine: &JetroEngine,
616    source: NdjsonSource,
617    predicate: &str,
618    limit: usize,
619) -> Result<Vec<Value>, JetroEngineError> {
620    collect_ndjson_matches_source_with_options(
621        engine,
622        source,
623        predicate,
624        limit,
625        NdjsonOptions::default(),
626    )
627}
628
629pub fn collect_ndjson_matches_source_with_options(
630    engine: &JetroEngine,
631    source: NdjsonSource,
632    predicate: &str,
633    limit: usize,
634    options: NdjsonOptions,
635) -> Result<Vec<Value>, JetroEngineError> {
636    match source {
637        NdjsonSource::File(path) => {
638            collect_ndjson_matches_file_with_options(engine, path, predicate, limit, options)
639        }
640        NdjsonSource::Reader(reader) => {
641            collect_ndjson_matches_with_options(engine, reader, predicate, limit, options)
642        }
643    }
644}
645
646pub fn run_ndjson<R, W>(
647    engine: &JetroEngine,
648    reader: R,
649    query: &str,
650    writer: W,
651) -> Result<usize, JetroEngineError>
652where
653    R: BufRead,
654    W: Write,
655{
656    run_ndjson_with_options(engine, reader, query, writer, NdjsonOptions::default())
657}
658
659pub fn run_ndjson_file<P, W>(
660    engine: &JetroEngine,
661    path: P,
662    query: &str,
663    writer: W,
664) -> Result<usize, JetroEngineError>
665where
666    P: AsRef<Path>,
667    W: Write,
668{
669    let options = NdjsonOptions::default();
670    run_ndjson_file_with_options(engine, path, query, writer, options)
671}
672
673pub fn run_ndjson_file_with_options<P, W>(
674    engine: &JetroEngine,
675    path: P,
676    query: &str,
677    writer: W,
678    options: NdjsonOptions,
679) -> Result<usize, JetroEngineError>
680where
681    P: AsRef<Path>,
682    W: Write,
683{
684    if let Some(plan) = ndjson_rows_stream_plan(query)? {
685        return drive_ndjson_rows_stream_file(engine, path, &plan, None, options, writer);
686    }
687    if let Some(plan) = ndjson_rows_subquery_plan(query)? {
688        return drive_ndjson_rows_subquery_file(engine, path, &plan, options, writer);
689    }
690
691    let file = File::open(path)?;
692    run_ndjson_with_options(
693        engine,
694        std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
695        query,
696        writer,
697        options,
698    )
699}
700
701pub fn run_ndjson_with_options<R, W>(
702    engine: &JetroEngine,
703    reader: R,
704    query: &str,
705    writer: W,
706    options: NdjsonOptions,
707) -> Result<usize, JetroEngineError>
708where
709    R: BufRead,
710    W: Write,
711{
712    if let Some(plan) = ndjson_rows_stream_plan(query)? {
713        return drive_ndjson_rows_stream_reader(engine, reader, &plan, None, options, writer);
714    }
715    if ndjson_rows_subquery_plan(query)?.is_some() {
716        return Err(JetroEngineError::Eval(EvalError(
717            "$.rows() stream subqueries require a file-backed NDJSON source".into(),
718        )));
719    }
720
721    drive_ndjson_writer(engine, reader, query, None, options, writer)
722}
723
724pub fn run_ndjson_limit<R, W>(
725    engine: &JetroEngine,
726    reader: R,
727    query: &str,
728    limit: usize,
729    writer: W,
730) -> Result<usize, JetroEngineError>
731where
732    R: BufRead,
733    W: Write,
734{
735    run_ndjson_limit_with_options(
736        engine,
737        reader,
738        query,
739        limit,
740        writer,
741        NdjsonOptions::default(),
742    )
743}
744
745pub fn run_ndjson_limit_with_options<R, W>(
746    engine: &JetroEngine,
747    reader: R,
748    query: &str,
749    limit: usize,
750    writer: W,
751    options: NdjsonOptions,
752) -> Result<usize, JetroEngineError>
753where
754    R: BufRead,
755    W: Write,
756{
757    if limit == 0 {
758        return Ok(0);
759    }
760
761    if let Some(plan) = ndjson_rows_stream_plan(query)? {
762        return drive_ndjson_rows_stream_reader(
763            engine,
764            reader,
765            &plan,
766            Some(limit),
767            options,
768            writer,
769        );
770    }
771
772    drive_ndjson_writer(engine, reader, query, Some(limit), options, writer)
773}
774
775pub fn run_ndjson_file_limit<P, W>(
776    engine: &JetroEngine,
777    path: P,
778    query: &str,
779    limit: usize,
780    writer: W,
781) -> Result<usize, JetroEngineError>
782where
783    P: AsRef<Path>,
784    W: Write,
785{
786    let options = NdjsonOptions::default();
787    run_ndjson_file_limit_with_options(engine, path, query, limit, writer, options)
788}
789
790pub fn run_ndjson_file_limit_with_options<P, W>(
791    engine: &JetroEngine,
792    path: P,
793    query: &str,
794    limit: usize,
795    writer: W,
796    options: NdjsonOptions,
797) -> Result<usize, JetroEngineError>
798where
799    P: AsRef<Path>,
800    W: Write,
801{
802    if limit == 0 {
803        return Ok(0);
804    }
805    if let Some(plan) = ndjson_rows_stream_plan(query)? {
806        return drive_ndjson_rows_stream_file(engine, path, &plan, Some(limit), options, writer);
807    }
808    if let Some(plan) = ndjson_rows_subquery_plan(query)? {
809        return drive_ndjson_rows_subquery_file(engine, path, &plan, options, writer);
810    }
811
812    let file = File::open(path)?;
813    run_ndjson_limit_with_options(
814        engine,
815        std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
816        query,
817        limit,
818        writer,
819        options,
820    )
821}
822
823pub fn run_ndjson_source<W>(
824    engine: &JetroEngine,
825    source: NdjsonSource,
826    query: &str,
827    writer: W,
828) -> Result<usize, JetroEngineError>
829where
830    W: Write,
831{
832    run_ndjson_source_with_options(engine, source, query, writer, NdjsonOptions::default())
833}
834
835pub fn run_ndjson_source_with_options<W>(
836    engine: &JetroEngine,
837    source: NdjsonSource,
838    query: &str,
839    writer: W,
840    options: NdjsonOptions,
841) -> Result<usize, JetroEngineError>
842where
843    W: Write,
844{
845    match source {
846        NdjsonSource::File(path) => {
847            run_ndjson_file_with_options(engine, path, query, writer, options)
848        }
849        NdjsonSource::Reader(reader) => {
850            run_ndjson_with_options(engine, reader, query, writer, options)
851        }
852    }
853}
854
855pub fn run_ndjson_source_limit<W>(
856    engine: &JetroEngine,
857    source: NdjsonSource,
858    query: &str,
859    limit: usize,
860    writer: W,
861) -> Result<usize, JetroEngineError>
862where
863    W: Write,
864{
865    run_ndjson_source_limit_with_options(
866        engine,
867        source,
868        query,
869        limit,
870        writer,
871        NdjsonOptions::default(),
872    )
873}
874
875pub fn run_ndjson_source_limit_with_options<W>(
876    engine: &JetroEngine,
877    source: NdjsonSource,
878    query: &str,
879    limit: usize,
880    writer: W,
881    options: NdjsonOptions,
882) -> Result<usize, JetroEngineError>
883where
884    W: Write,
885{
886    match source {
887        NdjsonSource::File(path) => {
888            run_ndjson_file_limit_with_options(engine, path, query, limit, writer, options)
889        }
890        NdjsonSource::Reader(reader) => {
891            run_ndjson_limit_with_options(engine, reader, query, limit, writer, options)
892        }
893    }
894}
895
896pub fn run_ndjson_matches<R, W>(
897    engine: &JetroEngine,
898    reader: R,
899    predicate: &str,
900    limit: usize,
901    writer: W,
902) -> Result<usize, JetroEngineError>
903where
904    R: BufRead,
905    W: Write,
906{
907    run_ndjson_matches_with_options(
908        engine,
909        reader,
910        predicate,
911        limit,
912        writer,
913        NdjsonOptions::default(),
914    )
915}
916
917pub fn run_ndjson_matches_with_options<R, W>(
918    engine: &JetroEngine,
919    reader: R,
920    predicate: &str,
921    limit: usize,
922    writer: W,
923    options: NdjsonOptions,
924) -> Result<usize, JetroEngineError>
925where
926    R: BufRead,
927    W: Write,
928{
929    drive_ndjson_matches_writer(engine, reader, predicate, limit, options, writer)
930}
931
932pub fn run_ndjson_matches_file<P, W>(
933    engine: &JetroEngine,
934    path: P,
935    predicate: &str,
936    limit: usize,
937    writer: W,
938) -> Result<usize, JetroEngineError>
939where
940    P: AsRef<Path>,
941    W: Write,
942{
943    let file = File::open(path)?;
944    let options = NdjsonOptions::default();
945    run_ndjson_matches_with_options(
946        engine,
947        std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
948        predicate,
949        limit,
950        writer,
951        options,
952    )
953}
954
955pub fn run_ndjson_matches_file_with_options<P, W>(
956    engine: &JetroEngine,
957    path: P,
958    predicate: &str,
959    limit: usize,
960    writer: W,
961    options: NdjsonOptions,
962) -> Result<usize, JetroEngineError>
963where
964    P: AsRef<Path>,
965    W: Write,
966{
967    let file = File::open(path)?;
968    run_ndjson_matches_with_options(
969        engine,
970        std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
971        predicate,
972        limit,
973        writer,
974        options,
975    )
976}
977
978pub fn run_ndjson_matches_source<W>(
979    engine: &JetroEngine,
980    source: NdjsonSource,
981    predicate: &str,
982    limit: usize,
983    writer: W,
984) -> Result<usize, JetroEngineError>
985where
986    W: Write,
987{
988    run_ndjson_matches_source_with_options(
989        engine,
990        source,
991        predicate,
992        limit,
993        writer,
994        NdjsonOptions::default(),
995    )
996}
997
998pub fn run_ndjson_matches_source_with_options<W>(
999    engine: &JetroEngine,
1000    source: NdjsonSource,
1001    predicate: &str,
1002    limit: usize,
1003    writer: W,
1004    options: NdjsonOptions,
1005) -> Result<usize, JetroEngineError>
1006where
1007    W: Write,
1008{
1009    match source {
1010        NdjsonSource::File(path) => {
1011            run_ndjson_matches_file_with_options(engine, path, predicate, limit, writer, options)
1012        }
1013        NdjsonSource::Reader(reader) => {
1014            run_ndjson_matches_with_options(engine, reader, predicate, limit, writer, options)
1015        }
1016    }
1017}
1018
1019fn drive_ndjson<R, F>(
1020    engine: &JetroEngine,
1021    reader: R,
1022    query: &str,
1023    options: NdjsonOptions,
1024    mut emit: F,
1025) -> Result<usize, JetroEngineError>
1026where
1027    R: BufRead,
1028    F: FnMut(Value) -> Result<NdjsonControl, JetroEngineError>,
1029{
1030    let mut driver = NdjsonPerRowDriver::new(reader).with_options(options);
1031    let plan = engine.cached_plan(query, PlanningContext::bytes());
1032    let mut buf = Vec::with_capacity(options.initial_buffer_capacity);
1033    let mut count = 0;
1034
1035    while let Some((line_no, row)) = driver.read_next_owned(&mut buf)? {
1036        let document = parse_row(engine, line_no, row)?;
1037        let out = collect_row_val(engine, &document, &plan, line_no)?;
1038        count += 1;
1039        if matches!(emit(Value::from(out))?, NdjsonControl::Stop) {
1040            break;
1041        }
1042    }
1043
1044    Ok(count)
1045}
1046
1047fn ndjson_rows_stream_plan(query: &str) -> Result<Option<RowStreamPlan>, JetroEngineError> {
1048    lower_root_rows_query(query, RowStreamSourceKind::NdjsonRows)
1049        .map_err(|err| JetroEngineError::Eval(EvalError(err.to_string())))
1050}
1051
1052fn ndjson_rows_subquery_plan(
1053    query: &str,
1054) -> Result<Option<RowStreamSubqueryPlan>, JetroEngineError> {
1055    if !query.contains("$.rows") {
1056        return Ok(None);
1057    }
1058    let expr = crate::parse::parser::parse(query)
1059        .map_err(|err| JetroEngineError::Eval(EvalError(err.to_string())))?;
1060    lower_single_rows_subquery(&expr, RowStreamSourceKind::NdjsonRows)
1061        .map_err(|err| JetroEngineError::Eval(EvalError(err.to_string())))
1062}
1063
1064fn drive_ndjson_rows_subquery_file<P, W>(
1065    engine: &JetroEngine,
1066    path: P,
1067    plan: &RowStreamSubqueryPlan,
1068    options: NdjsonOptions,
1069    writer: W,
1070) -> Result<usize, JetroEngineError>
1071where
1072    P: AsRef<Path>,
1073    W: Write,
1074{
1075    let stream_value = collect_ndjson_rows_stream_file(engine, path, &plan.stream, options)?;
1076    let wrapper = Compiler::compile(&plan.wrapper, "<ndjson-rows-wrapper>");
1077    let env = Env::new(Val::Null).with_var(STREAM_BINDING, stream_value);
1078    let value = engine
1079        .lock_vm()
1080        .exec_in_env(&wrapper, &env)
1081        .map_err(JetroEngineError::Eval)?;
1082    let mut writer = ndjson_writer_with_options(writer, options);
1083    let emitted = write_val_line_with_options(&mut writer, &value, options)? as usize;
1084    writer.flush()?;
1085    Ok(emitted)
1086}
1087
1088fn collect_ndjson_rows_stream_file<P>(
1089    engine: &JetroEngine,
1090    path: P,
1091    plan: &RowStreamPlan,
1092    options: NdjsonOptions,
1093) -> Result<Val, JetroEngineError>
1094where
1095    P: AsRef<Path>,
1096{
1097    if let Some(value) =
1098        super::ndjson_parallel::collect_rows_stream_file(engine, path.as_ref(), plan, options)?
1099    {
1100        return Ok(value);
1101    }
1102
1103    let mut executor = CompiledRowStream::new(plan);
1104    let mut out = Vec::new();
1105
1106    if plan.direction == RowStreamDirection::Forward {
1107        let file = File::open(path)?;
1108        let mut driver = NdjsonPerRowDriver::new(std::io::BufReader::with_capacity(
1109            options.reader_buffer_capacity,
1110            file,
1111        ))
1112        .with_options(options);
1113        let mut buf = Vec::with_capacity(options.initial_buffer_capacity);
1114        while !executor.is_exhausted() {
1115            let Some((line_no, row)) = driver.read_next_owned(&mut buf)? else {
1116                break;
1117            };
1118            if collect_row_stream_result(
1119                engine,
1120                line_no,
1121                executor.apply_owned_row(engine, line_no, row)?,
1122                &mut out,
1123            )? || executor.is_exhausted()
1124            {
1125                break;
1126            }
1127        }
1128    } else {
1129        let mut driver = super::ndjson_rev::NdjsonReverseFileDriver::with_options(path, options)?;
1130        while !executor.is_exhausted() {
1131            let Some((line_no, row)) = driver.next_line_with_reverse_no()? else {
1132                break;
1133            };
1134            if collect_row_stream_result(
1135                engine,
1136                line_no,
1137                executor.apply_owned_row(engine, line_no, row)?,
1138                &mut out,
1139            )? || executor.is_exhausted()
1140            {
1141                break;
1142            }
1143        }
1144    }
1145
1146    if plan.demand.retained_limit == Some(1) {
1147        Ok(out.into_iter().next().unwrap_or(Val::Null))
1148    } else {
1149        Ok(Val::Arr(std::sync::Arc::new(out)))
1150    }
1151}
1152
1153fn collect_row_stream_result(
1154    engine: &JetroEngine,
1155    line_no: u64,
1156    result: RowStreamRowResult,
1157    out: &mut Vec<Val>,
1158) -> Result<bool, JetroEngineError> {
1159    match result {
1160        RowStreamRowResult::Emit(value) => out.push(value),
1161        RowStreamRowResult::EmitBytes(bytes) => {
1162            let document = parse_row(engine, line_no, bytes)?;
1163            let value = document
1164                .root_val_with(engine.keys())
1165                .map_err(|err| row_eval_error(line_no, err))?;
1166            out.push(value);
1167        }
1168        RowStreamRowResult::Skip => {}
1169        RowStreamRowResult::Stop => return Ok(true),
1170    }
1171    Ok(false)
1172}
1173
1174fn drive_ndjson_rows_stream_reader<R, W>(
1175    engine: &JetroEngine,
1176    reader: R,
1177    plan: &RowStreamPlan,
1178    external_limit: Option<usize>,
1179    options: NdjsonOptions,
1180    writer: W,
1181) -> Result<usize, JetroEngineError>
1182where
1183    R: BufRead,
1184    W: Write,
1185{
1186    let (emitted, _) = drive_ndjson_rows_stream_reader_with_stats(
1187        engine,
1188        reader,
1189        plan,
1190        external_limit,
1191        options,
1192        writer,
1193    )?;
1194    Ok(emitted)
1195}
1196
1197fn drive_ndjson_rows_stream_reader_with_stats<R, W>(
1198    engine: &JetroEngine,
1199    reader: R,
1200    plan: &RowStreamPlan,
1201    external_limit: Option<usize>,
1202    options: NdjsonOptions,
1203    writer: W,
1204) -> Result<(usize, RowStreamStats), JetroEngineError>
1205where
1206    R: BufRead,
1207    W: Write,
1208{
1209    if plan.direction == RowStreamDirection::Reverse {
1210        return Err(JetroEngineError::Eval(EvalError(
1211            "$.rows().reverse() requires a file-backed NDJSON source".into(),
1212        )));
1213    }
1214
1215    let mut driver = NdjsonPerRowDriver::new(reader).with_options(options);
1216    let mut executor = CompiledRowStream::new(plan);
1217    let mut writer = ndjson_writer_with_options(writer, options);
1218    let mut buf = Vec::with_capacity(options.initial_buffer_capacity);
1219    let mut emitted = 0usize;
1220
1221    while !executor.is_exhausted() {
1222        let Some((line_no, row)) = driver.read_next_owned(&mut buf)? else {
1223            break;
1224        };
1225        if emit_row_stream_result(
1226            executor.apply_owned_row(engine, line_no, row)?,
1227            &mut writer,
1228            &mut emitted,
1229            external_limit,
1230            options,
1231        )? {
1232            break;
1233        }
1234        if executor.is_exhausted() {
1235            break;
1236        }
1237    }
1238
1239    writer.flush()?;
1240    Ok((emitted, executor.stats().clone()))
1241}
1242
1243fn drive_ndjson_rows_stream_file<P, W>(
1244    engine: &JetroEngine,
1245    path: P,
1246    plan: &RowStreamPlan,
1247    external_limit: Option<usize>,
1248    options: NdjsonOptions,
1249    writer: W,
1250) -> Result<usize, JetroEngineError>
1251where
1252    P: AsRef<Path>,
1253    W: Write,
1254{
1255    if let Some(value) =
1256        super::ndjson_parallel::collect_rows_stream_file(engine, path.as_ref(), plan, options)?
1257    {
1258        return write_collected_rows_stream(value, external_limit, options, writer);
1259    }
1260
1261    let (emitted, _) = drive_ndjson_rows_stream_file_with_stats(
1262        engine,
1263        path,
1264        plan,
1265        external_limit,
1266        options,
1267        writer,
1268    )?;
1269    Ok(emitted)
1270}
1271
1272fn write_collected_rows_stream<W: Write>(
1273    value: Val,
1274    external_limit: Option<usize>,
1275    options: NdjsonOptions,
1276    writer: W,
1277) -> Result<usize, JetroEngineError> {
1278    let mut writer = ndjson_writer_with_options(writer, options);
1279    let mut emitted = 0usize;
1280    match value {
1281        Val::Arr(values) => {
1282            for value in values.iter() {
1283                if external_limit.is_some_and(|limit| emitted >= limit) {
1284                    break;
1285                }
1286                if write_val_line_with_options(&mut writer, value, options)? {
1287                    emitted += 1;
1288                }
1289            }
1290        }
1291        value => {
1292            if write_val_line_with_options(&mut writer, &value, options)? {
1293                emitted += 1;
1294            }
1295        }
1296    }
1297    writer.flush()?;
1298    Ok(emitted)
1299}
1300
1301fn drive_ndjson_rows_stream_file_with_stats<P, W>(
1302    engine: &JetroEngine,
1303    path: P,
1304    plan: &RowStreamPlan,
1305    external_limit: Option<usize>,
1306    options: NdjsonOptions,
1307    writer: W,
1308) -> Result<(usize, RowStreamStats), JetroEngineError>
1309where
1310    P: AsRef<Path>,
1311    W: Write,
1312{
1313    if plan.direction == RowStreamDirection::Forward {
1314        let file = File::open(path)?;
1315        return drive_ndjson_rows_stream_reader_with_stats(
1316            engine,
1317            std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
1318            plan,
1319            external_limit,
1320            options,
1321            writer,
1322        );
1323    }
1324
1325    let mut driver = super::ndjson_rev::NdjsonReverseFileDriver::with_options(path, options)?;
1326    let mut executor = CompiledRowStream::new(plan);
1327    let mut writer = ndjson_writer_with_options(writer, options);
1328    let mut emitted = 0usize;
1329
1330    while !executor.is_exhausted() {
1331        let Some((line_no, row)) = driver.next_line_with_reverse_no()? else {
1332            break;
1333        };
1334        if emit_row_stream_result(
1335            executor.apply_owned_row(engine, line_no, row)?,
1336            &mut writer,
1337            &mut emitted,
1338            external_limit,
1339            options,
1340        )? {
1341            break;
1342        }
1343        if executor.is_exhausted() {
1344            break;
1345        }
1346    }
1347
1348    writer.flush()?;
1349    Ok((emitted, executor.stats().clone()))
1350}
1351
1352fn emit_row_stream_result<W: Write>(
1353    result: RowStreamRowResult,
1354    writer: &mut W,
1355    emitted: &mut usize,
1356    external_limit: Option<usize>,
1357    options: NdjsonOptions,
1358) -> Result<bool, JetroEngineError> {
1359    let wrote = match result {
1360        RowStreamRowResult::Emit(value) => write_val_line_with_options(writer, &value, options)?,
1361        RowStreamRowResult::EmitBytes(bytes) => {
1362            write_json_bytes_line_with_options(writer, &bytes, options)?
1363        }
1364        RowStreamRowResult::Skip => return Ok(false),
1365        RowStreamRowResult::Stop => return Ok(true),
1366    };
1367    if wrote {
1368        *emitted += 1;
1369    }
1370    Ok(external_limit.is_some_and(|limit| *emitted >= limit))
1371}
1372
1373fn drive_ndjson_writer<R, W>(
1374    engine: &JetroEngine,
1375    reader: R,
1376    query: &str,
1377    limit: Option<usize>,
1378    options: NdjsonOptions,
1379    writer: W,
1380) -> Result<usize, JetroEngineError>
1381where
1382    R: BufRead,
1383    W: Write,
1384{
1385    #[cfg(feature = "simd-json")]
1386    if let Some((byte_plan, tape_plan)) = direct_writer_plans(engine, query) {
1387        if let Some(byte_plan) = byte_plan {
1388            return drive_ndjson_byte_writer(
1389                engine, reader, &byte_plan, &tape_plan, limit, options, writer,
1390            );
1391        }
1392        if tape_plan_can_write_byte_row(&tape_plan) {
1393            return drive_ndjson_tape_byte_writer(
1394                engine, reader, &tape_plan, limit, options, writer,
1395            );
1396        }
1397        return drive_ndjson_tape_writer(engine, reader, &tape_plan, limit, options, writer);
1398    }
1399
1400    let mut driver = NdjsonPerRowDriver::new(reader).with_options(options);
1401    let mut executor = NdjsonRowExecutor::new(engine, query);
1402    let mut writer = ndjson_writer_with_options(writer, options);
1403    let mut buf = Vec::with_capacity(options.initial_buffer_capacity);
1404    let mut count = 0usize;
1405
1406    while let Some((line_no, row)) = driver.read_next_owned(&mut buf)? {
1407        let value = executor.eval_owned_row(line_no, row)?;
1408        if write_val_line_with_options(&mut writer, &value, options)? {
1409            count += 1;
1410        }
1411        if limit.is_some_and(|limit| count >= limit) {
1412            break;
1413        }
1414    }
1415
1416    writer.flush()?;
1417    Ok(count)
1418}
1419
/// Writes one JSON line per NDJSON data row using a byte-level plan.
///
/// Rows the byte plan cannot handle fall back to a full tape parse that is
/// evaluated with `tape_plan`. Stops once the written count reaches `limit`,
/// checked after each row. Returns the number of lines written.
///
/// # Errors
/// Propagates read/write errors, byte-plan errors, `Invalid JSON` parse
/// errors from the fallback (tagged with the row's line number), and tape
/// evaluation errors.
#[cfg(feature = "simd-json")]
fn drive_ndjson_byte_writer<R, W>(
    engine: &JetroEngine,
    reader: R,
    byte_plan: &NdjsonDirectBytePlan,
    tape_plan: &NdjsonDirectTapePlan,
    limit: Option<usize>,
    options: NdjsonOptions,
    writer: W,
) -> Result<usize, JetroEngineError>
where
    R: BufRead,
    W: Write,
{
    let mut driver = NdjsonPerRowDriver::new(reader).with_options(options);
    let mut writer = ndjson_writer_with_options(writer, options);
    let mut line = Vec::with_capacity(options.initial_buffer_capacity);
    let mut out = Vec::with_capacity(options.initial_buffer_capacity);
    let mut scratch =
        crate::data::tape::TapeScratch::with_capacity(options.initial_buffer_capacity);
    let mut tape_runner = NdjsonTapeWriterRunner::new(engine, tape_plan);
    let mut count = 0usize;

    visit_ndjson_borrowed_rows(&mut driver, &mut line, |line_no, row| {
        out.clear();
        match write_ndjson_byte_plan_row(&mut out, row, byte_plan)? {
            BytePlanWrite::Done => {}
            BytePlanWrite::Fallback => {
                // Discard anything the byte plan wrote before giving up, so the
                // fallback result is not appended to a partial prefix.
                out.clear();
                scratch.parse_slice(row).map_err(|message| {
                    row_parse_error(
                        line_no,
                        JetroEngineError::Eval(crate::EvalError(format!(
                            "Invalid JSON: {message}"
                        ))),
                    )
                })?;
                tape_runner.write_row(&scratch, &mut out)?;
            }
        }
        if write_json_bytes_line_with_options(&mut writer, &out, options)? {
            count += 1;
        }
        Ok(!limit.is_some_and(|limit| count >= limit))
    })?;

    writer.flush()?;
    Ok(count)
}
1468
/// Writes one JSON line per NDJSON data row for a tape plan whose rows can
/// usually be produced straight from the raw bytes.
///
/// For each row the strategies are tried in order:
/// 1. the constant-stream cache;
/// 2. layout hints, when hint state exists for this plan shape and it
///    decides to use hints for the row;
/// 3. the byte-level tape writer, only when hints were not used;
/// 4. a full tape parse evaluated by [`NdjsonTapeWriterRunner`].
///
/// Bytes left in `out` by a strategy that gives up are discarded before the
/// next one writes. Stops once the written count reaches `limit`, checked
/// after each row. Returns the number of lines written.
///
/// # Errors
/// Propagates read/write errors, errors from the byte-level writers,
/// `Invalid JSON` parse errors (tagged with the row's line number), and tape
/// evaluation errors.
#[cfg(feature = "simd-json")]
fn drive_ndjson_tape_byte_writer<R, W>(
    engine: &JetroEngine,
    reader: R,
    tape_plan: &NdjsonDirectTapePlan,
    limit: Option<usize>,
    options: NdjsonOptions,
    writer: W,
) -> Result<usize, JetroEngineError>
where
    R: BufRead,
    W: Write,
{
    let mut driver = NdjsonPerRowDriver::new(reader).with_options(options);
    let mut writer = ndjson_writer_with_options(writer, options);
    let mut line = Vec::with_capacity(options.initial_buffer_capacity);
    let mut out = Vec::with_capacity(options.initial_buffer_capacity);
    let mut scratch =
        crate::data::tape::TapeScratch::with_capacity(options.initial_buffer_capacity);
    let mut byte_scratch = Vec::with_capacity(options.initial_buffer_capacity);
    let mut tape_runner = NdjsonTapeWriterRunner::new(engine, tape_plan);
    let mut constant_stream_cache = NdjsonConstantStreamCache::default();
    // Hint state is only created for the plan shapes listed here.
    let mut hint_state = matches!(
        tape_plan,
        NdjsonDirectTapePlan::Object(_)
            | NdjsonDirectTapePlan::Array(_)
            | NdjsonDirectTapePlan::Stream(NdjsonDirectStreamPlan {
                sink: NdjsonDirectStreamSink::Collect(_)
                    | NdjsonDirectStreamSink::First(_)
                    | NdjsonDirectStreamSink::Last(_)
                    | NdjsonDirectStreamSink::Count
                    | NdjsonDirectStreamSink::Numeric { .. }
                    | NdjsonDirectStreamSink::Extreme { .. },
                ..
            })
    )
    .then(|| {
        NdjsonHintState::new(
            NdjsonHintConfig::default(),
            NdjsonHintAccessPlan::from_direct_plans(None, tape_plan),
        )
    });
    let mut count = 0usize;

    visit_ndjson_borrowed_rows(&mut driver, &mut line, |line_no, row| {
        out.clear();
        if let Some(write) = constant_stream_cache.write_row(&mut out, row, tape_plan)? {
            if matches!(write, BytePlanWrite::Done) {
                if write_json_bytes_line_with_options(&mut writer, &out, options)? {
                    count += 1;
                }
                return Ok(!limit.is_some_and(|limit| count >= limit));
            }
            // The cache declined this row; drop any partial output it left.
            out.clear();
        }
        // Hinted writes go to `byte_scratch` and are copied into `out` only on
        // success.
        let hinted = if let Some(state) = hint_state.as_mut() {
            if state.observe_row(row) == NdjsonHintDecision::UseHints {
                byte_scratch.clear();
                let write = state
                    .with_root_layout_match(row, |root, matched| {
                        write_ndjson_hinted_tape_plan_row(
                            &mut byte_scratch,
                            tape_plan,
                            root,
                            matched,
                        )
                    })
                    .transpose()?
                    .unwrap_or(BytePlanWrite::Fallback);
                if matches!(write, BytePlanWrite::Done) {
                    out.extend_from_slice(&byte_scratch);
                }
                Some(write)
            } else {
                None
            }
        } else {
            None
        };
        let write = match hinted {
            Some(write) => Ok(write),
            None => write_ndjson_byte_tape_plan_row(&mut out, row, tape_plan, &mut byte_scratch),
        };
        match write? {
            BytePlanWrite::Done => {}
            BytePlanWrite::Fallback => {
                // The full tape result must not be appended to partial output
                // left by an earlier strategy.
                out.clear();
                scratch.parse_slice(row).map_err(|message| {
                    row_parse_error(
                        line_no,
                        JetroEngineError::Eval(crate::EvalError(format!(
                            "Invalid JSON: {message}"
                        ))),
                    )
                })?;
                tape_runner.write_row(&scratch, &mut out)?;
            }
        }
        if write_json_bytes_line_with_options(&mut writer, &out, options)? {
            count += 1;
        }
        Ok(!limit.is_some_and(|limit| count >= limit))
    })?;

    writer.flush()?;
    Ok(count)
}
1589
/// Feeds each NDJSON data row from `driver` to `visit` as a borrowed byte
/// slice, stopping early when `visit` returns `Ok(false)`.
///
/// If a row's newline is already in the reader's buffer, the row is borrowed
/// from that buffer without copying. Otherwise the physical line is read
/// into `spill` and the row is borrowed from there.
///
/// Both paths apply the same handling:
/// - a leading UTF-8 BOM on line 1 and the trailing line ending are stripped;
/// - blank (all-whitespace) lines are skipped but still counted in
///   `driver.line_no`;
/// - rows longer than `driver.max_line_len` after whitespace trimming fail
///   with `RowError::LineTooLarge`;
/// - `driver.row_frame` decides, via `frame_payload`, whether a row is
///   visited or skipped.
///
/// # Errors
/// Propagates I/O errors, framing errors, the line-length error above, and
/// any error returned by `visit`.
#[cfg(feature = "simd-json")]
fn visit_ndjson_borrowed_rows<R, F>(
    driver: &mut NdjsonPerRowDriver<R>,
    spill: &mut Vec<u8>,
    mut visit: F,
) -> Result<(), JetroEngineError>
where
    R: BufRead,
    F: FnMut(u64, &[u8]) -> Result<bool, JetroEngineError>,
{
    loop {
        spill.clear();
        // Peek at the buffered bytes without consuming them yet.
        let available = driver.reader.fill_buf()?;
        if available.is_empty() {
            return Ok(());
        }
        if let Some(pos) = memchr(b'\n', available) {
            // Fast path: the whole line is buffered, so borrow it in place.
            driver.line_no += 1;
            let line_no = driver.line_no;
            let mut row = &available[..pos];
            if row.last() == Some(&b'\r') {
                row = &row[..row.len() - 1];
            }
            if line_no == 1 && row.starts_with(&[0xef, 0xbb, 0xbf]) {
                row = &row[3..];
            }
            let (start, end) = non_ws_range(row);
            let keep_going = if start == end {
                true
            } else {
                let trimmed = &row[start..end];
                if trimmed.len() > driver.max_line_len {
                    return Err(RowError::LineTooLarge {
                        line_no,
                        len: trimmed.len(),
                        max: driver.max_line_len,
                    }
                    .into());
                }
                match frame_payload(driver.row_frame, line_no, trimmed)? {
                    FramePayload::Data(range) => visit(line_no, &trimmed[range])?,
                    FramePayload::Skip => true,
                }
            };
            // Consume the row and its '\n' only after the visitor has finished
            // with the borrowed slice.
            driver.reader.consume(pos + 1);
            if !keep_going {
                return Ok(());
            }
        } else {
            // Slow path: no newline in the buffered bytes (a long line or a
            // final unterminated line), so copy the physical line into `spill`.
            let read = driver.read_physical_line(spill)?;
            if read == 0 {
                return Ok(());
            }
            driver.line_no += 1;
            strip_initial_bom(driver.line_no, spill);
            trim_line_ending(spill);
            let (start, end) = non_ws_range(spill);
            if start == end {
                continue;
            }
            let len = end - start;
            if len > driver.max_line_len {
                return Err(RowError::LineTooLarge {
                    line_no: driver.line_no,
                    len,
                    max: driver.max_line_len,
                }
                .into());
            }
            match frame_payload(driver.row_frame, driver.line_no, &spill[start..end])? {
                FramePayload::Data(range) => {
                    // `range` is relative to the trimmed slice; shift it by `start`.
                    if !visit(
                        driver.line_no,
                        &spill[start + range.start..start + range.end],
                    )? {
                        return Ok(());
                    }
                }
                FramePayload::Skip => {}
            }
        }
    }
}
1673
/// Evaluates a direct tape plan over every non-empty NDJSON row and writes
/// one JSON line per row. Each row is fully parsed into a reusable tape.
///
/// Stops once the written count reaches `limit`, checked after each row.
/// Returns the number of lines written.
#[cfg(feature = "simd-json")]
fn drive_ndjson_tape_writer<R, W>(
    engine: &JetroEngine,
    reader: R,
    plan: &NdjsonDirectTapePlan,
    limit: Option<usize>,
    options: NdjsonOptions,
    writer: W,
) -> Result<usize, JetroEngineError>
where
    R: BufRead,
    W: Write,
{
    let mut driver = NdjsonPerRowDriver::new(reader).with_options(options);
    let mut sink = ndjson_writer_with_options(writer, options);
    let mut line = Vec::with_capacity(options.initial_buffer_capacity);
    let mut tape =
        crate::data::tape::TapeScratch::with_capacity(options.initial_buffer_capacity);
    let mut encoded = Vec::with_capacity(options.initial_buffer_capacity);
    let mut written = 0usize;
    let mut runner = NdjsonTapeWriterRunner::new(engine, plan);

    while let Some((line_no, row)) = driver.read_next_nonempty(&mut line)? {
        encoded.clear();
        tape.parse_slice(row).map_err(|message| {
            row_parse_error(
                line_no,
                JetroEngineError::Eval(crate::EvalError(format!("Invalid JSON: {message}"))),
            )
        })?;
        runner.write_row(&tape, &mut encoded)?;
        written += usize::from(write_json_bytes_line_with_options(&mut sink, &encoded, options)?);
        if matches!(limit, Some(max) if written >= max) {
            break;
        }
    }

    sink.flush()?;
    Ok(written)
}
1716
/// Per-row executor for an [`NdjsonDirectTapePlan`]: evaluates the plan
/// against a parsed row tape and writes the result as JSON bytes.
///
/// VM state is present only when the plan reports `needs_vm()`. The path
/// caches are reused across rows.
#[cfg(feature = "simd-json")]
pub(super) struct NdjsonTapeWriterRunner<'a, 'p> {
    // Plan evaluated against every row.
    plan: &'p NdjsonDirectTapePlan,
    // Engine VM guard. `Some` only when the plan needs the VM, in which case
    // the lock is held for as long as the runner lives.
    vm: Option<MutexGuard<'a, VM>>,
    // Evaluation environment for VM-backed pipelines. `Some` only with `vm`.
    env: Option<crate::data::context::Env>,
    // Cache for root-relative paths (root path, scalar-call and object-items plans).
    root_path: NdjsonPathCache,
    // Cache for the source path of array-element and stream plans.
    source_path: NdjsonPathCache,
    // Cache for the path applied after the source/element step.
    suffix_path: NdjsonPathCache,
    // Cache passed to the stream writer, presumably for a stream predicate.
    predicate_path: NdjsonPathCache,
    // Caches used by object/array projections and stream plans.
    object_paths: Vec<NdjsonPathCache>,
}
1728
#[cfg(feature = "simd-json")]
impl<'a, 'p> NdjsonTapeWriterRunner<'a, 'p> {
    /// Creates a runner for `plan` with empty path caches.
    ///
    /// Only when `plan.needs_vm()` is true does this lock the engine VM and
    /// create an environment rooted at `Val::Null`. The guard is then held
    /// until the runner is dropped.
    pub(super) fn new(engine: &'a JetroEngine, plan: &'p NdjsonDirectTapePlan) -> Self {
        let needs_vm = plan.needs_vm();
        Self {
            plan,
            vm: needs_vm.then(|| engine.lock_vm()),
            env: needs_vm.then(|| crate::data::context::Env::new(Val::Null)),
            root_path: NdjsonPathCache::default(),
            source_path: NdjsonPathCache::default(),
            suffix_path: NdjsonPathCache::default(),
            predicate_path: NdjsonPathCache::default(),
            object_paths: Vec::new(),
        }
    }

    /// Evaluates the plan against the row already parsed into `scratch` and
    /// appends that row's JSON result to `writer`.
    ///
    /// The path-based variants write `null` when their path does not resolve.
    ///
    /// # Errors
    /// Returns I/O errors from `writer` and errors from the stream/projection
    /// writers. Returns an `Eval` error when a view pipeline has no VM state
    /// or its evaluation fails.
    pub(super) fn write_row<W: Write>(
        &mut self,
        scratch: &crate::data::tape::TapeScratch,
        writer: &mut W,
    ) -> Result<(), JetroEngineError> {
        match self.plan {
            // Copy the node at the root-relative path verbatim.
            NdjsonDirectTapePlan::RootPath(steps) => {
                if let Some(idx) = self.root_path.index(scratch, 0, steps) {
                    write_json_tape_at(writer, scratch, idx)?;
                } else {
                    writer.write_all(b"null")?;
                }
            }
            // Apply `call` to the scalar at `steps`. If the call does not
            // apply, copy the raw node instead (or write `null` if missing).
            NdjsonDirectTapePlan::ViewScalarCall {
                steps,
                call,
                optional,
            } => {
                let idx = self.root_path.index(scratch, 0, steps);
                let value = idx
                    .map(|idx| json_tape_scalar(scratch, idx))
                    .unwrap_or(crate::util::JsonView::Null);
                // Optional access on a missing or null value yields `null`
                // without invoking the call.
                if *optional && matches!(value, crate::util::JsonView::Null) {
                    writer.write_all(b"null")?;
                } else if let Some(value) = call.try_apply_json_view(value) {
                    write_val_json(writer, &value)?;
                } else if let Some(idx) = idx {
                    write_json_tape_at(writer, scratch, idx)?;
                } else {
                    writer.write_all(b"null")?;
                }
            }
            // Resolve source path -> array element -> suffix path, then apply
            // the scalar call with the same fallbacks as above.
            NdjsonDirectTapePlan::ArrayElementViewScalarCall {
                source_steps,
                element,
                suffix_steps,
                call,
            } => {
                let idx = self
                    .source_path
                    .index(scratch, 0, source_steps)
                    .and_then(|idx| json_tape_array_element(scratch, idx, *element))
                    .and_then(|idx| self.suffix_path.index(scratch, idx, suffix_steps));
                if let Some(value) = idx
                    .map(|idx| json_tape_scalar(scratch, idx))
                    .and_then(|value| call.try_apply_json_view(value))
                {
                    write_val_json(writer, &value)?;
                } else if let Some(idx) = idx {
                    write_json_tape_at(writer, scratch, idx)?;
                } else {
                    writer.write_all(b"null")?;
                }
            }
            // The object-items writer receives the possibly-missing node index.
            NdjsonDirectTapePlan::ObjectItems { steps, method } => {
                let idx = self.root_path.index(scratch, 0, steps);
                write_json_tape_object_items(writer, scratch, idx, *method)?;
            }
            // Copy the node at source path -> array element -> suffix path.
            NdjsonDirectTapePlan::ArrayElementPath {
                source_steps,
                element,
                suffix_steps,
            } => {
                let idx = self
                    .source_path
                    .index(scratch, 0, source_steps)
                    .and_then(|idx| json_tape_array_element(scratch, idx, *element))
                    .and_then(|idx| self.suffix_path.index(scratch, idx, suffix_steps));
                if let Some(idx) = idx {
                    write_json_tape_at(writer, scratch, idx)?;
                } else {
                    writer.write_all(b"null")?;
                }
            }
            NdjsonDirectTapePlan::Stream(plan) => {
                write_json_tape_stream(
                    writer,
                    scratch,
                    plan,
                    &mut self.source_path,
                    &mut self.suffix_path,
                    &mut self.predicate_path,
                    &mut self.object_paths,
                )?;
            }
            NdjsonDirectTapePlan::Object(fields) => {
                write_json_tape_object_projection(writer, scratch, fields, &mut self.object_paths)?;
            }
            NdjsonDirectTapePlan::Array(items) => {
                write_json_tape_array_projection(writer, scratch, items, &mut self.object_paths)?;
            }
            // Run the pipeline `body` on a view of the node at `source_steps`
            // using the runner's VM and environment.
            NdjsonDirectTapePlan::ViewPipeline { source_steps, body } => {
                let (Some(vm), Some(env)) = (self.vm.as_deref_mut(), self.env.as_ref()) else {
                    return Err(JetroEngineError::Eval(crate::EvalError(
                        "NDJSON view pipeline requires VM state".to_string(),
                    )));
                };
                let source = json_tape_path_index(scratch, source_steps)
                    .map(|idx| crate::data::view::TapeScratchView::Node { tape: scratch, idx })
                    .unwrap_or(crate::data::view::TapeScratchView::Missing);
                // A `None` from the view runner is written as `null`.
                let Some(result) =
                    crate::exec::view::run_with_env_and_vm(source, body, None, &env, vm)
                else {
                    writer.write_all(b"null")?;
                    return Ok(());
                };
                write_val_json(writer, &result.map_err(JetroEngineError::Eval)?)?;
            }
        }
        Ok(())
    }
}
1857
/// Remembers, per path-step position, where a field was last found inside
/// its object, so later rows with the same layout can skip the key scan.
#[cfg(feature = "simd-json")]
#[derive(Default)]
pub(super) struct NdjsonPathCache {
    // Per-depth verified field deltas for stable object layouts. Every use
    // checks the cached key before returning the value node, so mixed-schema
    // rows safely fall back to a normal object scan.
    // Indexed by the position of the step in the path; entries stay `None`
    // at positions whose step is not a field lookup or has not been cached.
    fields: Vec<Option<NdjsonFieldCache>>,
}
1866
/// Remembered location of one object field, stored as deltas rather than
/// absolute tape indices.
// NOTE(review): presumably both deltas are measured from the containing
// object's tape node. Confirm in `json_tape_object_cached_field`.
#[cfg(feature = "simd-json")]
#[derive(Clone, Copy)]
struct NdjsonFieldCache {
    // Offset of the field's key node.
    key_delta: usize,
    // Offset of the field's value node.
    value_delta: usize,
}
1873
/// Bundles mutable borrows of the source, suffix and predicate path caches.
#[cfg(feature = "simd-json")]
struct NdjsonPathCaches<'a> {
    // Cache for the source path.
    source: &'a mut NdjsonPathCache,
    // Cache for the suffix path.
    suffix: &'a mut NdjsonPathCache,
    // Cache for the predicate path.
    predicate: &'a mut NdjsonPathCache,
}
1880
#[cfg(feature = "simd-json")]
impl NdjsonPathCache {
    /// Resolves `steps` starting at tape node `start`.
    ///
    /// Returns the node index reached by the final step, or `None` when any
    /// step fails to resolve. The read-only single-field fast path is tried
    /// first; on a miss, the full cache-refreshing walk runs.
    fn index<T: JsonTape>(
        &mut self,
        tape: &T,
        start: usize,
        steps: &[crate::ir::physical::PhysicalPathStep],
    ) -> Option<usize> {
        self.index_cached(tape, start, steps)
            .or_else(|| self.index_uncached(tape, start, steps))
    }

    /// Fast path for paths whose only field lookup is the first step.
    ///
    /// Reuses the depth-0 field cache and returns `None` rather than
    /// rescanning when that cache is absent, unusable, or does not match.
    fn index_cached<T: JsonTape>(
        &self,
        tape: &T,
        start: usize,
        steps: &[crate::ir::physical::PhysicalPathStep],
    ) -> Option<usize> {
        use crate::ir::physical::PhysicalPathStep;

        let (PhysicalPathStep::Field(key), rest) = steps.split_first()? else {
            return None;
        };
        let has_later_field = rest
            .iter()
            .any(|step| matches!(step, PhysicalPathStep::Field(_)));
        if has_later_field {
            return None;
        }
        let field = self
            .fields
            .first()
            .copied()
            .flatten()
            .filter(|field| field.key_delta > 1)?;
        let node = json_tape_object_cached_field(tape, start, field, key.as_ref())?;
        rest.iter()
            .try_fold(node, |cur, step| json_tape_step_index(tape, cur, step))
    }

    /// Full walk from path depth 0, refreshing field caches as it goes.
    fn index_uncached<T: JsonTape>(
        &mut self,
        tape: &T,
        start: usize,
        steps: &[crate::ir::physical::PhysicalPathStep],
    ) -> Option<usize> {
        self.index_from_depth(tape, start, steps, 0)
    }

    /// Walks `steps` from node `start`, where the first step sits at path
    /// position `depth`.
    ///
    /// A field step first tries the cached layout for its position. If that
    /// misses, it scans the object and stores the freshly found location
    /// back into the cache. Other steps are resolved directly.
    fn index_from_depth<T: JsonTape>(
        &mut self,
        tape: &T,
        start: usize,
        steps: &[crate::ir::physical::PhysicalPathStep],
        depth: usize,
    ) -> Option<usize> {
        use crate::ir::physical::PhysicalPathStep;

        let mut cur = start;
        for (offset, step) in steps.iter().enumerate() {
            let level = depth + offset;
            cur = match step {
                PhysicalPathStep::Field(key) => {
                    if self.fields.len() <= level {
                        self.fields.resize(level + 1, None);
                    }
                    let hit = self.fields[level]
                        .filter(|field| field.key_delta > 1)
                        .and_then(|field| {
                            json_tape_object_cached_field(tape, cur, field, key.as_ref())
                        });
                    match hit {
                        Some(node) => node,
                        None => {
                            let (node, field) =
                                json_tape_object_field_index_and_cache(tape, cur, key.as_ref())?;
                            self.fields[level] = Some(field);
                            node
                        }
                    }
                }
                other => json_tape_step_index(tape, cur, other)?,
            };
        }
        Some(cur)
    }
}
1974
/// Copies every NDJSON row whose direct tape `predicate` holds to `writer`, one row
/// per line, stopping after `limit` rows. Returns the number of rows written.
///
/// Each row is first tried with the byte-level evaluator. Only when that cannot
/// decide (`None`) is the row parsed into a reusable tape scratch and evaluated with
/// `eval_tape_predicate`. The VM lock and `Env` are acquired only if the predicate
/// contains a view pipeline (`predicate_needs_vm`). Matching rows are written
/// verbatim (original bytes).
///
/// # Errors
/// Propagates read/write errors, byte-predicate errors, JSON parse errors (tagged
/// with the line number), and predicate evaluation errors.
#[cfg(feature = "simd-json")]
fn drive_ndjson_tape_matches_writer<R, W>(
    engine: &JetroEngine,
    reader: R,
    predicate: &NdjsonDirectPredicate,
    limit: usize,
    options: NdjsonOptions,
    writer: W,
) -> Result<usize, JetroEngineError>
where
    R: BufRead,
    W: Write,
{
    // Match the other `drive_ndjson_*` entry points: a zero limit emits nothing. It must
    // not read (or fail on) the first row, nor write it.
    if limit == 0 {
        return Ok(0);
    }

    let mut driver = NdjsonPerRowDriver::new(reader).with_options(options);
    let mut writer = ndjson_writer_with_options(writer, options);
    let mut line = Vec::with_capacity(options.initial_buffer_capacity);
    let mut scratch =
        crate::data::tape::TapeScratch::with_capacity(options.initial_buffer_capacity);
    let mut emitted = 0usize;
    let needs_vm = predicate_needs_vm(predicate);
    let mut vm = needs_vm.then(|| engine.lock_vm());
    let env = needs_vm.then(|| crate::data::context::Env::new(Val::Null));
    let mut predicate_path = NdjsonPathCache::default();

    while let Some((line_no, row)) = driver.read_next_owned(&mut line)? {
        let matched = match eval_ndjson_byte_predicate_row(&row, predicate)? {
            // The byte-level evaluator decided without building a tape.
            Some(matched) => matched,
            // Undecidable on raw bytes: parse into the scratch tape and evaluate there.
            None => {
                scratch.parse_slice(&row).map_err(|message| {
                    row_parse_error(
                        line_no,
                        JetroEngineError::Eval(crate::EvalError(format!(
                            "Invalid JSON: {message}"
                        ))),
                    )
                })?;
                eval_tape_predicate(
                    &scratch,
                    predicate,
                    env.as_ref(),
                    &mut vm,
                    &mut predicate_path,
                )
                .map_err(JetroEngineError::Eval)?
            }
        };
        if !matched {
            continue;
        }

        writer.write_all(&row)?;
        writer.write_all(b"\n")?;
        emitted += 1;
        if emitted >= limit {
            break;
        }
    }

    writer.flush()?;
    Ok(emitted)
}
2041
2042fn drive_ndjson_matches<R, F>(
2043    engine: &JetroEngine,
2044    reader: R,
2045    predicate: &str,
2046    limit: usize,
2047    options: NdjsonOptions,
2048    mut emit: F,
2049) -> Result<usize, JetroEngineError>
2050where
2051    R: BufRead,
2052    F: FnMut(Val) -> Result<NdjsonControl, JetroEngineError>,
2053{
2054    if limit == 0 {
2055        return Ok(0);
2056    }
2057
2058    let mut driver = NdjsonPerRowDriver::new(reader).with_options(options);
2059    #[cfg(feature = "simd-json")]
2060    let direct_predicate = direct_tape_predicate(engine, predicate);
2061    let mut executor = NdjsonRowExecutor::new(engine, predicate);
2062    let mut buf = Vec::with_capacity(options.initial_buffer_capacity);
2063    let mut emitted = 0usize;
2064
2065    while let Some((line_no, row)) = driver.read_next_owned(&mut buf)? {
2066        #[cfg(feature = "simd-json")]
2067        if let Some(predicate) = direct_predicate.as_ref() {
2068            if let Some(false) = eval_ndjson_byte_predicate_row(&row, predicate)? {
2069                continue;
2070            }
2071        }
2072
2073        let document = executor.parse_owned_row(line_no, row)?;
2074        let matched = executor.eval_document(line_no, &document)?;
2075        if !is_truthy(&matched) {
2076            continue;
2077        }
2078
2079        let root = document
2080            .root_val_with(engine.keys())
2081            .map_err(|err| row_eval_error(line_no, err))?;
2082        emitted += 1;
2083        if matches!(emit(root)?, NdjsonControl::Stop) || emitted >= limit {
2084            break;
2085        }
2086    }
2087
2088    Ok(emitted)
2089}
2090
2091fn drive_ndjson_matches_writer<R, W>(
2092    engine: &JetroEngine,
2093    reader: R,
2094    predicate: &str,
2095    limit: usize,
2096    options: NdjsonOptions,
2097    writer: W,
2098) -> Result<usize, JetroEngineError>
2099where
2100    R: BufRead,
2101    W: Write,
2102{
2103    if limit == 0 {
2104        return Ok(0);
2105    }
2106
2107    #[cfg(feature = "simd-json")]
2108    if let Some(predicate) = direct_tape_predicate(engine, predicate) {
2109        return drive_ndjson_tape_matches_writer(
2110            engine, reader, &predicate, limit, options, writer,
2111        );
2112    }
2113
2114    let mut driver = NdjsonPerRowDriver::new(reader).with_options(options);
2115    let mut executor = NdjsonRowExecutor::new(engine, predicate);
2116    let mut writer = ndjson_writer_with_options(writer, options);
2117    let mut buf = Vec::with_capacity(options.initial_buffer_capacity);
2118    let mut emitted = 0usize;
2119
2120    while let Some((line_no, row)) = driver.read_next_owned(&mut buf)? {
2121        let document = executor.parse_owned_row(line_no, row)?;
2122        let matched = executor.eval_document(line_no, &document)?;
2123        if !is_truthy(&matched) {
2124            continue;
2125        }
2126
2127        write_document_line(&mut writer, &document, line_no, executor.engine())?;
2128        emitted += 1;
2129        if emitted >= limit {
2130            break;
2131        }
2132    }
2133
2134    writer.flush()?;
2135    Ok(emitted)
2136}
2137
2138pub(super) struct NdjsonRowExecutor<'a> {
2139    engine: &'a JetroEngine,
2140    plan: crate::ir::physical::QueryPlan,
2141    vm: MutexGuard<'a, VM>,
2142}
2143
2144impl<'a> NdjsonRowExecutor<'a> {
2145    pub(super) fn new(engine: &'a JetroEngine, query: &str) -> Self {
2146        Self {
2147            engine,
2148            plan: engine.cached_plan(query, PlanningContext::bytes()),
2149            vm: engine.lock_vm(),
2150        }
2151    }
2152
2153    pub(super) fn eval_owned_row(
2154        &mut self,
2155        line_no: u64,
2156        row: Vec<u8>,
2157    ) -> Result<Val, JetroEngineError> {
2158        let document = self.parse_owned_row(line_no, row)?;
2159        self.eval_document(line_no, &document)
2160    }
2161
2162    pub(super) fn parse_owned_row(
2163        &self,
2164        line_no: u64,
2165        row: Vec<u8>,
2166    ) -> Result<Jetro, JetroEngineError> {
2167        parse_row(self.engine, line_no, row)
2168    }
2169
2170    pub(super) fn eval_document(
2171        &mut self,
2172        line_no: u64,
2173        document: &Jetro,
2174    ) -> Result<Val, JetroEngineError> {
2175        crate::exec::router::collect_plan_val_with_vm(document, &self.plan, &mut self.vm)
2176            .map_err(|err| row_eval_error(line_no, err))
2177    }
2178
2179    pub(super) fn engine(&self) -> &'a JetroEngine {
2180        self.engine
2181    }
2182}
2183
/// Minimal read-only view of a flattened JSON tape, implemented for both
/// `TapeData` and `TapeScratch` so the tape path and serialisation helpers in this
/// module can operate on either.
#[cfg(feature = "simd-json")]
trait JsonTape {
    /// All tape nodes in document order.
    fn nodes(&self) -> &[crate::data::tape::TapeNode];
    /// Text of the string node at `idx`; used for both object keys and string values.
    fn str_at(&self, idx: usize) -> &str;
    /// Number of tape nodes occupied by the value rooted at `idx`. The traversals in
    /// this module use `idx + span(idx)` to skip to the next sibling, so this count
    /// includes the node itself.
    fn span(&self, idx: usize) -> usize;
}
2190
/// `JsonTape` adapter for the owned `TapeData` tape.
#[cfg(feature = "simd-json")]
impl JsonTape for crate::data::tape::TapeData {
    #[inline]
    fn nodes(&self) -> &[crate::data::tape::TapeNode] {
        &self.nodes
    }

    // `self.str_at` / `self.span` below resolve to `TapeData`'s inherent methods,
    // because inherent methods take precedence over trait methods in method-call
    // lookup. These are therefore forwarding calls, not recursion.
    #[inline]
    fn str_at(&self, idx: usize) -> &str {
        self.str_at(idx)
    }

    #[inline]
    fn span(&self, idx: usize) -> usize {
        self.span(idx)
    }
}
2208
/// `JsonTape` adapter for the reusable per-row `TapeScratch` tape.
#[cfg(feature = "simd-json")]
impl JsonTape for crate::data::tape::TapeScratch {
    #[inline]
    fn nodes(&self) -> &[crate::data::tape::TapeNode] {
        &self.nodes
    }

    // As with `TapeData`, these calls resolve to the inherent `TapeScratch` methods
    // (inherent methods win over trait methods), so they forward rather than recurse.
    #[inline]
    fn str_at(&self, idx: usize) -> &str {
        self.str_at(idx)
    }

    #[inline]
    fn span(&self, idx: usize) -> usize {
        self.span(idx)
    }
}
2226
/// Resolves `steps` starting at the tape root (node 0); uncached.
#[cfg(feature = "simd-json")]
fn json_tape_path_index<T: JsonTape>(
    tape: &T,
    steps: &[crate::ir::physical::PhysicalPathStep],
) -> Option<usize> {
    const ROOT: usize = 0;
    json_tape_path_index_from(tape, ROOT, steps)
}
2234
/// Resolves `steps` starting at node `start`, without any field cache.
///
/// Returns the index of the node the path lands on, or `None` when `start` is not a
/// node of the tape (this includes an empty tape) or a step cannot be resolved.
#[cfg(feature = "simd-json")]
fn json_tape_path_index_from<T: JsonTape>(
    tape: &T,
    start: usize,
    steps: &[crate::ir::physical::PhysicalPathStep],
) -> Option<usize> {
    // The previous guard only rejected an empty tape, so an out-of-range `start` on a
    // non-empty tape could still reach tape indexing and panic. Bound-check `start`
    // itself; this still covers the empty-tape case.
    if start >= tape.nodes().len() {
        return None;
    }

    // One- and two-step paths are the common shapes; keep them off the generic loop.
    match steps {
        [] => Some(start),
        [step] => json_tape_step_index(tape, start, step),
        [first, second] => json_tape_step_index(tape, start, first)
            .and_then(|idx| json_tape_step_index(tape, idx, second)),
        _ => json_tape_path_index_slow(tape, start, steps),
    }
}
2253
/// Generic path walk: applies each step in turn from `start`, short-circuiting to
/// `None` at the first step that cannot be resolved.
#[cfg(feature = "simd-json")]
fn json_tape_path_index_slow<T: JsonTape>(
    tape: &T,
    start: usize,
    steps: &[crate::ir::physical::PhysicalPathStep],
) -> Option<usize> {
    steps
        .iter()
        .try_fold(start, |at, step| json_tape_step_index(tape, at, step))
}
2266
/// Applies a single path step to the node at `start`.
///
/// - `Field(key)`: linear scan of an object's fields; returns the index of the
///   first field's value whose key equals `key`.
/// - `Index(i)`: array element `i`; a negative `i` counts from the end.
///
/// Returns `None` when `start` is out of range, the node has the wrong type, the key is
/// absent, or the index is out of bounds.
#[cfg(feature = "simd-json")]
fn json_tape_step_index<T: JsonTape>(
    tape: &T,
    start: usize,
    step: &crate::ir::physical::PhysicalPathStep,
) -> Option<usize> {
    use crate::data::tape::TapeNode;
    use crate::ir::physical::PhysicalPathStep;

    // `.get()` instead of `nodes()[start]`: an out-of-range start is a miss, not a panic
    // (consistent with `json_tape_object_cached_field` / `json_tape_array_element`).
    let node = tape.nodes().get(start).copied()?;
    match step {
        PhysicalPathStep::Field(key) => {
            let TapeNode::Object { len, .. } = node else {
                return None;
            };
            let mut cur = start + 1;
            for _ in 0..len {
                if tape.str_at(cur) == key.as_ref() {
                    return Some(cur + 1);
                }
                // Skip the key node, then the whole value subtree.
                cur += 1;
                cur += tape.span(cur);
            }
            None
        }
        PhysicalPathStep::Index(wanted) => {
            let TapeNode::Array { len, .. } = node else {
                return None;
            };
            // Checked conversions: an `as usize` cast silently truncates indices that do
            // not fit in `usize` on narrower targets and could select the wrong element.
            let wanted = if *wanted < 0 {
                len.checked_sub(usize::try_from(wanted.unsigned_abs()).ok()?)?
            } else {
                usize::try_from(*wanted).ok()?
            };
            if wanted >= len {
                return None;
            }
            let mut cur = start + 1;
            for _ in 0..wanted {
                cur += tape.span(cur);
            }
            Some(cur)
        }
    }
}
2311
/// Fast-path lookup of `key` in the object at `obj_idx`, using key/value offsets
/// recorded from an earlier row (`cache`).
///
/// Returns the value index only when the cached offset is one of *this* object's own
/// key slots and holds `key`. Otherwise it returns `None`, so callers fall back to a
/// full field scan.
#[cfg(feature = "simd-json")]
fn json_tape_object_cached_field<T: JsonTape>(
    tape: &T,
    obj_idx: usize,
    cache: NdjsonFieldCache,
    key: &str,
) -> Option<usize> {
    use crate::data::tape::TapeNode;

    let TapeNode::Object { len, .. } = tape.nodes().get(obj_idx).copied()? else {
        return None;
    };
    let key_idx = obj_idx.checked_add(cache.key_delta)?;
    let value_idx = obj_idx.checked_add(cache.value_delta)?;
    if value_idx >= tape.nodes().len() {
        return None;
    }
    if !matches!(tape.nodes().get(key_idx), Some(TapeNode::String(_))) {
        return None;
    }
    // Cheap rejection before the position check below.
    if tape.str_at(key_idx) != key {
        return None;
    }

    // A matching string at the cached offset is not enough. When rows differ in shape,
    // that offset can fall past the end of this object (e.g. onto a later key of the
    // parent), or inside one of this object's values (e.g. a string element of an
    // array). Either would yield a wrong value. Accept it only if it is one of this
    // object's own key slots. Walking field boundaries still avoids every string
    // comparison of the full scan.
    let mut cur = obj_idx + 1;
    for _ in 0..len {
        if cur == key_idx {
            return Some(value_idx);
        }
        if cur > key_idx {
            return None;
        }
        // Skip the key node, then the whole value subtree.
        cur += 1;
        cur += tape.span(cur);
    }
    None
}
2335
/// Linear scan of the object at `obj_idx` for the first field named `key`.
///
/// On a hit, returns the value index together with an `NdjsonFieldCache`. The cache
/// records the key and value offsets relative to `obj_idx`, so later rows can try the
/// same slot first. Returns `None` when `obj_idx` is not an object node (including an
/// out-of-range index) or the key is absent.
#[cfg(feature = "simd-json")]
fn json_tape_object_field_index_and_cache<T: JsonTape>(
    tape: &T,
    obj_idx: usize,
    key: &str,
) -> Option<(usize, NdjsonFieldCache)> {
    use crate::data::tape::TapeNode;

    // `.get()` instead of indexing, so an out-of-range `obj_idx` is a miss, not a panic.
    let TapeNode::Object { len, .. } = tape.nodes().get(obj_idx).copied()? else {
        return None;
    };
    let mut cur = obj_idx + 1;
    for _ in 0..len {
        if tape.str_at(cur) == key {
            return Some((
                cur + 1,
                NdjsonFieldCache {
                    key_delta: cur - obj_idx,
                    value_delta: cur + 1 - obj_idx,
                },
            ));
        }
        // Skip the key node, then the whole value subtree.
        cur += 1;
        cur += tape.span(cur);
    }
    None
}
2361
/// Returns the tape index of the selected element (first, last, or nth) of the array
/// at `idx`.
///
/// Yields `None` when `idx` is not an array node, the array is empty, or the position
/// is out of bounds.
#[cfg(feature = "simd-json")]
fn json_tape_array_element<T: JsonTape>(
    tape: &T,
    idx: usize,
    element: NdjsonDirectElement,
) -> Option<usize> {
    let Some(crate::data::tape::TapeNode::Array { len, .. }) = tape.nodes().get(idx).copied()
    else {
        return None;
    };
    let position = match element {
        NdjsonDirectElement::First => 0,
        NdjsonDirectElement::Nth(n) => n,
        NdjsonDirectElement::Last => len.checked_sub(1)?,
    };
    // Hop over `position` sibling subtrees, starting at the first element.
    (position < len)
        .then(|| (0..position).fold(idx + 1, |cursor, _| cursor + tape.span(cursor)))
}
2385
/// Evaluates a direct NDJSON predicate against one parsed row tape.
///
/// - Path and view-scalar leaves resolve through `cache`.
/// - `And`/`Or` short-circuit via `&&`/`||`.
/// - A comparison is `false` when either operand is not a `Path`/`Literal`, or its
///   path is missing (see `eval_tape_scalar`).
/// - Only `ViewPipeline` uses `env` and `vm`. It returns an error when either is
///   absent.
#[cfg(feature = "simd-json")]
pub(super) fn eval_tape_predicate(
    tape: &crate::data::tape::TapeScratch,
    predicate: &NdjsonDirectPredicate,
    env: Option<&crate::data::context::Env>,
    vm: &mut Option<std::sync::MutexGuard<'_, crate::vm::exec::VM>>,
    cache: &mut NdjsonPathCache,
) -> Result<bool, crate::EvalError> {
    use crate::parse::ast::BinOp;

    Ok(match predicate {
        // Truthiness of the value at the path; a missing path is false.
        NdjsonDirectPredicate::Path(steps) => cache
            .index(tape, 0, steps)
            .map(|idx| json_view_truthy(json_tape_scalar(tape, idx)))
            .unwrap_or(false),
        NdjsonDirectPredicate::Literal(value) => crate::util::is_truthy(value),
        NdjsonDirectPredicate::Not(inner) => !eval_tape_predicate(tape, inner, env, vm, cache)?,
        // The two guarded arms route logical operators; any other operator falls
        // through to the scalar comparison arm below.
        NdjsonDirectPredicate::Binary { lhs, op, rhs } if *op == BinOp::And => {
            eval_tape_predicate(tape, lhs, env, vm, cache)?
                && eval_tape_predicate(tape, rhs, env, vm, cache)?
        }
        NdjsonDirectPredicate::Binary { lhs, op, rhs } if *op == BinOp::Or => {
            eval_tape_predicate(tape, lhs, env, vm, cache)?
                || eval_tape_predicate(tape, rhs, env, vm, cache)?
        }
        NdjsonDirectPredicate::Binary { lhs, op, rhs } => {
            let Some(lhs) = eval_tape_scalar(tape, lhs, cache) else {
                return Ok(false);
            };
            let Some(rhs) = eval_tape_scalar(tape, rhs, cache) else {
                return Ok(false);
            };
            crate::util::json_cmp_binop(lhs, *op, rhs)
        }
        // Applies a scalar call to the value at `steps`; a missing path or a call
        // that yields nothing is false.
        NdjsonDirectPredicate::ViewScalarCall { steps, call } => cache
            .index(tape, 0, steps)
            .map(|idx| json_tape_scalar(tape, idx))
            .and_then(|value| call.try_apply_json_view(value))
            .is_some_and(|value| crate::util::is_truthy(&value)),
        // Source and suffix paths here are resolved without the path cache.
        NdjsonDirectPredicate::ArrayElementViewScalarCall {
            source_steps,
            element,
            suffix_steps,
            call,
        } => json_tape_path_index(tape, source_steps)
            .and_then(|idx| json_tape_array_element(tape, idx, *element))
            .and_then(|idx| json_tape_path_index_from(tape, idx, suffix_steps))
            .map(|idx| json_tape_scalar(tape, idx))
            .and_then(|value| call.try_apply_json_view(value))
            .is_some_and(|value| crate::util::is_truthy(&value)),
        NdjsonDirectPredicate::ViewPipeline { source_steps, body } => {
            let (Some(vm), Some(env)) = (vm.as_deref_mut(), env) else {
                return Err(crate::EvalError(
                    "view pipeline predicate requires VM state".to_string(),
                ));
            };
            // A missing source is passed to the pipeline as `Missing` rather than
            // short-circuiting to false.
            let source = json_tape_path_index(tape, source_steps)
                .map(|idx| crate::data::view::TapeScratchView::Node { tape, idx })
                .unwrap_or(crate::data::view::TapeScratchView::Missing);
            crate::exec::view::run_with_env_and_vm(source, body, None, env, vm)
                .transpose()?
                .is_some_and(|value| crate::util::is_truthy(&value))
        }
    })
}
2451
/// Reports whether evaluating `predicate` on a tape needs VM state.
///
/// Only `ViewPipeline` nodes run through the VM. `Not` and `Binary` need it when any
/// operand does; every other leaf is evaluated directly on the tape.
#[cfg(feature = "simd-json")]
pub(super) fn predicate_needs_vm(predicate: &NdjsonDirectPredicate) -> bool {
    match predicate {
        NdjsonDirectPredicate::ViewPipeline { .. } => true,
        NdjsonDirectPredicate::Not(operand) => predicate_needs_vm(operand),
        NdjsonDirectPredicate::Binary { lhs, rhs, .. } => {
            [lhs, rhs].iter().any(|side| predicate_needs_vm(side))
        }
        NdjsonDirectPredicate::Path(_)
        | NdjsonDirectPredicate::Literal(_)
        | NdjsonDirectPredicate::ViewScalarCall { .. }
        | NdjsonDirectPredicate::ArrayElementViewScalarCall { .. } => false,
    }
}
2466
/// Produces a comparable scalar view for a comparison operand.
///
/// Only `Literal` and `Path` operands are supported. A path that does not resolve,
/// and any other predicate kind, yields `None`.
#[cfg(feature = "simd-json")]
fn eval_tape_scalar<'a>(
    tape: &'a crate::data::tape::TapeScratch,
    predicate: &'a NdjsonDirectPredicate,
    cache: &mut NdjsonPathCache,
) -> Option<crate::util::JsonView<'a>> {
    if let NdjsonDirectPredicate::Literal(value) = predicate {
        return Some(crate::util::JsonView::from_val(value));
    }
    let NdjsonDirectPredicate::Path(steps) = predicate else {
        return None;
    };
    let idx = cache.index(tape, 0, steps)?;
    Some(json_tape_scalar(tape, idx))
}
2481
/// Truthiness of a tape scalar view.
///
/// `null`, `false`, zero numbers, the empty string, and empty arrays/objects are falsy;
/// everything else is truthy.
#[cfg(feature = "simd-json")]
fn json_view_truthy(value: crate::util::JsonView<'_>) -> bool {
    use crate::util::JsonView as View;

    match value {
        View::Null => false,
        View::Bool(flag) => flag,
        View::Int(n) => n != 0,
        View::UInt(n) => n != 0,
        View::Float(x) => x != 0.0,
        View::Str(text) => !text.is_empty(),
        View::ArrayLen(count) | View::ObjectLen(count) => count > 0,
    }
}
2494
/// Borrowed scalar view of the node at `idx`.
///
/// Containers are reported by length only. An out-of-range `idx` is treated as
/// `null`.
#[cfg(feature = "simd-json")]
fn json_tape_scalar<T: JsonTape>(tape: &T, idx: usize) -> crate::util::JsonView<'_> {
    use crate::data::tape::TapeNode;
    use crate::util::JsonView as View;
    use simd_json::StaticNode as SN;

    match tape.nodes().get(idx).copied() {
        None | Some(TapeNode::Static(SN::Null)) => View::Null,
        Some(TapeNode::Static(SN::Bool(flag))) => View::Bool(flag),
        Some(TapeNode::Static(SN::I64(n))) => View::Int(n),
        Some(TapeNode::Static(SN::U64(n))) => View::UInt(n),
        Some(TapeNode::Static(SN::F64(x))) => View::Float(x),
        Some(TapeNode::String(_)) => View::Str(tape.str_at(idx)),
        Some(TapeNode::Array { len, .. }) => View::ArrayLen(len),
        Some(TapeNode::Object { len, .. }) => View::ObjectLen(len),
    }
}
2514
2515pub(super) fn write_val_line<W: Write>(
2516    writer: &mut W,
2517    value: &Val,
2518) -> Result<(), JetroEngineError> {
2519    write_val_json(writer, value)?;
2520    writer.write_all(b"\n")?;
2521    Ok(())
2522}
2523
2524pub(super) fn write_val_line_with_options<W: Write>(
2525    writer: &mut W,
2526    value: &Val,
2527    options: NdjsonOptions,
2528) -> Result<bool, JetroEngineError> {
2529    if value == &Val::Null && options.null_output == NdjsonNullOutput::Skip {
2530        return Ok(false);
2531    }
2532    write_val_line(writer, value)?;
2533    Ok(true)
2534}
2535
2536pub(super) fn write_json_bytes_line_with_options<W: Write>(
2537    writer: &mut W,
2538    bytes: &[u8],
2539    options: NdjsonOptions,
2540) -> Result<bool, JetroEngineError> {
2541    if is_json_null_bytes(bytes) && options.null_output == NdjsonNullOutput::Skip {
2542        return Ok(false);
2543    }
2544    writer.write_all(bytes)?;
2545    writer.write_all(b"\n")?;
2546    Ok(true)
2547}
2548
/// True only for the exact four bytes `null`. Surrounding whitespace is not ignored.
fn is_json_null_bytes(bytes: &[u8]) -> bool {
    matches!(bytes, b"null")
}
2552
2553pub(super) fn write_document_line<W: Write>(
2554    writer: &mut W,
2555    document: &Jetro,
2556    line_no: u64,
2557    engine: &JetroEngine,
2558) -> Result<(), JetroEngineError> {
2559    if let Some(bytes) = document.raw_bytes() {
2560        writer.write_all(bytes)?;
2561        writer.write_all(b"\n")?;
2562        return Ok(());
2563    }
2564
2565    let root = document
2566        .root_val_with(engine.keys())
2567        .map_err(|err| row_eval_error(line_no, err))?;
2568    write_val_line(writer, &root)
2569}
2570
2571pub(super) fn ndjson_writer_with_options<W: Write>(
2572    writer: W,
2573    options: NdjsonOptions,
2574) -> BufWriter<W> {
2575    let capacity = options
2576        .reader_buffer_capacity
2577        .max(DEFAULT_READER_BUFFER_CAPACITY);
2578    BufWriter::with_capacity(capacity, writer)
2579}
2580
2581pub(super) fn write_val_json<W: Write>(
2582    writer: &mut W,
2583    value: &Val,
2584) -> Result<(), JetroEngineError> {
2585    match value {
2586        Val::Null => writer.write_all(b"null")?,
2587        Val::Bool(true) => writer.write_all(b"true")?,
2588        Val::Bool(false) => writer.write_all(b"false")?,
2589        Val::Int(n) => write_i64(writer, *n)?,
2590        Val::Float(n) => write_f64(writer, *n)?,
2591        Val::Str(s) => write_json_str(writer, s.as_ref())?,
2592        Val::StrSlice(s) => write_json_str(writer, s.as_str())?,
2593        Val::Arr(items) => write_json_array(writer, items.iter())?,
2594        Val::IntVec(items) => write_json_int_array(writer, items.iter().copied())?,
2595        Val::FloatVec(items) => write_json_float_array(writer, items.iter().copied())?,
2596        Val::StrVec(items) => write_json_str_array(writer, items.iter().map(|s| s.as_ref()))?,
2597        Val::StrSliceVec(items) => write_json_str_array(writer, items.iter().map(|s| s.as_str()))?,
2598        Val::Obj(entries) => write_json_object(
2599            writer,
2600            entries.iter().map(|(key, value)| (key.as_ref(), value)),
2601        )?,
2602        Val::ObjSmall(entries) => write_json_object(
2603            writer,
2604            entries.iter().map(|(key, value)| (key.as_ref(), value)),
2605        )?,
2606        Val::ObjVec(data) => write_json_objvec(writer, data)?,
2607    }
2608    Ok(())
2609}
2610
/// Serialises the subtree rooted at tape node `idx` as compact JSON.
///
/// Returns the index of the node following the subtree. An out-of-range `idx` writes
/// `null` and returns `idx` unchanged.
#[cfg(feature = "simd-json")]
fn write_json_tape_at<W: Write, T: JsonTape>(
    writer: &mut W,
    tape: &T,
    idx: usize,
) -> Result<usize, JetroEngineError> {
    use crate::data::tape::TapeNode;
    use simd_json::StaticNode as SN;

    let Some(node) = tape.nodes().get(idx).copied() else {
        writer.write_all(b"null")?;
        return Ok(idx);
    };

    match node {
        TapeNode::Array { len, .. } => {
            writer.write_all(b"[")?;
            let mut next = idx + 1;
            for position in 0..len {
                if position != 0 {
                    writer.write_all(b",")?;
                }
                next = write_json_tape_at(writer, tape, next)?;
            }
            writer.write_all(b"]")?;
            return Ok(next);
        }
        TapeNode::Object { len, .. } => {
            writer.write_all(b"{")?;
            let mut next = idx + 1;
            for position in 0..len {
                if position != 0 {
                    writer.write_all(b",")?;
                }
                // Each field is a key node immediately followed by its value subtree.
                write_json_str(writer, tape.str_at(next))?;
                writer.write_all(b":")?;
                next = write_json_tape_at(writer, tape, next + 1)?;
            }
            writer.write_all(b"}")?;
            return Ok(next);
        }
        TapeNode::Static(SN::Null) => {
            writer.write_all(b"null")?;
        }
        TapeNode::Static(SN::Bool(flag)) => {
            let literal: &[u8] = if flag { b"true" } else { b"false" };
            writer.write_all(literal)?;
        }
        TapeNode::Static(SN::I64(n)) => {
            write_i64(writer, n)?;
        }
        TapeNode::Static(SN::U64(n)) => {
            write_u64(writer, n)?;
        }
        TapeNode::Static(SN::F64(x)) => {
            write_f64(writer, x)?;
        }
        TapeNode::String(_) => {
            write_json_str(writer, tape.str_at(idx))?;
        }
    }
    // Scalars occupy exactly one tape node.
    Ok(idx + 1)
}
2682
/// Calls `visit` for each stream item of the source node at `source_idx`.
///
/// An array contributes each element; any other node is visited once as a single item.
/// An out-of-range index visits nothing. The first `Err` returned by `visit` stops
/// the walk and is propagated.
#[cfg(feature = "simd-json")]
fn visit_json_tape_source_items<T, E, F>(tape: &T, source_idx: usize, mut visit: F) -> Result<(), E>
where
    T: JsonTape,
    F: FnMut(usize) -> Result<(), E>,
{
    use crate::data::tape::TapeNode;

    let Some(node) = tape.nodes().get(source_idx).copied() else {
        return Ok(());
    };
    let TapeNode::Array { len, .. } = node else {
        return visit(source_idx);
    };
    let mut item = source_idx + 1;
    for _ in 0..len {
        visit(item)?;
        item += tape.span(item);
    }
    Ok(())
}
2704
/// Returns the first stream item of the source at `source_idx` accepted by `matches`.
///
/// Array elements are tried in order; a non-array source is tested as a single item.
/// An out-of-range index yields `None`.
#[cfg(feature = "simd-json")]
fn find_json_tape_source_item<T, F>(tape: &T, source_idx: usize, mut matches: F) -> Option<usize>
where
    T: JsonTape,
    F: FnMut(usize) -> bool,
{
    use crate::data::tape::TapeNode;

    let TapeNode::Array { len, .. } = tape.nodes().get(source_idx).copied()? else {
        return matches(source_idx).then_some(source_idx);
    };
    let mut item = source_idx + 1;
    for _ in 0..len {
        if matches(item) {
            return Some(item);
        }
        item += tape.span(item);
    }
    None
}
2727
/// Evaluates a direct stream plan (source path, then an optional per-item filter, then a
/// sink) against one row's tape and writes the sink's JSON result to `writer`.
///
/// When the source path does not resolve, the sink's empty result is written instead.
/// Every sink applies `plan.predicate` as a per-item filter.
#[cfg(feature = "simd-json")]
fn write_json_tape_stream<W: Write, T: JsonTape>(
    writer: &mut W,
    tape: &T,
    plan: &NdjsonDirectStreamPlan,
    source_cache: &mut NdjsonPathCache,
    suffix_cache: &mut NdjsonPathCache,
    predicate_cache: &mut NdjsonPathCache,
    projection_caches: &mut Vec<NdjsonPathCache>,
) -> Result<(), JetroEngineError> {
    let Some(source_idx) = source_cache.index(tape, 0, &plan.source_steps) else {
        write_json_tape_empty_stream_result(writer, &plan.sink)?;
        return Ok(());
    };

    match &plan.sink {
        NdjsonDirectStreamSink::Collect(map) => {
            writer.write_all(b"[")?;
            let mut wrote_row = false;
            visit_json_tape_source_items(tape, source_idx, |item_idx| {
                if !plan.predicate.as_ref().is_none_or(|predicate| {
                    eval_json_tape_item_predicate_cached(tape, item_idx, predicate, predicate_cache)
                }) {
                    return Ok::<(), JetroEngineError>(());
                }
                if wrote_row {
                    writer.write_all(b",")?;
                }
                write_json_tape_stream_map(
                    writer,
                    tape,
                    item_idx,
                    map,
                    suffix_cache,
                    projection_caches,
                )?;
                wrote_row = true;
                Ok(())
            })?;
            writer.write_all(b"]")?;
        }
        NdjsonDirectStreamSink::Count => {
            let mut count = 0usize;
            // The visitor never fails, so the `Result` can be discarded.
            let _: Result<(), ()> = visit_json_tape_source_items(tape, source_idx, |item_idx| {
                if plan.predicate.as_ref().is_none_or(|predicate| {
                    eval_json_tape_item_predicate_cached(tape, item_idx, predicate, predicate_cache)
                }) {
                    count += 1;
                }
                Ok(())
            });
            write_i64(writer, count as i64)?;
        }
        NdjsonDirectStreamSink::First(map) => {
            let selected = find_json_tape_source_item(tape, source_idx, |item_idx| {
                plan.predicate.as_ref().is_none_or(|predicate| {
                    eval_json_tape_item_predicate_cached(tape, item_idx, predicate, predicate_cache)
                })
            });
            if let Some(item_idx) = selected {
                write_json_tape_stream_map(
                    writer,
                    tape,
                    item_idx,
                    map,
                    suffix_cache,
                    projection_caches,
                )?;
            } else {
                writer.write_all(b"null")?;
            }
        }
        NdjsonDirectStreamSink::Last(map) => {
            let mut selected = None;
            visit_json_tape_source_items(tape, source_idx, |item_idx| {
                if plan.predicate.as_ref().is_none_or(|predicate| {
                    eval_json_tape_item_predicate_cached(tape, item_idx, predicate, predicate_cache)
                }) {
                    selected = Some(item_idx);
                }
                Ok::<(), JetroEngineError>(())
            })?;
            if let Some(item_idx) = selected {
                write_json_tape_stream_map(
                    writer,
                    tape,
                    item_idx,
                    map,
                    suffix_cache,
                    projection_caches,
                )?;
            } else {
                writer.write_all(b"null")?;
            }
        }
        NdjsonDirectStreamSink::Numeric { suffix_steps, op } => {
            let caches = NdjsonPathCaches {
                source: source_cache,
                suffix: suffix_cache,
                predicate: predicate_cache,
            };
            let value = reduce_json_tape_numeric_path(
                tape,
                &plan.source_steps,
                plan.predicate.as_ref(),
                suffix_steps,
                *op,
                caches,
            );
            write_val_json(writer, &value)?;
        }
        NdjsonDirectStreamSink::Extreme {
            key_steps,
            want_max,
            value,
        } => {
            let mut best_idx = None;
            visit_json_tape_source_items(tape, source_idx, |item_idx| {
                // Apply the stream filter first, exactly as every other sink does.
                // Previously this sink ignored `plan.predicate`, so a filtered max/min
                // could pick an item the filter rejects.
                if !plan.predicate.as_ref().is_none_or(|predicate| {
                    eval_json_tape_item_predicate_cached(tape, item_idx, predicate, predicate_cache)
                }) {
                    return Ok::<(), JetroEngineError>(());
                }
                let Some(key_idx) = suffix_cache.index(tape, item_idx, key_steps) else {
                    return Ok(());
                };
                let key = json_tape_scalar(tape, key_idx);
                // Replace the current best only on a strict improvement, so ties keep
                // the earliest item.
                let replace = best_idx
                    .and_then(|idx| suffix_cache.index(tape, idx, key_steps))
                    .map(|idx| {
                        let order = crate::util::json_cmp_vals(key, json_tape_scalar(tape, idx));
                        (*want_max && order.is_gt()) || (!*want_max && order.is_lt())
                    })
                    .unwrap_or(true);
                if replace {
                    best_idx = Some(item_idx);
                }
                Ok(())
            })?;
            if let Some(item_idx) = best_idx {
                let path_idx = match value {
                    NdjsonDirectProjectionValue::Path(steps)
                    | NdjsonDirectProjectionValue::ViewScalarCall { steps, .. } => {
                        suffix_cache.index(tape, item_idx, steps)
                    }
                    NdjsonDirectProjectionValue::Nested(_)
                    | NdjsonDirectProjectionValue::Literal(_) => None,
                };
                write_json_tape_direct_value(writer, tape, value, path_idx)?;
            } else {
                writer.write_all(b"null")?;
            }
        }
    }

    Ok(())
}
2880
/// Writes the JSON result a direct stream sink produces when it has no items
/// to consume.
///
/// `Collect` becomes `[]`, `Count` becomes `0`, and `First`/`Last`/`Extreme`
/// become `null`. `Numeric` reductions are finalised from their untouched
/// initial accumulator state via `num_finalise`.
#[cfg(feature = "simd-json")]
fn write_json_tape_empty_stream_result<W: Write>(
    writer: &mut W,
    sink: &NdjsonDirectStreamSink,
) -> Result<(), JetroEngineError> {
    let fixed: &[u8] = match sink {
        NdjsonDirectStreamSink::Numeric { op, .. } => {
            // Nothing was folded, so pass the initial accumulators straight to
            // the finaliser.
            let empty = crate::exec::pipeline::num_finalise(
                *op,
                0,
                0.0,
                false,
                f64::INFINITY,
                f64::NEG_INFINITY,
                0,
            );
            write_val_json(writer, &empty)?;
            return Ok(());
        }
        NdjsonDirectStreamSink::Collect(_) => b"[]".as_slice(),
        NdjsonDirectStreamSink::Count => b"0".as_slice(),
        NdjsonDirectStreamSink::First(_)
        | NdjsonDirectStreamSink::Last(_)
        | NdjsonDirectStreamSink::Extreme { .. } => b"null".as_slice(),
    };
    writer.write_all(fixed)?;
    Ok(())
}
2908
/// Writes one selected stream item through the sink's map projection.
///
/// `Value` resolves its path (when it has one) relative to `item_idx` through
/// `suffix_cache`. `Array`/`Object` write a bracketed projection rooted at
/// `item_idx`, using one cache per entry in `projection_caches`.
#[cfg(feature = "simd-json")]
fn write_json_tape_stream_map<W: Write, T: JsonTape>(
    writer: &mut W,
    tape: &T,
    item_idx: usize,
    map: &NdjsonDirectStreamMap,
    suffix_cache: &mut NdjsonPathCache,
    projection_caches: &mut Vec<NdjsonPathCache>,
) -> Result<(), JetroEngineError> {
    match map {
        NdjsonDirectStreamMap::Object(fields) => write_json_tape_object_projection_from(
            writer,
            tape,
            item_idx,
            fields,
            projection_caches,
        ),
        NdjsonDirectStreamMap::Array(items) => write_json_tape_array_projection_from(
            writer,
            tape,
            item_idx,
            items,
            projection_caches,
        ),
        NdjsonDirectStreamMap::Value(value) => {
            // Only path-backed projections need a tape lookup; literals and
            // nested plans ignore the resolved index.
            let resolved = match value {
                NdjsonDirectProjectionValue::Path(steps)
                | NdjsonDirectProjectionValue::ViewScalarCall { steps, .. } => {
                    suffix_cache.index(tape, item_idx, steps)
                }
                NdjsonDirectProjectionValue::Nested(_)
                | NdjsonDirectProjectionValue::Literal(_) => None,
            };
            write_json_tape_direct_value(writer, tape, value, resolved)
        }
    }
}
2951
/// Writes an object projection whose field paths are resolved from the tape
/// root (node index `0`).
#[cfg(feature = "simd-json")]
fn write_json_tape_object_projection<W: Write, T: JsonTape>(
    writer: &mut W,
    tape: &T,
    fields: &[super::ndjson_direct::NdjsonDirectObjectField],
    path_caches: &mut Vec<NdjsonPathCache>,
) -> Result<(), JetroEngineError> {
    const TAPE_ROOT: usize = 0;
    write_json_tape_object_projection_from(writer, tape, TAPE_ROOT, fields, path_caches)
}
2961
/// Writes a JSON object whose fields are projected from the tape, resolving
/// each path-backed field relative to node `start`.
///
/// `path_caches` holds one path cache per field, matched by position. If it
/// has fewer entries than `fields`, it is grown with default caches.
///
/// A field is omitted entirely (no key, no `null`) when:
/// - it is a `Path` field with `field.optional` set and its path is missing or
///   resolves to JSON `null`;
/// - it is a `ViewScalarCall` field with either the call's own `optional` flag
///   or `field.optional` set, and its path is missing or `null`;
/// - it is a `ViewScalarCall` field with `field.optional` set and the call
///   yields `Val::Null`;
/// - it is a `Literal(Val::Null)` field with `field.optional` set.
///
/// `Nested` fields and non-null literals are always written.
#[cfg(feature = "simd-json")]
fn write_json_tape_object_projection_from<W: Write, T: JsonTape>(
    writer: &mut W,
    tape: &T,
    start: usize,
    fields: &[super::ndjson_direct::NdjsonDirectObjectField],
    path_caches: &mut Vec<NdjsonPathCache>,
) -> Result<(), JetroEngineError> {
    if path_caches.len() < fields.len() {
        path_caches.resize_with(fields.len(), NdjsonPathCache::default);
    }
    writer.write_all(b"{")?;
    // Tracks whether any field has been emitted yet. Because optional fields
    // may be skipped, this, not `field_idx`, decides when a comma is needed.
    let mut wrote = false;
    for (field_idx, field) in fields.iter().enumerate() {
        let path_cache = &mut path_caches[field_idx];
        // Tape index the field's path resolved to. It stays `None` for literals
        // and nested plans, which do not read a path here.
        let mut path_idx = None;
        match &field.value {
            NdjsonDirectProjectionValue::Path(steps) => {
                let idx = path_cache.index(tape, start, steps);
                path_idx = idx;
                // Optional field: a missing path or JSON null drops the key.
                if field.optional
                    && idx
                        .map(|idx| {
                            matches!(json_tape_scalar(tape, idx), crate::util::JsonView::Null)
                        })
                        .unwrap_or(true)
                {
                    continue;
                }
            }
            NdjsonDirectProjectionValue::ViewScalarCall {
                steps,
                call,
                optional,
            } => {
                let idx = path_cache.index(tape, start, steps);
                path_idx = idx;
                // NOTE(review): the call's own `optional` flag alone also drops
                // the key here. For the same null input,
                // `write_json_tape_nested_plan` writes `null` rather than
                // omitting anything — confirm this difference is intended.
                if (*optional || field.optional)
                    && idx
                        .map(|idx| {
                            matches!(json_tape_scalar(tape, idx), crate::util::JsonView::Null)
                        })
                        .unwrap_or(true)
                {
                    continue;
                }
                // An optional field is also dropped when the call itself
                // produces null from a non-null input.
                if field.optional
                    && idx
                        .map(|idx| json_tape_scalar(tape, idx))
                        .and_then(|value| call.try_apply_json_view(value))
                        .is_some_and(|value| matches!(value, Val::Null))
                {
                    continue;
                }
            }
            NdjsonDirectProjectionValue::Literal(Val::Null) if field.optional => {
                continue;
            }
            NdjsonDirectProjectionValue::Nested(_) => {}
            NdjsonDirectProjectionValue::Literal(_) => {}
        }
        if wrote {
            writer.write_all(b",")?;
        }
        write_json_str(writer, field.key.as_ref())?;
        writer.write_all(b":")?;
        write_json_tape_direct_value(writer, tape, &field.value, path_idx)?;
        wrote = true;
    }
    writer.write_all(b"}")?;
    Ok(())
}
3034
/// Writes an array projection whose item paths are resolved from the tape
/// root (node index `0`).
#[cfg(feature = "simd-json")]
fn write_json_tape_array_projection<W: Write, T: JsonTape>(
    writer: &mut W,
    tape: &T,
    items: &[NdjsonDirectProjectionValue],
    path_caches: &mut Vec<NdjsonPathCache>,
) -> Result<(), JetroEngineError> {
    const TAPE_ROOT: usize = 0;
    write_json_tape_array_projection_from(writer, tape, TAPE_ROOT, items, path_caches)
}
3044
/// Writes a JSON array with one element per projection in `items`, resolving
/// path-backed items relative to node `start`.
///
/// `path_caches` holds one path cache per item, matched by position. If it has
/// fewer entries than `items`, it is grown with default caches. Unlike object
/// projections, no item is ever omitted.
#[cfg(feature = "simd-json")]
fn write_json_tape_array_projection_from<W: Write, T: JsonTape>(
    writer: &mut W,
    tape: &T,
    start: usize,
    items: &[NdjsonDirectProjectionValue],
    path_caches: &mut Vec<NdjsonPathCache>,
) -> Result<(), JetroEngineError> {
    if path_caches.len() < items.len() {
        path_caches.resize_with(items.len(), NdjsonPathCache::default);
    }
    writer.write_all(b"[")?;
    for (pos, (item, cache)) in items.iter().zip(path_caches.iter_mut()).enumerate() {
        if pos != 0 {
            writer.write_all(b",")?;
        }
        let resolved = match item {
            NdjsonDirectProjectionValue::Path(steps)
            | NdjsonDirectProjectionValue::ViewScalarCall { steps, .. } => {
                cache.index(tape, start, steps)
            }
            NdjsonDirectProjectionValue::Nested(_)
            | NdjsonDirectProjectionValue::Literal(_) => None,
        };
        write_json_tape_direct_value(writer, tape, item, resolved)?;
    }
    writer.write_all(b"]")?;
    Ok(())
}
3074
/// Writes a single projection value.
///
/// - Literals are serialised with `write_val_json`.
/// - `Nested` plans are evaluated by `write_json_tape_nested_plan`.
/// - `Path` and `ViewScalarCall` values read from `path_idx`, the tape index
///   their path already resolved to; `None` writes `null`.
/// - A `ViewScalarCall` writes the call's result when `try_apply_json_view`
///   yields one, and otherwise writes the raw tape value.
#[cfg(feature = "simd-json")]
fn write_json_tape_direct_value<W: Write, T: JsonTape>(
    writer: &mut W,
    tape: &T,
    value: &NdjsonDirectProjectionValue,
    path_idx: Option<usize>,
) -> Result<(), JetroEngineError> {
    let call = match value {
        NdjsonDirectProjectionValue::Literal(literal) => {
            write_val_json(writer, literal)?;
            return Ok(());
        }
        NdjsonDirectProjectionValue::Nested(plan) => {
            write_json_tape_nested_plan(writer, tape, plan)?;
            return Ok(());
        }
        NdjsonDirectProjectionValue::Path(_) => None,
        NdjsonDirectProjectionValue::ViewScalarCall { call, .. } => Some(call),
    };
    let Some(idx) = path_idx else {
        writer.write_all(b"null")?;
        return Ok(());
    };
    match call.and_then(|call| call.try_apply_json_view(json_tape_scalar(tape, idx))) {
        Some(computed) => write_val_json(writer, &computed)?,
        None => {
            write_json_tape_at(writer, tape, idx)?;
        }
    }
    Ok(())
}
3109
/// Evaluates a nested direct tape plan from the tape root (node `0`) and writes
/// its JSON result.
///
/// Every path cache used here is created fresh for this call. Paths that do not
/// resolve are written as `null`, and `ViewPipeline` plans are always written
/// as `null` by this writer.
#[cfg(feature = "simd-json")]
fn write_json_tape_nested_plan<W: Write, T: JsonTape>(
    writer: &mut W,
    tape: &T,
    plan: &NdjsonDirectTapePlan,
) -> Result<(), JetroEngineError> {
    match plan {
        NdjsonDirectTapePlan::RootPath(steps) => {
            let mut root_cache = NdjsonPathCache::default();
            match root_cache.index(tape, 0, steps) {
                Some(idx) => {
                    write_json_tape_at(writer, tape, idx)?;
                }
                None => writer.write_all(b"null")?,
            }
        }
        NdjsonDirectTapePlan::ViewScalarCall {
            steps,
            call,
            optional,
        } => {
            let mut root_cache = NdjsonPathCache::default();
            let found = root_cache.index(tape, 0, steps);
            let scalar = match found {
                Some(idx) => json_tape_scalar(tape, idx),
                None => crate::util::JsonView::Null,
            };
            let skip_call = *optional && matches!(scalar, crate::util::JsonView::Null);
            if skip_call {
                // An `optional` call on a null or missing value writes null
                // without invoking the call.
                writer.write_all(b"null")?;
            } else if let Some(computed) = call.try_apply_json_view(scalar) {
                write_val_json(writer, &computed)?;
            } else if let Some(idx) = found {
                // The call yielded no value: fall back to the raw tape value.
                write_json_tape_at(writer, tape, idx)?;
            } else {
                writer.write_all(b"null")?;
            }
        }
        NdjsonDirectTapePlan::ArrayElementPath {
            source_steps,
            element,
            suffix_steps,
        } => {
            let mut source_cache = NdjsonPathCache::default();
            let mut suffix_cache = NdjsonPathCache::default();
            write_json_tape_array_element_path(
                writer,
                tape,
                source_steps,
                *element,
                suffix_steps,
                &mut source_cache,
                &mut suffix_cache,
            )?;
        }
        NdjsonDirectTapePlan::ArrayElementViewScalarCall {
            source_steps,
            element,
            suffix_steps,
            call,
        } => {
            let mut source_cache = NdjsonPathCache::default();
            let mut suffix_cache = NdjsonPathCache::default();
            write_json_tape_array_element_scalar(
                writer,
                tape,
                source_steps,
                *element,
                suffix_steps,
                call,
                &mut source_cache,
                &mut suffix_cache,
            )?;
        }
        NdjsonDirectTapePlan::Stream(stream) => {
            let mut source_cache = NdjsonPathCache::default();
            let mut suffix_cache = NdjsonPathCache::default();
            let mut predicate_cache = NdjsonPathCache::default();
            let mut projection_caches: Vec<NdjsonPathCache> = Vec::new();
            write_json_tape_stream(
                writer,
                tape,
                stream,
                &mut source_cache,
                &mut suffix_cache,
                &mut predicate_cache,
                &mut projection_caches,
            )?;
        }
        NdjsonDirectTapePlan::Object(fields) => {
            let mut projection_caches: Vec<NdjsonPathCache> = Vec::new();
            write_json_tape_object_projection(writer, tape, fields, &mut projection_caches)?;
        }
        NdjsonDirectTapePlan::Array(items) => {
            let mut projection_caches: Vec<NdjsonPathCache> = Vec::new();
            write_json_tape_array_projection(writer, tape, items, &mut projection_caches)?;
        }
        NdjsonDirectTapePlan::ObjectItems { steps, method } => {
            let mut root_cache = NdjsonPathCache::default();
            let object_idx = root_cache.index(tape, 0, steps);
            write_json_tape_object_items(writer, tape, object_idx, *method)?;
        }
        NdjsonDirectTapePlan::ViewPipeline { .. } => writer.write_all(b"null")?,
    }
    Ok(())
}
3207
/// Resolves `source_steps` from the tape root, picks array `element` from that
/// node, then follows `suffix_steps` from the element, and writes the value
/// found there.
///
/// Writes `null` when any of the three hops fails to resolve.
#[cfg(feature = "simd-json")]
fn write_json_tape_array_element_path<W: Write, T: JsonTape>(
    writer: &mut W,
    tape: &T,
    source_steps: &[crate::ir::physical::PhysicalPathStep],
    element: super::ndjson_direct::NdjsonDirectElement,
    suffix_steps: &[crate::ir::physical::PhysicalPathStep],
    source_cache: &mut NdjsonPathCache,
    suffix_cache: &mut NdjsonPathCache,
) -> Result<(), JetroEngineError> {
    let array_idx = source_cache.index(tape, 0, source_steps);
    let element_idx = array_idx.and_then(|idx| json_tape_array_element(tape, idx, element));
    let target = element_idx.and_then(|idx| suffix_cache.index(tape, idx, suffix_steps));
    match target {
        Some(idx) => {
            write_json_tape_at(writer, tape, idx)?;
        }
        None => writer.write_all(b"null")?,
    }
    Ok(())
}
3229
/// Like `write_json_tape_array_element_path`, but applies `call` to the scalar
/// found at the resolved location.
///
/// If the call yields a value, that value is written. Otherwise the raw tape
/// value is written, or `null` when the location does not resolve.
#[cfg(feature = "simd-json")]
fn write_json_tape_array_element_scalar<W: Write, T: JsonTape>(
    writer: &mut W,
    tape: &T,
    source_steps: &[crate::ir::physical::PhysicalPathStep],
    element: super::ndjson_direct::NdjsonDirectElement,
    suffix_steps: &[crate::ir::physical::PhysicalPathStep],
    call: &crate::builtins::BuiltinCall,
    source_cache: &mut NdjsonPathCache,
    suffix_cache: &mut NdjsonPathCache,
) -> Result<(), JetroEngineError> {
    let array_idx = source_cache.index(tape, 0, source_steps);
    let element_idx = array_idx.and_then(|idx| json_tape_array_element(tape, idx, element));
    let target = element_idx.and_then(|idx| suffix_cache.index(tape, idx, suffix_steps));
    let computed = target.and_then(|idx| call.try_apply_json_view(json_tape_scalar(tape, idx)));
    match (computed, target) {
        (Some(value), _) => write_val_json(writer, &value)?,
        (None, Some(idx)) => {
            write_json_tape_at(writer, tape, idx)?;
        }
        (None, None) => writer.write_all(b"null")?,
    }
    Ok(())
}
3257
/// Writes the `Keys`, `Values` or `Entries` of the object node at `obj_idx` as
/// a JSON array, walking the object's fields directly on the tape.
///
/// A missing index, or a node that is not a `TapeNode::Object`, writes `[]`.
/// For `Entries`, each element is a two-item `[key, value]` array.
///
/// # Panics
/// Panics via `unreachable!` if `method` is not `Keys`, `Values` or `Entries`
/// and the object has at least one field.
#[cfg(feature = "simd-json")]
fn write_json_tape_object_items<W: Write, T: JsonTape>(
    writer: &mut W,
    tape: &T,
    obj_idx: Option<usize>,
    method: crate::builtins::BuiltinMethod,
) -> Result<(), JetroEngineError> {
    let Some(obj_idx) = obj_idx else {
        writer.write_all(b"[]")?;
        return Ok(());
    };
    // Anything other than an object node (including an out-of-range index)
    // also writes an empty array.
    let Some(crate::data::tape::TapeNode::Object { len, .. }) = tape.nodes().get(obj_idx).copied()
    else {
        writer.write_all(b"[]")?;
        return Ok(());
    };

    writer.write_all(b"[")?;
    // `cur` indexes the current field's key node; the field's value node
    // follows at `cur + 1`.
    let mut cur = obj_idx + 1;
    for field_idx in 0..len {
        if field_idx > 0 {
            writer.write_all(b",")?;
        }
        match method {
            crate::builtins::BuiltinMethod::Keys => {
                write_json_str(writer, tape.str_at(cur))?;
                // Step past the key, then past the value's whole span without
                // serialising it.
                cur += 1;
                cur += tape.span(cur);
            }
            crate::builtins::BuiltinMethod::Values => {
                // The return value of `write_json_tape_at` becomes the next
                // field's key index.
                cur = write_json_tape_at(writer, tape, cur + 1)?;
            }
            crate::builtins::BuiltinMethod::Entries => {
                writer.write_all(b"[")?;
                write_json_str(writer, tape.str_at(cur))?;
                writer.write_all(b",")?;
                cur = write_json_tape_at(writer, tape, cur + 1)?;
                writer.write_all(b"]")?;
            }
            _ => unreachable!("non-object-items builtin"),
        }
    }
    writer.write_all(b"]")?;
    Ok(())
}
3303
/// Folds the numeric values found at `suffix_steps` under each item of the
/// source node at `source_steps` (resolved from the tape root), and returns the
/// finalised `op` result.
///
/// The following are skipped:
/// - items rejected by `predicate`;
/// - items where `suffix_steps` does not resolve;
/// - non-numeric values (ignored by `fold_json_tape_numeric`).
///
/// A missing source yields the finaliser's result for the initial accumulator
/// state.
#[cfg(feature = "simd-json")]
fn reduce_json_tape_numeric_path<T: JsonTape>(
    tape: &T,
    source_steps: &[crate::ir::physical::PhysicalPathStep],
    predicate: Option<&NdjsonDirectItemPredicate>,
    suffix_steps: &[crate::ir::physical::PhysicalPathStep],
    op: crate::exec::pipeline::NumOp,
    caches: NdjsonPathCaches<'_>,
) -> Val {
    let NdjsonPathCaches {
        source: source_cache,
        suffix: suffix_cache,
        predicate: predicate_cache,
    } = caches;

    let (mut acc_i, mut acc_f, mut floated) = (0i64, 0.0f64, false);
    let (mut min_f, mut max_f) = (f64::INFINITY, f64::NEG_INFINITY);
    let mut n_obs = 0usize;

    if let Some(source_idx) = source_cache.index(tape, 0, source_steps) {
        // The callback never returns an error, so the visitor's result is
        // discarded.
        let _: Result<(), ()> = visit_json_tape_source_items(tape, source_idx, |item_idx| {
            let accepted = match predicate {
                Some(filter) => {
                    eval_json_tape_item_predicate_cached(tape, item_idx, filter, predicate_cache)
                }
                None => true,
            };
            if !accepted {
                return Ok(());
            }
            if let Some(value_idx) = suffix_cache.index(tape, item_idx, suffix_steps) {
                fold_json_tape_numeric(
                    json_tape_scalar(tape, value_idx),
                    op,
                    &mut acc_i,
                    &mut acc_f,
                    &mut floated,
                    &mut min_f,
                    &mut max_f,
                    &mut n_obs,
                );
            }
            Ok(())
        });
    }

    crate::exec::pipeline::num_finalise(op, acc_i, acc_f, floated, min_f, max_f, n_obs)
}
3349
/// Feeds one JSON scalar into the running numeric accumulators for `op`.
///
/// - Signed integers, and unsigned integers that fit in `i64`, go through
///   `num_fold_i64`.
/// - Floats, and unsigned integers above `i64::MAX`, go through `num_fold_f64`.
/// - Any other kind of value is ignored.
#[cfg(feature = "simd-json")]
#[allow(clippy::too_many_arguments)]
fn fold_json_tape_numeric(
    value: crate::util::JsonView<'_>,
    op: crate::exec::pipeline::NumOp,
    acc_i: &mut i64,
    acc_f: &mut f64,
    floated: &mut bool,
    min_f: &mut f64,
    max_f: &mut f64,
    n_obs: &mut usize,
) {
    use crate::exec::pipeline::{num_fold_f64, num_fold_i64};
    use crate::util::JsonView;

    match value {
        JsonView::Int(int) => {
            num_fold_i64(acc_i, acc_f, floated, min_f, max_f, n_obs, op, int)
        }
        JsonView::UInt(uint) => match i64::try_from(uint) {
            Ok(int) => num_fold_i64(acc_i, acc_f, floated, min_f, max_f, n_obs, op, int),
            // Too large for i64: fold as a float instead of wrapping.
            Err(_) => {
                num_fold_f64(acc_i, acc_f, floated, min_f, max_f, n_obs, op, uint as f64)
            }
        },
        JsonView::Float(float) => {
            num_fold_f64(acc_i, acc_f, floated, min_f, max_f, n_obs, op, float)
        }
        _ => {}
    }
}
3394
/// Evaluates a direct item predicate against the item at `item_idx`,
/// resolving every path relative to that item through `cache`.
///
/// - `And`/`Or` short-circuit.
/// - Any other binary operator compares two scalar operands (paths or
///   literals). It is `false` when either operand is missing or is not a plain
///   path/literal.
/// - Paths that do not resolve are falsy.
#[cfg(feature = "simd-json")]
fn eval_json_tape_item_predicate_cached<T: JsonTape>(
    tape: &T,
    item_idx: usize,
    predicate: &NdjsonDirectItemPredicate,
    cache: &mut NdjsonPathCache,
) -> bool {
    use crate::parse::ast::BinOp;

    match predicate {
        NdjsonDirectItemPredicate::Literal(value) => crate::util::is_truthy(value),
        NdjsonDirectItemPredicate::Path(steps) => cache
            .index(tape, item_idx, steps)
            .is_some_and(|idx| json_view_truthy(json_tape_scalar(tape, idx))),
        NdjsonDirectItemPredicate::CmpLit { lhs, op, lit } => {
            match cache.index(tape, item_idx, lhs) {
                Some(idx) => crate::util::json_cmp_binop(
                    json_tape_scalar(tape, idx),
                    *op,
                    crate::util::JsonView::from_val(lit),
                ),
                None => false,
            }
        }
        NdjsonDirectItemPredicate::ViewScalarCall { suffix_steps, call } => {
            let Some(idx) = cache.index(tape, item_idx, suffix_steps) else {
                return false;
            };
            call.try_apply_json_view(json_tape_scalar(tape, idx))
                .is_some_and(|result| crate::util::is_truthy(&result))
        }
        NdjsonDirectItemPredicate::Binary { lhs, op, rhs } => {
            if *op == BinOp::And {
                eval_json_tape_item_predicate_cached(tape, item_idx, lhs, cache)
                    && eval_json_tape_item_predicate_cached(tape, item_idx, rhs, cache)
            } else if *op == BinOp::Or {
                eval_json_tape_item_predicate_cached(tape, item_idx, lhs, cache)
                    || eval_json_tape_item_predicate_cached(tape, item_idx, rhs, cache)
            } else {
                // Resolve the left operand first and stop early, so the right
                // operand is only looked up when the left one exists.
                let Some(left) = eval_json_tape_item_scalar_cached(tape, item_idx, lhs, cache)
                else {
                    return false;
                };
                let Some(right) = eval_json_tape_item_scalar_cached(tape, item_idx, rhs, cache)
                else {
                    return false;
                };
                crate::util::json_cmp_binop(left, *op, right)
            }
        }
    }
}
3440
/// Resolves a comparison operand to a scalar view.
///
/// Only `Literal` operands and `Path` operands (resolved relative to
/// `item_idx` through `cache`) are supported. Any other predicate shape, or a
/// path that does not resolve, yields `None`.
#[cfg(feature = "simd-json")]
fn eval_json_tape_item_scalar_cached<'a, T: JsonTape>(
    tape: &'a T,
    item_idx: usize,
    predicate: &'a NdjsonDirectItemPredicate,
    cache: &mut NdjsonPathCache,
) -> Option<crate::util::JsonView<'a>> {
    if let NdjsonDirectItemPredicate::Literal(literal) = predicate {
        return Some(crate::util::JsonView::from_val(literal));
    }
    let NdjsonDirectItemPredicate::Path(steps) = predicate else {
        return None;
    };
    let idx = cache.index(tape, item_idx, steps)?;
    Some(json_tape_scalar(tape, idx))
}
3456
3457fn write_json_array<'a, W, I>(writer: &mut W, items: I) -> Result<(), JetroEngineError>
3458where
3459    W: Write,
3460    I: IntoIterator<Item = &'a Val>,
3461{
3462    writer.write_all(b"[")?;
3463    let mut first = true;
3464    for item in items {
3465        if first {
3466            first = false;
3467        } else {
3468            writer.write_all(b",")?;
3469        }
3470        write_val_json(writer, item)?;
3471    }
3472    writer.write_all(b"]")?;
3473    Ok(())
3474}
3475
3476fn write_json_int_array<W, I>(writer: &mut W, items: I) -> Result<(), JetroEngineError>
3477where
3478    W: Write,
3479    I: IntoIterator<Item = i64>,
3480{
3481    writer.write_all(b"[")?;
3482    let mut first = true;
3483    let mut buf = itoa::Buffer::new();
3484    for item in items {
3485        if first {
3486            first = false;
3487        } else {
3488            writer.write_all(b",")?;
3489        }
3490        writer.write_all(buf.format(item).as_bytes())?;
3491    }
3492    writer.write_all(b"]")?;
3493    Ok(())
3494}
3495
3496fn write_json_float_array<W, I>(writer: &mut W, items: I) -> Result<(), JetroEngineError>
3497where
3498    W: Write,
3499    I: IntoIterator<Item = f64>,
3500{
3501    writer.write_all(b"[")?;
3502    let mut first = true;
3503    let mut buf = ryu::Buffer::new();
3504    for item in items {
3505        if first {
3506            first = false;
3507        } else {
3508            writer.write_all(b",")?;
3509        }
3510        if item.is_finite() {
3511            writer.write_all(buf.format(item).as_bytes())?;
3512        } else {
3513            writer.write_all(b"0")?;
3514        }
3515    }
3516    writer.write_all(b"]")?;
3517    Ok(())
3518}
3519
3520fn write_json_str_array<'a, W, I>(writer: &mut W, items: I) -> Result<(), JetroEngineError>
3521where
3522    W: Write,
3523    I: IntoIterator<Item = &'a str>,
3524{
3525    writer.write_all(b"[")?;
3526    let mut first = true;
3527    for item in items {
3528        if first {
3529            first = false;
3530        } else {
3531            writer.write_all(b",")?;
3532        }
3533        write_json_str(writer, item)?;
3534    }
3535    writer.write_all(b"]")?;
3536    Ok(())
3537}
3538
3539fn write_json_object<'a, W, I>(writer: &mut W, entries: I) -> Result<(), JetroEngineError>
3540where
3541    W: Write,
3542    I: IntoIterator<Item = (&'a str, &'a Val)>,
3543{
3544    writer.write_all(b"{")?;
3545    let mut first = true;
3546    for (key, value) in entries {
3547        if first {
3548            first = false;
3549        } else {
3550            writer.write_all(b",")?;
3551        }
3552        write_json_str(writer, key)?;
3553        writer.write_all(b":")?;
3554        write_val_json(writer, value)?;
3555    }
3556    writer.write_all(b"}")?;
3557    Ok(())
3558}
3559
3560fn write_json_objvec<W: Write>(
3561    writer: &mut W,
3562    data: &crate::data::value::ObjVecData,
3563) -> Result<(), JetroEngineError> {
3564    writer.write_all(b"[")?;
3565    for row in 0..data.nrows() {
3566        if row > 0 {
3567            writer.write_all(b",")?;
3568        }
3569        writer.write_all(b"{")?;
3570        for slot in 0..data.stride() {
3571            if slot > 0 {
3572                writer.write_all(b",")?;
3573            }
3574            write_json_str(writer, data.keys[slot].as_ref())?;
3575            writer.write_all(b":")?;
3576            write_val_json(writer, data.cell(row, slot))?;
3577        }
3578        writer.write_all(b"}")?;
3579    }
3580    writer.write_all(b"]")?;
3581    Ok(())
3582}
3583
3584pub(super) fn write_json_str<W: Write>(
3585    writer: &mut W,
3586    value: &str,
3587) -> Result<(), JetroEngineError> {
3588    writer.write_all(b"\"")?;
3589    let bytes = value.as_bytes();
3590    if !needs_json_escape(bytes) {
3591        writer.write_all(bytes)?;
3592        writer.write_all(b"\"")?;
3593        return Ok(());
3594    }
3595
3596    let mut start = 0usize;
3597
3598    for (idx, &byte) in bytes.iter().enumerate() {
3599        let escaped = match byte {
3600            b'"' => Some(br#"\""#.as_slice()),
3601            b'\\' => Some(br#"\\"#.as_slice()),
3602            b'\n' => Some(br#"\n"#.as_slice()),
3603            b'\r' => Some(br#"\r"#.as_slice()),
3604            b'\t' => Some(br#"\t"#.as_slice()),
3605            0x08 => Some(br#"\b"#.as_slice()),
3606            0x0c => Some(br#"\f"#.as_slice()),
3607            0x00..=0x1f => None,
3608            _ => continue,
3609        };
3610
3611        if start < idx {
3612            writer.write_all(&bytes[start..idx])?;
3613        }
3614        match escaped {
3615            Some(seq) => writer.write_all(seq)?,
3616            None => write_control_escape(writer, byte)?,
3617        }
3618        start = idx + 1;
3619    }
3620
3621    if start < bytes.len() {
3622        writer.write_all(&bytes[start..])?;
3623    }
3624    writer.write_all(b"\"")?;
3625    Ok(())
3626}
3627
3628#[inline]
3629pub(super) fn write_i64<W: Write>(writer: &mut W, value: i64) -> Result<(), JetroEngineError> {
3630    let mut buf = itoa::Buffer::new();
3631    writer.write_all(buf.format(value).as_bytes())?;
3632    Ok(())
3633}
3634
3635#[inline]
3636fn write_u64<W: Write>(writer: &mut W, value: u64) -> Result<(), JetroEngineError> {
3637    let mut buf = itoa::Buffer::new();
3638    writer.write_all(buf.format(value).as_bytes())?;
3639    Ok(())
3640}
3641
3642#[inline]
3643fn write_f64<W: Write>(writer: &mut W, value: f64) -> Result<(), JetroEngineError> {
3644    if value.is_finite() {
3645        let mut buf = ryu::Buffer::new();
3646        writer.write_all(buf.format(value).as_bytes())?;
3647    } else {
3648        writer.write_all(b"0")?;
3649    }
3650    Ok(())
3651}
3652
/// Returns `true` when `bytes` contains a byte that must be escaped inside a
/// JSON string: `"`, `\`, or a control byte below `0x20`.
#[inline]
fn needs_json_escape(bytes: &[u8]) -> bool {
    bytes
        .iter()
        .any(|&byte| byte < 0x20 || byte == b'"' || byte == b'\\')
}
3659
3660fn write_control_escape<W: Write>(writer: &mut W, byte: u8) -> Result<(), JetroEngineError> {
3661    const HEX: &[u8; 16] = b"0123456789abcdef";
3662    writer.write_all(&[
3663        b'\\',
3664        b'u',
3665        b'0',
3666        b'0',
3667        HEX[(byte >> 4) as usize],
3668        HEX[(byte & 0x0f) as usize],
3669    ])?;
3670    Ok(())
3671}
3672
3673pub(super) fn collect_row_val(
3674    engine: &JetroEngine,
3675    document: &Jetro,
3676    plan: &crate::ir::physical::QueryPlan,
3677    line_no: u64,
3678) -> Result<Val, JetroEngineError> {
3679    engine
3680        .collect_prepared_val(document, plan)
3681        .map_err(|err| row_eval_error(line_no, err))
3682}
3683
3684pub(super) fn parse_row(
3685    engine: &JetroEngine,
3686    line_no: u64,
3687    row: Vec<u8>,
3688) -> Result<Jetro, JetroEngineError> {
3689    engine
3690        .parse_bytes_lazy(row)
3691        .map_err(|err| row_parse_error(line_no, err))
3692}
3693
3694pub(super) fn row_parse_error(line_no: u64, err: JetroEngineError) -> JetroEngineError {
3695    match err {
3696        JetroEngineError::Json(source) => RowError::InvalidJson { line_no, source }.into(),
3697        JetroEngineError::Eval(eval) => RowError::InvalidJsonMessage {
3698            line_no,
3699            message: eval.to_string(),
3700        }
3701        .into(),
3702        other => other,
3703    }
3704}
3705
3706pub(super) fn row_eval_error(line_no: u64, err: crate::EvalError) -> JetroEngineError {
3707    let message = err.0;
3708    if message.starts_with("Invalid JSON:") {
3709        RowError::InvalidJsonMessage { line_no, message }.into()
3710    } else {
3711        crate::EvalError(message).into()
3712    }
3713}
3714
/// Removes every trailing `\n` and `\r` byte from `buf`.
///
/// `\r\n`, bare `\n`, and runs of several line-ending bytes are all stripped.
fn trim_line_ending(buf: &mut Vec<u8>) {
    let kept = buf
        .iter()
        .rposition(|&byte| byte != b'\n' && byte != b'\r')
        .map_or(0, |last| last + 1);
    buf.truncate(kept);
}
3720
/// Drops a leading UTF-8 byte-order mark (`EF BB BF`) from `buf`.
///
/// This only happens when `buf` holds the first line (`line_no == 1`).
fn strip_initial_bom(line_no: u64, buf: &mut Vec<u8>) {
    const UTF8_BOM: [u8; 3] = [0xEF, 0xBB, 0xBF];
    let is_first_line = line_no == 1;
    if is_first_line && buf.get(..UTF8_BOM.len()) == Some(&UTF8_BOM[..]) {
        buf.drain(..UTF8_BOM.len());
    }
}
3726
/// Returns the half-open byte range `(start, end)` of `buf` with leading and
/// trailing ASCII whitespace excluded.
///
/// An empty or all-whitespace buffer yields `(buf.len(), buf.len())`.
fn non_ws_range(buf: &[u8]) -> (usize, usize) {
    let is_content = |byte: &u8| !byte.is_ascii_whitespace();
    let Some(start) = buf.iter().position(is_content) else {
        return (buf.len(), buf.len());
    };
    let end = buf
        .iter()
        .rposition(is_content)
        .map_or(start, |last| last + 1);
    (start, end)
}
3739
3740#[cfg(test)]
3741mod tests {
3742    #[test]
3743    #[cfg(feature = "simd-json")]
3744    fn rows_stream_driver_reports_direct_stage_stats() {
3745        let engine = crate::JetroEngine::new();
3746        let plan = super::ndjson_rows_stream_plan(
3747            "$.rows().filter($.active == true).distinct_by($.id).take(2).map($.id)",
3748        )
3749        .unwrap()
3750        .unwrap();
3751        let input = std::io::Cursor::new(
3752            br#"{"id":"a","active":false}
3753{"id":"a","active":true}
3754{"id":"b","active":true}
3755not-json
3756"#
3757            .to_vec(),
3758        );
3759        let mut out = Vec::new();
3760
3761        let (emitted, stats) = super::drive_ndjson_rows_stream_reader_with_stats(
3762            &engine,
3763            input,
3764            &plan,
3765            None,
3766            super::NdjsonOptions::default(),
3767            &mut out,
3768        )
3769        .unwrap();
3770
3771        assert_eq!(emitted, 2);
3772        assert_eq!(String::from_utf8(out).unwrap(), "\"a\"\n\"b\"\n");
3773        assert_eq!(stats.source, super::RowStreamSourceKind::NdjsonRows);
3774        assert_eq!(stats.direction, super::RowStreamDirection::Forward);
3775        assert_eq!(stats.rows_scanned, 3);
3776        assert_eq!(stats.rows_filtered, 1);
3777        assert_eq!(stats.rows_emitted, 2);
3778        assert_eq!(stats.direct_filter_rows, 3);
3779        assert_eq!(stats.direct_key_rows, 2);
3780        assert_eq!(stats.direct_project_rows, 2);
3781    }
3782
3783    #[test]
3784    #[cfg(feature = "simd-json")]
3785    fn parse_row_keeps_simd_document_lazy() {
3786        let engine = crate::JetroEngine::new();
3787        let row = br#"{"name":"Ada","age":30}"#.to_vec();
3788
3789        let document = super::parse_row(&engine, 1, row).expect("row parses lazily");
3790
3791        assert!(!document.root_val_is_materialized());
3792        assert!(!document.tape_is_built());
3793    }
3794
3795    #[test]
3796    fn owned_row_read_preserves_reusable_buffer_capacity() {
3797        let input = std::io::Cursor::new(b"{\"n\":1}\n{\"n\":2}\n");
3798        let mut driver = super::NdjsonPerRowDriver::new(input);
3799        let mut buf = Vec::with_capacity(128);
3800
3801        let first = driver
3802            .read_next_owned(&mut buf)
3803            .expect("row read succeeds")
3804            .expect("first row exists");
3805        assert_eq!(first.1, br#"{"n":1}"#);
3806        assert_eq!(buf.capacity(), 128);
3807
3808        let second = driver
3809            .read_next_owned(&mut buf)
3810            .expect("row read succeeds")
3811            .expect("second row exists");
3812        assert_eq!(second.1, br#"{"n":2}"#);
3813        assert_eq!(buf.capacity(), 128);
3814    }
3815
3816    #[test]
3817    #[cfg(feature = "simd-json")]
3818    fn direct_tape_plan_accepts_first_suffix() {
3819        let engine = crate::JetroEngine::new();
3820        for query in [
3821            "attributes.first().value",
3822            "attributes.last().value",
3823            "attributes.nth(1).value",
3824        ] {
3825            let plan =
3826                super::direct_tape_plan(&engine, query).expect("array suffix should be direct");
3827            assert!(matches!(
3828                plan,
3829                super::NdjsonDirectTapePlan::ArrayElementPath { .. }
3830            ));
3831        }
3832    }
3833
3834    #[test]
3835    #[cfg(feature = "simd-json")]
3836    fn direct_tape_plan_accepts_rooted_bench_shapes() {
3837        let engine = crate::JetroEngine::new();
3838        for query in [
3839            "$.id",
3840            "$.a.b.c",
3841            "$.meta.id",
3842            "$.name",
3843            "$.attributes.len()",
3844            "$.store.attributes.len()",
3845            "$.attributes.map(@.key)",
3846            "$.attributes.first().value",
3847            "$.store.attributes.first().value",
3848            "$.attributes.last().value",
3849            "$.name.upper()",
3850            "$.store.name.upper()",
3851            "$.attributes.map([@.key, @.value])",
3852            r#"$.attributes.filter(@.value.contains("_3")).len()"#,
3853            "$.keys()",
3854        ] {
3855            super::direct_tape_plan(&engine, query)
3856                .unwrap_or_else(|| panic!("{query} should have a direct NDJSON tape plan"));
3857        }
3858    }
3859
3860    #[test]
3861    #[cfg(feature = "simd-json")]
3862    fn direct_writer_plan_kind_exposes_hot_path_selection() {
3863        let engine = crate::JetroEngine::new();
3864        use super::NdjsonDirectPlanKind::{
3865            ByteExpr, TapeArrayProjection, TapeObjectProjection, TapeRootPath, TapeStreamCollect,
3866            TapeStreamCount, TapeStreamExtreme, TapeStreamFirst, TapeStreamLast, TapeStreamNumeric,
3867        };
3868
3869        for (query, expected) in [
3870            ("$.name", (Some(ByteExpr), TapeRootPath)),
3871            ("$.a.b.c", (Some(ByteExpr), TapeRootPath)),
3872            (r#"{test: $.a.b.c, b: $.a.b}"#, (None, TapeObjectProjection)),
3873            (r#"[$.id, $.name]"#, (None, TapeArrayProjection)),
3874            ("$.attributes.map(@.key)", (None, TapeStreamCollect)),
3875            (
3876                "$.attributes.map({k: @.key, code: @.meta.code.upper()})",
3877                (None, TapeStreamCollect),
3878            ),
3879            (
3880                r#"$.attributes.filter(@.value.contains("_3")).map({key: @.key, value: @.value}).first()"#,
3881                (None, TapeStreamFirst),
3882            ),
3883            ("$.attributes.map(@.value).last()", (None, TapeStreamLast)),
3884            (
3885                r#"$.attributes.filter(@.value.contains("_3")).len()"#,
3886                (None, TapeStreamCount),
3887            ),
3888            (
3889                "$.attributes.map(@.weight).sum()",
3890                (None, TapeStreamNumeric),
3891            ),
3892            (
3893                r#"$.attributes.filter(@.value.contains("_3")).map(@.weight).sum()"#,
3894                (None, TapeStreamNumeric),
3895            ),
3896            (
3897                "$.attributes.sort_by(@.value).last().key",
3898                (None, TapeStreamExtreme),
3899            ),
3900        ] {
3901            let actual = super::direct_writer_plan_kind(&engine, query)
3902                .unwrap_or_else(|| panic!("{query} should have an observable direct plan"));
3903            assert_eq!(actual, expected, "{query}");
3904        }
3905    }
3906
3907    #[test]
3908    #[cfg(feature = "simd-json")]
3909    fn direct_writer_path_kind_matches_runtime_writer_family() {
3910        let engine = crate::JetroEngine::new();
3911        use super::NdjsonWriterPathKind::{ByteExpr, ByteWritableTape};
3912
3913        for (query, expected) in [
3914            ("$.name", ByteExpr),
3915            ("$.a.b.c", ByteExpr),
3916            (r#"{test: $.a.b.c, b: $.a.b}"#, ByteWritableTape),
3917            (r#"[$.id, $.name]"#, ByteWritableTape),
3918            ("$.attributes.map(@.key)", ByteWritableTape),
3919            (
3920                "$.attributes.map({k: @.key, code: @.meta.code.upper()})",
3921                ByteWritableTape,
3922            ),
3923            (
3924                r#"$.attributes.filter(@.value.contains("_3")).len()"#,
3925                ByteWritableTape,
3926            ),
3927            ("$.attributes.map(@.weight).sum()", ByteWritableTape),
3928            (
3929                r#"$.attributes.filter(@.value.contains("_3")).map(@.weight).sum()"#,
3930                ByteWritableTape,
3931            ),
3932            (
3933                r#"{id: $.id, name: $.name, count: $.attributes.len()}"#,
3934                ByteWritableTape,
3935            ),
3936            (
3937                r#"[$.id, $.name, $.attributes.first().value, $.attributes.last().value]"#,
3938                ByteWritableTape,
3939            ),
3940            (
3941                r#"{name_upper: $.name.upper(), values: $.attributes.map(@.value), last: $.attributes.last().value}"#,
3942                ByteWritableTape,
3943            ),
3944            ("$.attributes.sort_by(@.value).last().key", ByteWritableTape),
3945        ] {
3946            assert_eq!(
3947                super::direct_writer_path_kind(&engine, query),
3948                Some(expected),
3949                "{query}"
3950            );
3951        }
3952    }
3953
3954    #[test]
3955    #[cfg(feature = "simd-json")]
3956    fn direct_tape_plan_lowers_stream_shapes_generically() {
3957        let engine = crate::JetroEngine::new();
3958        for query in [
3959            "$.attributes.map(@.key)",
3960            "$.attributes.map(@.key.upper())",
3961            "$.attributes.map(@.value).first()",
3962            "$.attributes.map(@.value).last()",
3963            r#"$.attributes.filter(@.value.contains("_3")).map(@.key)"#,
3964            r#"$.attributes.filter(@.value.contains("_3")).map(@.key.upper())"#,
3965            r#"$.attributes.filter(@.value.contains("_3")).map({key: @.key, value: @.value}).first()"#,
3966            r#"$.attributes.filter(@.value.contains("_3")).len()"#,
3967            "$.attributes.map(@.weight).sum()",
3968            r#"$.attributes.filter(@.value.contains("_3")).map(@.weight).sum()"#,
3969            "$.attributes.sort_by(@.value).last().key",
3970        ] {
3971            let plan =
3972                super::direct_tape_plan(&engine, query).expect("query should be direct NDJSON");
3973            assert!(
3974                matches!(plan, super::NdjsonDirectTapePlan::Stream(_)),
3975                "{query} should lower to a generic NDJSON stream plan"
3976            );
3977        }
3978    }
3979
3980    #[test]
3981    #[cfg(feature = "simd-json")]
3982    fn direct_byte_plan_accepts_fast_root_shapes() {
3983        let engine = crate::JetroEngine::new();
3984        for query in [
3985            "$.id",
3986            "$.name",
3987            "$.name.upper()",
3988            "$.name.lower()",
3989            "$.keys()",
3990            "$.meta.keys()",
3991            "$.values()",
3992            "$.entries()",
3993            "$.attributes.first().value",
3994            "$.store.attributes.first().value",
3995            "$.attributes.first().key.upper()",
3996            "$.attributes.last().value",
3997            "$.attributes.nth(1).value",
3998        ] {
3999            super::direct_byte_plan(&engine, query)
4000                .unwrap_or_else(|| panic!("{query} should have a direct NDJSON byte plan"));
4001        }
4002    }
4003
4004    #[test]
4005    #[cfg(feature = "simd-json")]
4006    fn direct_byte_predicates_cover_match_shapes() {
4007        let engine = crate::JetroEngine::new();
4008        let row = br#"{"active":true,"score":9910,"attributes":[{"key":"k1","value":"v_1"}]}"#;
4009        for predicate in [
4010            ("active", true),
4011            ("score > 9900", true),
4012            ("score < 100", false),
4013            (r#"attributes.first().value.contains("_1")"#, true),
4014        ] {
4015            let plan = super::direct_tape_predicate(&engine, predicate.0)
4016                .unwrap_or_else(|| panic!("{} should have a direct predicate", predicate.0));
4017            let matched = super::eval_ndjson_byte_predicate_row(row, &plan)
4018                .expect("byte predicate should evaluate")
4019                .unwrap_or_else(|| panic!("{} should not need tape fallback", predicate.0));
4020            assert_eq!(matched, predicate.1, "{}", predicate.0);
4021        }
4022    }
4023
4024    #[test]
4025    #[cfg(feature = "simd-json")]
4026    fn direct_byte_tape_plan_counts_filtered_rows() {
4027        let engine = crate::JetroEngine::new();
4028        let query = r#"attributes.filter(@.value.contains("_3")).len()"#;
4029        let plan = super::direct_tape_plan(&engine, query).expect("filter count should be direct");
4030        assert!(super::tape_plan_can_write_byte_row(&plan));
4031
4032        let row = br#"{"attributes":[{"value":"a_3"},{"value":"b"},{"value":"c_3"}]}"#;
4033        let mut out = Vec::new();
4034        let mut scratch = Vec::new();
4035        let wrote = super::write_ndjson_byte_tape_plan_row(&mut out, row, &plan, &mut scratch)
4036            .expect("byte count should write");
4037        assert!(matches!(wrote, super::BytePlanWrite::Done));
4038        assert_eq!(out, b"2");
4039    }
4040
4041    #[test]
4042    #[cfg(feature = "simd-json")]
4043    fn direct_byte_tape_plan_reduces_numeric_streams() {
4044        let engine = crate::JetroEngine::new();
4045        let row = br#"{"attributes":[{"weight":1},{"weight":2.5},{"weight":3},{"weight":"skip"}]}"#;
4046        for (query, expected) in [
4047            ("$.attributes.map(@.weight).sum()", "6.5"),
4048            ("$.attributes.map(@.weight).avg()", "2.1666666666666665"),
4049            ("$.attributes.map(@.weight).min()", "1.0"),
4050            ("$.attributes.map(@.weight).max()", "3.0"),
4051        ] {
4052            let plan = super::direct_tape_plan(&engine, query)
4053                .unwrap_or_else(|| panic!("{query} should be direct"));
4054            assert!(
4055                super::tape_plan_can_write_byte_row(&plan),
4056                "{query} should be byte-writable"
4057            );
4058            let mut out = Vec::new();
4059            let mut scratch = Vec::new();
4060            let wrote = super::write_ndjson_byte_tape_plan_row(&mut out, row, &plan, &mut scratch)
4061                .expect("byte numeric stream should write");
4062            assert!(matches!(wrote, super::BytePlanWrite::Done), "{query}");
4063            assert_eq!(std::str::from_utf8(&out).unwrap(), expected, "{query}");
4064        }
4065    }
4066
4067    #[test]
4068    #[cfg(feature = "simd-json")]
4069    fn direct_byte_tape_plan_collects_stream_maps() {
4070        let engine = crate::JetroEngine::new();
4071        let row = br#"{"attributes":[{"key":"k1","value":"v1"},{"key":"k2","value":"v2"}]}"#;
4072        for (query, expected) in [
4073            ("attributes.map(@.key)", r#"["k1","k2"]"#),
4074            (
4075                "attributes.map([@.key, @.value])",
4076                r#"[["k1","v1"],["k2","v2"]]"#,
4077            ),
4078            (
4079                "attributes.map({key: @.key, value: @.value})",
4080                r#"[{"key":"k1","value":"v1"},{"key":"k2","value":"v2"}]"#,
4081            ),
4082            ("attributes.map(@.key.upper())", r#"["K1","K2"]"#),
4083            (
4084                r#"attributes.filter(@.value.contains("2")).map(@.key)"#,
4085                r#"["k2"]"#,
4086            ),
4087            (
4088                r#"attributes.filter(@.value.contains("2")).map({key: @.key, value: @.value})"#,
4089                r#"[{"key":"k2","value":"v2"}]"#,
4090            ),
4091            (
4092                r#"attributes.filter(@.key != "k1").map([@.key, @.value])"#,
4093                r#"[["k2","v2"]]"#,
4094            ),
4095        ] {
4096            let plan = super::direct_tape_plan(&engine, query)
4097                .unwrap_or_else(|| panic!("{query} should be direct"));
4098            assert!(
4099                super::tape_plan_can_write_byte_row(&plan),
4100                "{query} should be byte-writable"
4101            );
4102            let mut out = Vec::new();
4103            let mut scratch = Vec::new();
4104            let wrote = super::write_ndjson_byte_tape_plan_row(&mut out, row, &plan, &mut scratch)
4105                .expect("byte stream should write");
4106            assert!(matches!(wrote, super::BytePlanWrite::Done), "{query}");
4107            assert_eq!(std::str::from_utf8(&out).unwrap(), expected, "{query}");
4108        }
4109    }
4110
4111    #[test]
4112    #[cfg(feature = "simd-json")]
4113    fn direct_byte_tape_plan_writes_static_projections() {
4114        let engine = crate::JetroEngine::new();
4115        let row = br#"{"id":7,"a":{"b":{"c":1}}}"#;
4116        for (query, expected) in [
4117            ("$.a.b.c", "1"),
4118            (r#"{test: $.a.b.c, b: $.a.b}"#, r#"{"test":1,"b":{"c":1}}"#),
4119            (r#"[$.a.b.c, $.id]"#, r#"[1,7]"#),
4120        ] {
4121            let plan = super::direct_tape_plan(&engine, query)
4122                .unwrap_or_else(|| panic!("{query} should be direct"));
4123            assert!(
4124                super::tape_plan_can_write_byte_row(&plan),
4125                "{query} should be byte-writable"
4126            );
4127            let mut out = Vec::new();
4128            let mut scratch = Vec::new();
4129            let wrote = super::write_ndjson_byte_tape_plan_row(&mut out, row, &plan, &mut scratch)
4130                .expect("byte projection should write");
4131            assert!(matches!(wrote, super::BytePlanWrite::Done), "{query}");
4132            assert_eq!(std::str::from_utf8(&out).unwrap(), expected, "{query}");
4133        }
4134    }
4135
4136    #[test]
4137    #[cfg(feature = "simd-json")]
4138    fn run_ndjson_uses_byte_paths_for_nested_object_items() {
4139        let engine = crate::JetroEngine::new();
4140        let rows = std::io::Cursor::new(
4141            br#"{"id":1}
4142{"id":2}
4143"#,
4144        );
4145        let mut out = Vec::new();
4146        engine
4147            .run_ndjson(rows, "$.id", &mut out)
4148            .expect("rooted byte path should run");
4149        assert_eq!(std::str::from_utf8(&out).unwrap(), "1\n2\n");
4150
4151        let rows = std::io::Cursor::new(
4152            br#"{"meta":{"id":1,"kind":"a"}}
4153{"meta":{"id":2,"kind":"b"}}
4154"#,
4155        );
4156
4157        let mut out = Vec::new();
4158        engine
4159            .run_ndjson(rows, "$.meta.id", &mut out)
4160            .expect("nested byte path should run");
4161        assert_eq!(std::str::from_utf8(&out).unwrap(), "1\n2\n");
4162
4163        let rows = std::io::Cursor::new(br#"{"meta":{"id":1,"kind":"a"}}"#);
4164        let mut out = Vec::new();
4165        engine
4166            .run_ndjson(rows, "$.meta.keys()", &mut out)
4167            .expect("nested byte object items should run");
4168        assert_eq!(std::str::from_utf8(&out).unwrap(), "[\"id\",\"kind\"]\n");
4169    }
4170
4171    #[test]
4172    #[cfg(feature = "simd-json")]
4173    fn run_ndjson_uses_byte_paths_for_nested_array_demands() {
4174        let engine = crate::JetroEngine::new();
4175        let rows = std::io::Cursor::new(
4176            br#"{"store":{"attributes":[{"value":"a"},{"value":"b"}],"after":1}}
4177{"store":{"attributes":[{"value":"c"},{"value":"d"}],"after":2}}
4178"#,
4179        );
4180
4181        let mut out = Vec::new();
4182        engine
4183            .run_ndjson(rows, "$.store.attributes.first().value", &mut out)
4184            .expect("nested byte array demand should run");
4185        assert_eq!(std::str::from_utf8(&out).unwrap(), "\"a\"\n\"c\"\n");
4186
4187        out.clear();
4188        let rows = std::io::Cursor::new(
4189            br#"{"store":{"attributes":[{"value":"a"},{"value":"b"}],"after":1}}
4190{"store":{"attributes":[{"value":"c"},{"value":"d"}],"after":2}}
4191"#,
4192        );
4193        engine
4194            .run_ndjson(rows, "$.store.attributes.last().value", &mut out)
4195            .expect("nested byte last demand should run from field prefix");
4196        assert_eq!(std::str::from_utf8(&out).unwrap(), "\"b\"\n\"d\"\n");
4197    }
4198
4199    #[test]
4200    #[cfg(feature = "simd-json")]
4201    fn run_ndjson_static_projection_survives_hint_activation() {
4202        let engine = crate::JetroEngine::new();
4203        let rows = std::io::Cursor::new(
4204            br#"{"id":1,"name":"a","active":true}
4205{"id":2,"name":"b","active":true}
4206{"id":3,"name":"c","active":true}
4207{"id":4,"name":"d","active":true}
4208{"id":5,"name":"e","active":true}
4209{"id":6,"name":"f","active":true}
4210{"id":7,"name":"g","active":true}
4211{"id":8,"name":"h","active":true}
4212{"id":9,"name":"i","active":true}
4213"#,
4214        );
4215        let mut out = Vec::new();
4216        engine
4217            .run_ndjson(rows, r#"{id: $.id, name: $.name}"#, &mut out)
4218            .expect("hinted static projection should run");
4219        assert_eq!(
4220            std::str::from_utf8(&out).unwrap(),
4221            "{\"id\":1,\"name\":\"a\"}\n\
4222{\"id\":2,\"name\":\"b\"}\n\
4223{\"id\":3,\"name\":\"c\"}\n\
4224{\"id\":4,\"name\":\"d\"}\n\
4225{\"id\":5,\"name\":\"e\"}\n\
4226{\"id\":6,\"name\":\"f\"}\n\
4227{\"id\":7,\"name\":\"g\"}\n\
4228{\"id\":8,\"name\":\"h\"}\n\
4229{\"id\":9,\"name\":\"i\"}\n"
4230        );
4231    }
4232
4233    #[test]
4234    #[cfg(feature = "simd-json")]
4235    fn run_ndjson_nested_projection_survives_hint_activation() {
4236        let engine = crate::JetroEngine::new();
4237        let rows = std::io::Cursor::new(
4238            br#"{"id":1,"profile":{"name":"a","score":10},"active":true}
4239{"id":2,"profile":{"name":"b","score":20},"active":true}
4240{"id":3,"profile":{"name":"c","score":30},"active":true}
4241{"id":4,"profile":{"name":"d","score":40},"active":true}
4242{"id":5,"profile":{"name":"e","score":50},"active":true}
4243{"id":6,"profile":{"name":"f","score":60},"active":true}
4244{"id":7,"profile":{"name":"g","score":70},"active":true}
4245{"id":8,"profile":{"name":"h","score":80},"active":true}
4246{"id":9,"profile":{"name":"i","score":90},"active":true}
4247"#,
4248        );
4249        let mut out = Vec::new();
4250        engine
4251            .run_ndjson(
4252                rows,
4253                r#"{id: $.id, name: $.profile.name, profile: $.profile}"#,
4254                &mut out,
4255            )
4256            .expect("hinted nested projection should run");
4257        assert_eq!(
4258            std::str::from_utf8(&out).unwrap(),
4259            "{\"id\":1,\"name\":\"a\",\"profile\":{\"name\":\"a\",\"score\":10}}\n\
4260{\"id\":2,\"name\":\"b\",\"profile\":{\"name\":\"b\",\"score\":20}}\n\
4261{\"id\":3,\"name\":\"c\",\"profile\":{\"name\":\"c\",\"score\":30}}\n\
4262{\"id\":4,\"name\":\"d\",\"profile\":{\"name\":\"d\",\"score\":40}}\n\
4263{\"id\":5,\"name\":\"e\",\"profile\":{\"name\":\"e\",\"score\":50}}\n\
4264{\"id\":6,\"name\":\"f\",\"profile\":{\"name\":\"f\",\"score\":60}}\n\
4265{\"id\":7,\"name\":\"g\",\"profile\":{\"name\":\"g\",\"score\":70}}\n\
4266{\"id\":8,\"name\":\"h\",\"profile\":{\"name\":\"h\",\"score\":80}}\n\
4267{\"id\":9,\"name\":\"i\",\"profile\":{\"name\":\"i\",\"score\":90}}\n"
4268        );
4269    }
4270
4271    #[test]
4272    #[cfg(feature = "simd-json")]
4273    fn run_ndjson_scalar_projection_survives_hint_activation() {
4274        let engine = crate::JetroEngine::new();
4275        let rows = std::io::Cursor::new(
4276            br#"{"id":1,"profile":{"name":"a"}}
4277{"id":2,"profile":{"name":"b"}}
4278{"id":3,"profile":{"name":"c"}}
4279{"id":4,"profile":{"name":"d"}}
4280{"id":5,"profile":{"name":"e"}}
4281{"id":6,"profile":{"name":"f"}}
4282{"id":7,"profile":{"name":"g"}}
4283{"id":8,"profile":{"name":"h"}}
4284{"id":9,"profile":{"name":"i"}}
4285"#,
4286        );
4287        let mut out = Vec::new();
4288        engine
4289            .run_ndjson(
4290                rows,
4291                r#"{id: $.id, name: $.profile.name.upper()}"#,
4292                &mut out,
4293            )
4294            .expect("hinted scalar projection should run");
4295        assert_eq!(
4296            std::str::from_utf8(&out).unwrap(),
4297            "{\"id\":1,\"name\":\"A\"}\n\
4298{\"id\":2,\"name\":\"B\"}\n\
4299{\"id\":3,\"name\":\"C\"}\n\
4300{\"id\":4,\"name\":\"D\"}\n\
4301{\"id\":5,\"name\":\"E\"}\n\
4302{\"id\":6,\"name\":\"F\"}\n\
4303{\"id\":7,\"name\":\"G\"}\n\
4304{\"id\":8,\"name\":\"H\"}\n\
4305{\"id\":9,\"name\":\"I\"}\n"
4306        );
4307    }
4308
4309    #[test]
4310    #[cfg(feature = "simd-json")]
4311    fn run_ndjson_stream_collect_survives_hint_activation() {
4312        let engine = crate::JetroEngine::new();
4313        let rows = std::io::Cursor::new(
4314            br#"{"id":1,"attributes":[{"key":"a","value":"x"},{"key":"b","value":"y"}]}
4315{"id":2,"attributes":[{"key":"c","value":"z"}]}
4316{"id":3,"attributes":[{"key":"d","value":"w"}]}
4317"#,
4318        );
4319        let mut out = Vec::new();
4320        engine
4321            .run_ndjson(rows, "$.attributes.map([@.key, @.value])", &mut out)
4322            .expect("hinted stream collect should run");
4323        assert_eq!(
4324            std::str::from_utf8(&out).unwrap(),
4325            "[[\"a\",\"x\"],[\"b\",\"y\"]]\n[[\"c\",\"z\"]]\n[[\"d\",\"w\"]]\n"
4326        );
4327    }
4328
4329    #[test]
4330    #[cfg(feature = "simd-json")]
4331    fn run_ndjson_stream_cache_rejects_reordered_item_prefixes() {
4332        let engine = crate::JetroEngine::new();
4333        let rows = std::io::Cursor::new(
4334            br#"{"attributes":[{"key":"k1","value":"a"}]}
4335{"attributes":[{"value":"k1","key":"actual"}]}
4336"#,
4337        );
4338        let mut out = Vec::new();
4339        engine
4340            .run_ndjson(rows, "$.attributes.map(@.key)", &mut out)
4341            .expect("stream cache should fall back on reordered item fields");
4342        assert_eq!(
4343            std::str::from_utf8(&out).unwrap(),
4344            "[\"k1\"]\n[\"actual\"]\n"
4345        );
4346    }
4347
4348    #[test]
4349    #[cfg(feature = "simd-json")]
4350    fn run_ndjson_stream_map_preserves_missing_field_nulls() {
4351        let engine = crate::JetroEngine::new();
4352        let rows = std::io::Cursor::new(
4353            br#"{"attributes":[{"key":"a","value":"x"},{"key":"b"}]}
4354{"attributes":[{"value":"z"}]}
4355"#,
4356        );
4357        let mut out = Vec::new();
4358        engine
4359            .run_ndjson(rows, "$.attributes.map([@.key, @.value])", &mut out)
4360            .expect("stream map should preserve nulls for missing fields");
4361        assert_eq!(
4362            std::str::from_utf8(&out).unwrap(),
4363            "[[\"a\",\"x\"],[\"b\",null]]\n[[null,\"z\"]]\n"
4364        );
4365    }
4366
4367    #[test]
4368    #[cfg(feature = "simd-json")]
4369    fn run_ndjson_stream_object_map_preserves_scalar_calls() {
4370        let engine = crate::JetroEngine::new();
4371        let rows = std::io::Cursor::new(
4372            br#"{"attributes":[{"key":"a","value":"x"},{"key":"b","value":"y"}]}
4373"#,
4374        );
4375        let mut out = Vec::new();
4376        engine
4377            .run_ndjson(
4378                rows,
4379                "$.attributes.map({k: @.key, v: @.value.upper()})",
4380                &mut out,
4381            )
4382            .expect("stream object map should preserve scalar calls");
4383        assert_eq!(
4384            std::str::from_utf8(&out).unwrap(),
4385            "[{\"k\":\"a\",\"v\":\"X\"},{\"k\":\"b\",\"v\":\"Y\"}]\n"
4386        );
4387    }
4388
4389    #[test]
4390    #[cfg(feature = "simd-json")]
4391    fn run_ndjson_stream_map_projects_nested_item_paths() {
4392        let engine = crate::JetroEngine::new();
4393        let rows = std::io::Cursor::new(
4394            br#"{"attributes":[{"key":"a","meta":{"code":"x"}},{"key":"b","meta":{"code":"y"}}]}
4395"#,
4396        );
4397        let mut out = Vec::new();
4398        engine
4399            .run_ndjson(
4400                rows,
4401                "$.attributes.map({k: @.key, code: @.meta.code.upper()})",
4402                &mut out,
4403            )
4404            .expect("stream map should project nested item paths");
4405        assert_eq!(
4406            std::str::from_utf8(&out).unwrap(),
4407            "[{\"k\":\"a\",\"code\":\"X\"},{\"k\":\"b\",\"code\":\"Y\"}]\n"
4408        );
4409    }
4410
4411    #[test]
4412    #[cfg(feature = "simd-json")]
4413    fn run_ndjson_stream_count_survives_hint_activation() {
4414        let engine = crate::JetroEngine::new();
4415        let rows = std::io::Cursor::new(
4416            br#"{"id":1,"attributes":[{"value":"x_3"},{"value":"y"}]}
4417{"id":2,"attributes":[{"value":"z_3"},{"value":"w_3"}]}
4418{"id":3,"attributes":[{"value":"n"}]}
4419"#,
4420        );
4421        let mut out = Vec::new();
4422        engine
4423            .run_ndjson(
4424                rows,
4425                r#"$.attributes.filter(@.value.contains("_3")).len()"#,
4426                &mut out,
4427            )
4428            .expect("hinted stream count should run");
4429        assert_eq!(std::str::from_utf8(&out).unwrap(), "1\n2\n0\n");
4430    }
4431
4432    #[test]
4433    #[cfg(feature = "simd-json")]
4434    fn run_ndjson_filtered_count_ignores_missing_predicate_fields() {
4435        let engine = crate::JetroEngine::new();
4436        let rows = std::io::Cursor::new(
4437            br#"{"attributes":[{"value":"x_3"},{"key":"missing"},{"value":"y"}]}
4438{"attributes":[{"key":"missing"}]}
4439"#,
4440        );
4441        let mut out = Vec::new();
4442        engine
4443            .run_ndjson(
4444                rows,
4445                r#"$.attributes.filter(@.value.contains("_3")).len()"#,
4446                &mut out,
4447            )
4448            .expect("filtered count should ignore missing predicate fields");
4449        assert_eq!(std::str::from_utf8(&out).unwrap(), "1\n0\n");
4450    }
4451
4452    #[test]
4453    fn run_ndjson_filter_last_returns_last_matching_output() {
4454        let engine = crate::JetroEngine::new();
4455        let rows = std::io::Cursor::new(
4456            br#"{"attributes":[{"key":"keep","value":"first"},{"key":"drop","value":"physical-last"}]}
4457{"attributes":[{"key":"drop","value":"first"},{"key":"keep","value":"semantic-last"},{"key":"drop","value":"physical-last"}]}
4458"#,
4459        );
4460        let mut out = Vec::new();
4461
4462        engine
4463            .run_ndjson(
4464                rows,
4465                r#"$.attributes.filter(@.key == "keep").last().value"#,
4466                &mut out,
4467            )
4468            .expect("filtered last should preserve semantic output order");
4469
4470        assert_eq!(
4471            std::str::from_utf8(&out).unwrap(),
4472            "\"first\"\n\"semantic-last\"\n"
4473        );
4474    }
4475
4476    #[test]
4477    fn run_ndjson_filter_map_first_stops_at_first_matching_output() {
4478        let engine = crate::JetroEngine::new();
4479        let rows = std::io::Cursor::new(
4480            br#"{"attributes":[{"key":"a","value":"x_3"},{"key":"b","value":"later_3"}]}
4481{"attributes":[{"key":"a","value":"skip"},{"key":"b","value":"y_3"}]}
4482{"attributes":[{"key":"a","value":"skip"}]}
4483"#,
4484        );
4485        let mut out = Vec::new();
4486
4487        engine
4488            .run_ndjson(
4489                rows,
4490                r#"$.attributes.filter(@.value.contains("_3")).map({key: @.key, value: @.value}).first()"#,
4491                &mut out,
4492            )
4493            .expect("filtered first should use direct stream first");
4494
4495        assert_eq!(
4496            std::str::from_utf8(&out).unwrap(),
4497            "{\"key\":\"a\",\"value\":\"x_3\"}\n{\"key\":\"b\",\"value\":\"y_3\"}\n"
4498        );
4499    }
4500
4501    #[test]
4502    fn run_ndjson_map_first_projects_first_item_without_filter() {
4503        let engine = crate::JetroEngine::new();
4504        let rows = std::io::Cursor::new(
4505            br#"{"attributes":[{"key":"a","value":"first"},{"key":"b","value":"later"}]}
4506{"attributes":[]}
4507{"attributes":[{"key":"c","value":"only"}]}
4508"#,
4509        );
4510        let mut out = Vec::new();
4511
4512        engine
4513            .run_ndjson(rows, "$.attributes.map(@.value).first()", &mut out)
4514            .expect("unfiltered first should use direct stream first");
4515
4516        assert_eq!(
4517            std::str::from_utf8(&out).unwrap(),
4518            "\"first\"\n\"only\"\n"
4519        );
4520    }
4521
4522    #[test]
4523    fn run_ndjson_map_last_projects_last_item_without_filter() {
4524        let engine = crate::JetroEngine::new();
4525        let rows = std::io::Cursor::new(
4526            br#"{"attributes":[{"key":"a","value":"first"},{"key":"b","value":"last"}]}
4527{"attributes":[]}
4528{"attributes":[{"key":"c","value":"only"}]}
4529"#,
4530        );
4531        let mut out = Vec::new();
4532
4533        engine
4534            .run_ndjson(rows, "$.attributes.map(@.value).last()", &mut out)
4535            .expect("unfiltered last should use direct stream last");
4536
4537        assert_eq!(
4538            std::str::from_utf8(&out).unwrap(),
4539            "\"last\"\n\"only\"\n"
4540        );
4541    }
4542
4543    #[test]
4544    fn run_ndjson_filter_map_last_keeps_latest_matching_output() {
4545        let engine = crate::JetroEngine::new();
4546        let rows = std::io::Cursor::new(
4547            br#"{"attributes":[{"key":"a","value":"x_3"},{"key":"b","value":"later_3"}]}
4548{"attributes":[{"key":"a","value":"skip"},{"key":"b","value":"y_3"}]}
4549{"attributes":[{"key":"a","value":"skip"}]}
4550"#,
4551        );
4552        let mut out = Vec::new();
4553
4554        engine
4555            .run_ndjson(
4556                rows,
4557                r#"$.attributes.filter(@.value.contains("_3")).map({key: @.key, value: @.value}).last()"#,
4558                &mut out,
4559            )
4560            .expect("filtered last should use direct stream last");
4561
4562        assert_eq!(
4563            std::str::from_utf8(&out).unwrap(),
4564            "{\"key\":\"b\",\"value\":\"later_3\"}\n{\"key\":\"b\",\"value\":\"y_3\"}\n"
4565        );
4566    }
4567
4568    #[test]
4569    #[cfg(feature = "simd-json")]
4570    fn run_ndjson_stream_numeric_survives_hint_activation() {
4571        let engine = crate::JetroEngine::new();
4572        let rows = std::io::Cursor::new(
4573            br#"{"id":1,"attributes":[{"weight":1},{"weight":2}]}
4574{"id":2,"attributes":[{"weight":3.5},{"weight":4}]}
4575{"id":3,"attributes":[{"weight":"skip"}]}
4576"#,
4577        );
4578        let mut out = Vec::new();
4579        engine
4580            .run_ndjson(rows, "$.attributes.map(@.weight).sum()", &mut out)
4581            .expect("hinted numeric stream should run");
4582        assert_eq!(std::str::from_utf8(&out).unwrap(), "3\n7.5\n0\n");
4583    }
4584
4585    #[test]
4586    #[cfg(feature = "simd-json")]
4587    fn run_ndjson_filtered_stream_numeric_uses_shared_fields() {
4588        let engine = crate::JetroEngine::new();
4589        let rows = std::io::Cursor::new(
4590            br#"{"attributes":[{"value":"x_3","weight":1},{"value":"skip","weight":10},{"value":"y_3","weight":2.5}]}
4591{"attributes":[{"value":"skip","weight":4}]}
4592"#,
4593        );
4594        let mut out = Vec::new();
4595        engine
4596            .run_ndjson(
4597                rows,
4598                r#"$.attributes.filter(@.value.contains("_3")).map(@.weight).sum()"#,
4599                &mut out,
4600            )
4601            .expect("filtered numeric stream should use byte path");
4602        assert_eq!(std::str::from_utf8(&out).unwrap(), "3.5\n0\n");
4603    }
4604
4605    #[test]
4606    #[cfg(feature = "simd-json")]
4607    fn run_ndjson_stream_extreme_projects_selected_item_field() {
4608        let engine = crate::JetroEngine::new();
4609        let rows = std::io::Cursor::new(
4610            br#"{"attributes":[{"key":"a","value":"m"},{"key":"b","value":"z"},{"key":"c","value":"n"}]}
4611{"attributes":[{"key":"x","value":"b"},{"key":"y","value":"a"}]}
4612"#,
4613        );
4614        let mut out = Vec::new();
4615        engine
4616            .run_ndjson(rows, "$.attributes.sort_by(@.value).last().key", &mut out)
4617            .expect("stream extreme should project selected item field");
4618        assert_eq!(std::str::from_utf8(&out).unwrap(), "\"b\"\n\"x\"\n");
4619    }
4620
4621    #[test]
4622    fn run_ndjson_stream_extreme_handles_escaped_string_keys() {
4623        let engine = crate::JetroEngine::new();
4624        let rows = std::io::Cursor::new(
4625            br#"{"attributes":[{"key":"a","value":"v\"1"},{"key":"b","value":"v_9"}]}
4626"#,
4627        );
4628        let mut out = Vec::new();
4629        engine
4630            .run_ndjson(rows, "$.attributes.sort_by(@.value).last().key", &mut out)
4631            .expect("escaped extrema keys should fall back safely");
4632        assert_eq!(std::str::from_utf8(&out).unwrap(), "\"b\"\n");
4633    }
4634
4635    #[test]
4636    fn run_ndjson_stream_extreme_handles_numeric_keys() {
4637        let engine = crate::JetroEngine::new();
4638        let rows = std::io::Cursor::new(
4639            br#"{"attributes":[{"key":"a","score":1},{"key":"b","score":10},{"key":"c","score":2}]}
4640{"attributes":[{"key":"x","score":-2},{"key":"y","score":-1.5}]}
4641"#,
4642        );
4643        let mut out = Vec::new();
4644        engine
4645            .run_ndjson(rows, "$.attributes.sort_by(@.score).last().key", &mut out)
4646            .expect("numeric extrema keys should use direct stream extrema");
4647        assert_eq!(std::str::from_utf8(&out).unwrap(), "\"b\"\n\"y\"\n");
4648    }
4649
4650    #[test]
4651    #[cfg(feature = "simd-json")]
4652    fn run_ndjson_nested_direct_projection_writes_without_fallback() {
4653        let engine = crate::JetroEngine::new();
4654        let rows = std::io::Cursor::new(
4655            br#"{"id":1,"name":"ada","attributes":[{"key":"a","value":"x"},{"key":"b","value":"y"}]}
4656{"id":2,"name":"bob","attributes":[{"key":"c","value":"z"}]}
4657"#,
4658        );
4659        let mut out = Vec::new();
4660        engine
4661            .run_ndjson(
4662                rows,
4663                r#"{id: $.id, name: $.name, count: $.attributes.len(), first: $.attributes.first().value, values: $.attributes.map(@.value)}"#,
4664                &mut out,
4665            )
4666            .expect("nested direct projection should run");
4667        assert_eq!(
4668            std::str::from_utf8(&out).unwrap(),
4669            "{\"id\":1,\"name\":\"ada\",\"count\":2,\"first\":\"x\",\"values\":[\"x\",\"y\"]}\n\
4670{\"id\":2,\"name\":\"bob\",\"count\":1,\"first\":\"z\",\"values\":[\"z\"]}\n"
4671        );
4672    }
4673}