Skip to main content

jetro_core/io/
ndjson.rs

1use super::ndjson_byte::{
2    eval_ndjson_byte_predicate_row, tape_plan_can_write_byte_row, write_ndjson_byte_plan_row,
3    write_ndjson_byte_tape_plan_row, write_ndjson_hinted_tape_plan_row, BytePlanWrite,
4};
5#[cfg(test)]
6pub(super) use super::ndjson_direct::{
7    direct_byte_plan, direct_writer_plan_kind, NdjsonDirectPlanKind,
8};
9pub(super) use super::ndjson_direct::{
10    direct_tape_plan, direct_tape_predicate, direct_writer_plans, NdjsonDirectBytePlan,
11    NdjsonDirectElement, NdjsonDirectItemPredicate, NdjsonDirectPredicate,
12    NdjsonDirectProjectionValue, NdjsonDirectStreamMap, NdjsonDirectStreamPlan,
13    NdjsonDirectStreamSink, NdjsonDirectTapePlan,
14};
15use super::ndjson_frame::{frame_payload, FramePayload, NdjsonRowFrame};
16use super::ndjson_hint::{
17    NdjsonHintAccessPlan, NdjsonHintConfig, NdjsonHintDecision, NdjsonHintState,
18};
19pub(super) use super::ndjson_row::{collect_row_val, parse_row, row_eval_error, row_parse_error};
20use super::ndjson_rows::NdjsonRowsFilePlan;
21use super::ndjson_route::{
22    ndjson_route_plan, NdjsonExecutionReport, NdjsonExecutionStats, NdjsonRouteExplain,
23    NdjsonRoutePlan, NdjsonSourceCaps, NdjsonSourceMode,
24};
25use super::ndjson_stream_cache::NdjsonConstantStreamCache;
26pub(super) use super::ndjson_write::{
27    ndjson_writer_with_options, write_json_bytes_line_with_options, write_val_line,
28    write_val_line_with_options,
29};
30use super::stream_exec::CompiledRowStream;
31use super::stream_fanout::{
32    drive_ndjson_rows_fanout_file, drive_ndjson_rows_fanout_file_with_stats,
33};
34#[cfg(test)]
35use super::stream_plan::RowStreamSourceKind;
36use super::stream_plan::{RowStreamDirection, RowStreamPlan};
37use super::stream_subquery::{RowStreamSubqueryPlan, STREAM_BINDING};
38use super::stream_types::{RowStreamRowResult, RowStreamStats};
39use super::{NdjsonSource, RowError};
40pub use super::ndjson_driver::NdjsonPerRowDriver;
41use crate::compile::compiler::Compiler;
42use crate::data::context::Env;
43use crate::data::value::Val;
44use crate::plan::physical::PlanningContext;
45use crate::util::is_truthy;
46use crate::{EvalError, Jetro, JetroEngine, JetroEngineError, VM};
47use memchr::memchr;
48use serde_json::Value;
49use std::fs::File;
50use std::io::{BufRead, Write};
51use std::path::Path;
52use std::sync::MutexGuard;
53
54pub(super) const DEFAULT_MAX_LINE_LEN: usize = 64 * 1024 * 1024;
55const DEFAULT_LINE_BUFFER_CAPACITY: usize = 8192;
56pub(super) const DEFAULT_READER_BUFFER_CAPACITY: usize = 1024 * 1024;
57pub(super) const DEFAULT_REVERSE_CHUNK_SIZE: usize = 64 * 1024;
58
/// Identifies which direct writer path `ndjson_writer_path_kind` reports for a query.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum NdjsonWriterPathKind {
    /// Reported when the direct writer plans include a byte plan.
    ByteExpr,
    /// Reported when there is no byte plan but the tape plan can write byte rows.
    ByteWritableTape,
    /// Reported for every remaining tape plan.
    Tape,
}

impl std::fmt::Display for NdjsonWriterPathKind {
    /// Writes the kebab-case label of the variant (`byte-expr`, `byte-writable-tape`, `tape`).
    ///
    /// Width, fill, alignment and precision flags of the format spec are honoured, so the
    /// label can be aligned in tabular output (for example `{:<20}`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::ByteExpr => "byte-expr",
            Self::ByteWritableTape => "byte-writable-tape",
            Self::Tape => "tape",
        };
        // `pad` applies the formatter's flags; `write_str` would silently ignore them.
        f.pad(label)
    }
}
75
/// Test-only alias that forwards to [`ndjson_writer_path_kind`] unchanged.
#[cfg(test)]
pub(super) fn direct_writer_path_kind(
    engine: &JetroEngine,
    query: &str,
) -> Option<NdjsonWriterPathKind> {
    let kind = ndjson_writer_path_kind(engine, query);
    kind
}
83
84pub fn ndjson_writer_path_kind(
85    engine: &JetroEngine,
86    query: &str,
87) -> Option<NdjsonWriterPathKind> {
88    let (byte, tape) = direct_writer_plans(engine, query)?;
89    if byte.is_some() {
90        return Some(NdjsonWriterPathKind::ByteExpr);
91    }
92    if tape_plan_can_write_byte_row(&tape) {
93        return Some(NdjsonWriterPathKind::ByteWritableTape);
94    }
95    Some(NdjsonWriterPathKind::Tape)
96}
97
98/// Configuration for per-row NDJSON execution.
99#[derive(Clone, Copy, Debug, PartialEq, Eq)]
100pub struct NdjsonOptions {
101    pub max_line_len: usize,
102    pub initial_buffer_capacity: usize,
103    pub reader_buffer_capacity: usize,
104    pub reverse_chunk_size: usize,
105    pub parallel_min_bytes: u64,
106    pub parallelism: NdjsonParallelism,
107    pub row_frame: NdjsonRowFrame,
108    pub null_output: NdjsonNullOutput,
109}
110
/// Parallelism setting carried by `NdjsonOptions::parallelism`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NdjsonParallelism {
    /// The value used by `NdjsonOptions::default()`.
    Auto,
    /// NOTE(review): presumably disables parallel execution — confirm against the consumer.
    Off,
}
116
/// Null-output setting carried by `NdjsonOptions::null_output`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NdjsonNullOutput {
    /// The value used by `NdjsonOptions::default()`.
    Skip,
    /// NOTE(review): presumably writes null results instead of dropping them — confirm.
    Emit,
}
122
123impl Default for NdjsonOptions {
124    fn default() -> Self {
125        Self {
126            max_line_len: DEFAULT_MAX_LINE_LEN,
127            initial_buffer_capacity: DEFAULT_LINE_BUFFER_CAPACITY,
128            reader_buffer_capacity: DEFAULT_READER_BUFFER_CAPACITY,
129            reverse_chunk_size: DEFAULT_REVERSE_CHUNK_SIZE,
130            parallel_min_bytes: 64 * 1024 * 1024,
131            parallelism: NdjsonParallelism::Auto,
132            row_frame: NdjsonRowFrame::JsonLine,
133            null_output: NdjsonNullOutput::Skip,
134        }
135    }
136}
137
138impl NdjsonOptions {
139    pub fn with_max_line_len(mut self, max_line_len: usize) -> Self {
140        self.max_line_len = max_line_len;
141        self
142    }
143
144    pub fn with_initial_buffer_capacity(mut self, capacity: usize) -> Self {
145        self.initial_buffer_capacity = capacity;
146        self
147    }
148
149    pub fn with_reader_buffer_capacity(mut self, capacity: usize) -> Self {
150        self.reader_buffer_capacity = capacity;
151        self
152    }
153
154    pub fn with_reverse_chunk_size(mut self, capacity: usize) -> Self {
155        self.reverse_chunk_size = capacity;
156        self
157    }
158
159    pub fn with_parallel_min_bytes(mut self, bytes: u64) -> Self {
160        self.parallel_min_bytes = bytes;
161        self
162    }
163
164    pub fn with_parallelism(mut self, parallelism: NdjsonParallelism) -> Self {
165        self.parallelism = parallelism;
166        self
167    }
168
169    pub fn with_row_frame(mut self, row_frame: NdjsonRowFrame) -> Self {
170        self.row_frame = row_frame;
171        self
172    }
173
174    pub fn with_null_output(mut self, null_output: NdjsonNullOutput) -> Self {
175        self.null_output = null_output;
176        self
177    }
178}
179
/// Signal returned by the `*_until` row callbacks in this module.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NdjsonControl {
    /// Returned by the non-`until` wrappers after every row.
    Continue,
    /// NOTE(review): presumably asks the driver to stop after the current row — confirm
    /// against `drive_ndjson`.
    Stop,
}
185
186pub fn for_each_ndjson<R, F>(
187    engine: &JetroEngine,
188    reader: R,
189    query: &str,
190    f: F,
191) -> Result<usize, JetroEngineError>
192where
193    R: BufRead,
194    F: FnMut(Value),
195{
196    for_each_ndjson_with_options(engine, reader, query, NdjsonOptions::default(), f)
197}
198
199pub fn for_each_ndjson_with_options<R, F>(
200    engine: &JetroEngine,
201    reader: R,
202    query: &str,
203    options: NdjsonOptions,
204    mut f: F,
205) -> Result<usize, JetroEngineError>
206where
207    R: BufRead,
208    F: FnMut(Value),
209{
210    drive_ndjson(engine, reader, query, options, |value| {
211        f(value);
212        Ok(NdjsonControl::Continue)
213    })
214}
215
216pub fn for_each_ndjson_until<R, F>(
217    engine: &JetroEngine,
218    reader: R,
219    query: &str,
220    f: F,
221) -> Result<usize, JetroEngineError>
222where
223    R: BufRead,
224    F: FnMut(Value) -> Result<NdjsonControl, JetroEngineError>,
225{
226    for_each_ndjson_until_with_options(engine, reader, query, NdjsonOptions::default(), f)
227}
228
229pub fn for_each_ndjson_until_with_options<R, F>(
230    engine: &JetroEngine,
231    reader: R,
232    query: &str,
233    options: NdjsonOptions,
234    f: F,
235) -> Result<usize, JetroEngineError>
236where
237    R: BufRead,
238    F: FnMut(Value) -> Result<NdjsonControl, JetroEngineError>,
239{
240    drive_ndjson(engine, reader, query, options, f)
241}
242
243pub fn for_each_ndjson_source<F>(
244    engine: &JetroEngine,
245    source: NdjsonSource,
246    query: &str,
247    f: F,
248) -> Result<usize, JetroEngineError>
249where
250    F: FnMut(Value),
251{
252    for_each_ndjson_source_with_options(engine, source, query, NdjsonOptions::default(), f)
253}
254
255pub fn for_each_ndjson_source_with_options<F>(
256    engine: &JetroEngine,
257    source: NdjsonSource,
258    query: &str,
259    options: NdjsonOptions,
260    f: F,
261) -> Result<usize, JetroEngineError>
262where
263    F: FnMut(Value),
264{
265    match source {
266        NdjsonSource::File(path) => {
267            let file = File::open(path)?;
268            for_each_ndjson_with_options(
269                engine,
270                std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
271                query,
272                options,
273                f,
274            )
275        }
276        NdjsonSource::Reader(reader) => {
277            for_each_ndjson_with_options(engine, reader, query, options, f)
278        }
279    }
280}
281
282pub fn for_each_ndjson_source_until<F>(
283    engine: &JetroEngine,
284    source: NdjsonSource,
285    query: &str,
286    f: F,
287) -> Result<usize, JetroEngineError>
288where
289    F: FnMut(Value) -> Result<NdjsonControl, JetroEngineError>,
290{
291    for_each_ndjson_source_until_with_options(engine, source, query, NdjsonOptions::default(), f)
292}
293
294pub fn for_each_ndjson_source_until_with_options<F>(
295    engine: &JetroEngine,
296    source: NdjsonSource,
297    query: &str,
298    options: NdjsonOptions,
299    f: F,
300) -> Result<usize, JetroEngineError>
301where
302    F: FnMut(Value) -> Result<NdjsonControl, JetroEngineError>,
303{
304    match source {
305        NdjsonSource::File(path) => {
306            let file = File::open(path)?;
307            for_each_ndjson_until_with_options(
308                engine,
309                std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
310                query,
311                options,
312                f,
313            )
314        }
315        NdjsonSource::Reader(reader) => {
316            for_each_ndjson_until_with_options(engine, reader, query, options, f)
317        }
318    }
319}
320
321pub fn collect_ndjson<R>(
322    engine: &JetroEngine,
323    reader: R,
324    query: &str,
325) -> Result<Vec<Value>, JetroEngineError>
326where
327    R: BufRead,
328{
329    collect_ndjson_with_options(engine, reader, query, NdjsonOptions::default())
330}
331
332pub fn collect_ndjson_with_options<R>(
333    engine: &JetroEngine,
334    reader: R,
335    query: &str,
336    options: NdjsonOptions,
337) -> Result<Vec<Value>, JetroEngineError>
338where
339    R: BufRead,
340{
341    let mut values = Vec::new();
342    for_each_ndjson_with_options(engine, reader, query, options, |value| values.push(value))?;
343    Ok(values)
344}
345
346pub fn collect_ndjson_file<P>(
347    engine: &JetroEngine,
348    path: P,
349    query: &str,
350) -> Result<Vec<Value>, JetroEngineError>
351where
352    P: AsRef<Path>,
353{
354    let file = File::open(path)?;
355    let options = NdjsonOptions::default();
356    collect_ndjson_with_options(
357        engine,
358        std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
359        query,
360        options,
361    )
362}
363
364pub fn collect_ndjson_file_with_options<P>(
365    engine: &JetroEngine,
366    path: P,
367    query: &str,
368    options: NdjsonOptions,
369) -> Result<Vec<Value>, JetroEngineError>
370where
371    P: AsRef<Path>,
372{
373    let file = File::open(path)?;
374    collect_ndjson_with_options(
375        engine,
376        std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
377        query,
378        options,
379    )
380}
381
382pub fn collect_ndjson_source(
383    engine: &JetroEngine,
384    source: NdjsonSource,
385    query: &str,
386) -> Result<Vec<Value>, JetroEngineError> {
387    collect_ndjson_source_with_options(engine, source, query, NdjsonOptions::default())
388}
389
390pub fn collect_ndjson_source_with_options(
391    engine: &JetroEngine,
392    source: NdjsonSource,
393    query: &str,
394    options: NdjsonOptions,
395) -> Result<Vec<Value>, JetroEngineError> {
396    match source {
397        NdjsonSource::File(path) => collect_ndjson_file_with_options(engine, path, query, options),
398        NdjsonSource::Reader(reader) => collect_ndjson_with_options(engine, reader, query, options),
399    }
400}
401
402pub fn collect_ndjson_matches<R>(
403    engine: &JetroEngine,
404    reader: R,
405    predicate: &str,
406    limit: usize,
407) -> Result<Vec<Value>, JetroEngineError>
408where
409    R: BufRead,
410{
411    collect_ndjson_matches_with_options(engine, reader, predicate, limit, NdjsonOptions::default())
412}
413
414pub fn collect_ndjson_matches_with_options<R>(
415    engine: &JetroEngine,
416    reader: R,
417    predicate: &str,
418    limit: usize,
419    options: NdjsonOptions,
420) -> Result<Vec<Value>, JetroEngineError>
421where
422    R: BufRead,
423{
424    let mut values = Vec::with_capacity(limit);
425    drive_ndjson_matches(engine, reader, predicate, limit, options, |value| {
426        values.push(Value::from(value));
427        Ok(NdjsonControl::Continue)
428    })?;
429    Ok(values)
430}
431
432pub fn collect_ndjson_matches_file<P>(
433    engine: &JetroEngine,
434    path: P,
435    predicate: &str,
436    limit: usize,
437) -> Result<Vec<Value>, JetroEngineError>
438where
439    P: AsRef<Path>,
440{
441    let file = File::open(path)?;
442    let options = NdjsonOptions::default();
443    collect_ndjson_matches_with_options(
444        engine,
445        std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
446        predicate,
447        limit,
448        options,
449    )
450}
451
452pub fn collect_ndjson_matches_file_with_options<P>(
453    engine: &JetroEngine,
454    path: P,
455    predicate: &str,
456    limit: usize,
457    options: NdjsonOptions,
458) -> Result<Vec<Value>, JetroEngineError>
459where
460    P: AsRef<Path>,
461{
462    let file = File::open(path)?;
463    collect_ndjson_matches_with_options(
464        engine,
465        std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
466        predicate,
467        limit,
468        options,
469    )
470}
471
472pub fn collect_ndjson_matches_source(
473    engine: &JetroEngine,
474    source: NdjsonSource,
475    predicate: &str,
476    limit: usize,
477) -> Result<Vec<Value>, JetroEngineError> {
478    collect_ndjson_matches_source_with_options(
479        engine,
480        source,
481        predicate,
482        limit,
483        NdjsonOptions::default(),
484    )
485}
486
487pub fn collect_ndjson_matches_source_with_options(
488    engine: &JetroEngine,
489    source: NdjsonSource,
490    predicate: &str,
491    limit: usize,
492    options: NdjsonOptions,
493) -> Result<Vec<Value>, JetroEngineError> {
494    match source {
495        NdjsonSource::File(path) => {
496            collect_ndjson_matches_file_with_options(engine, path, predicate, limit, options)
497        }
498        NdjsonSource::Reader(reader) => {
499            collect_ndjson_matches_with_options(engine, reader, predicate, limit, options)
500        }
501    }
502}
503
504pub fn run_ndjson<R, W>(
505    engine: &JetroEngine,
506    reader: R,
507    query: &str,
508    writer: W,
509) -> Result<usize, JetroEngineError>
510where
511    R: BufRead,
512    W: Write,
513{
514    run_ndjson_with_options(engine, reader, query, writer, NdjsonOptions::default())
515}
516
517pub fn run_ndjson_file<P, W>(
518    engine: &JetroEngine,
519    path: P,
520    query: &str,
521    writer: W,
522) -> Result<usize, JetroEngineError>
523where
524    P: AsRef<Path>,
525    W: Write,
526{
527    let options = NdjsonOptions::default();
528    run_ndjson_file_with_options(engine, path, query, writer, options)
529}
530
531pub fn run_ndjson_file_with_options<P, W>(
532    engine: &JetroEngine,
533    path: P,
534    query: &str,
535    writer: W,
536    options: NdjsonOptions,
537) -> Result<usize, JetroEngineError>
538where
539    P: AsRef<Path>,
540    W: Write,
541{
542    let path = path.as_ref();
543    match ndjson_route_plan(engine, NdjsonSourceMode::File, query, options)? {
544        NdjsonRoutePlan::Rows { plan, .. } => {
545            return drive_ndjson_rows_file_plan(engine, path, &plan, None, options, writer);
546        }
547        NdjsonRoutePlan::Unsupported { explain } => {
548            return Err(unsupported_ndjson_route_error(&explain));
549        }
550        NdjsonRoutePlan::RowLocal { .. } => {}
551    }
552
553    let file = File::open(path)?;
554    run_ndjson_with_options(
555        engine,
556        std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
557        query,
558        writer,
559        options,
560    )
561}
562
563pub fn run_ndjson_file_with_report<P, W>(
564    engine: &JetroEngine,
565    path: P,
566    query: &str,
567    writer: W,
568) -> Result<NdjsonExecutionReport, JetroEngineError>
569where
570    P: AsRef<Path>,
571    W: Write,
572{
573    run_ndjson_file_with_report_and_options(engine, path, query, writer, NdjsonOptions::default())
574}
575
576pub fn run_ndjson_file_with_report_and_options<P, W>(
577    engine: &JetroEngine,
578    path: P,
579    query: &str,
580    writer: W,
581    options: NdjsonOptions,
582) -> Result<NdjsonExecutionReport, JetroEngineError>
583where
584    P: AsRef<Path>,
585    W: Write,
586{
587    let path = path.as_ref();
588    match ndjson_route_plan(engine, NdjsonSourceMode::File, query, options)? {
589        NdjsonRoutePlan::Rows {
590            explain,
591            plan: NdjsonRowsFilePlan::Stream(plan),
592        } => {
593            let (_, stats) =
594                drive_ndjson_rows_stream_file_with_stats(engine, path, &plan, None, options, writer)?;
595            Ok(row_stream_report(explain, stats))
596        }
597        NdjsonRoutePlan::Rows {
598            explain,
599            plan: NdjsonRowsFilePlan::Fanout(plan),
600        } => {
601            let (_, stats) =
602                drive_ndjson_rows_fanout_file_with_stats(engine, path, &plan, options, writer)?;
603            Ok(row_stream_report(explain, stats))
604        }
605        NdjsonRoutePlan::Rows {
606            explain,
607            plan: NdjsonRowsFilePlan::Subquery(plan),
608        } => {
609            let (_, stats) =
610                drive_ndjson_rows_subquery_file_with_stats(engine, path, &plan, options, writer)?;
611            Ok(row_stream_report(explain, stats))
612        }
613        NdjsonRoutePlan::Unsupported { explain } => Err(unsupported_ndjson_route_error(&explain)),
614        NdjsonRoutePlan::RowLocal { explain } => {
615            let file = File::open(path)?;
616            let (_, stats) = drive_ndjson_writer_with_stats(
617                engine,
618                std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
619                query,
620                None,
621                options,
622                writer,
623            )?;
624            Ok(NdjsonExecutionReport::new(explain, stats))
625        }
626    }
627}
628
629pub fn run_ndjson_with_options<R, W>(
630    engine: &JetroEngine,
631    reader: R,
632    query: &str,
633    writer: W,
634    options: NdjsonOptions,
635) -> Result<usize, JetroEngineError>
636where
637    R: BufRead,
638    W: Write,
639{
640    match ndjson_route_plan(engine, NdjsonSourceMode::Reader, query, options)? {
641        NdjsonRoutePlan::Rows {
642            plan: NdjsonRowsFilePlan::Stream(plan),
643            ..
644        } => drive_ndjson_rows_stream_reader(engine, reader, &plan, None, options, writer),
645        NdjsonRoutePlan::Rows { explain, .. } | NdjsonRoutePlan::Unsupported { explain } => {
646            Err(unsupported_ndjson_route_error(&explain))
647        }
648        NdjsonRoutePlan::RowLocal { .. } => {
649            drive_ndjson_writer(engine, reader, query, None, options, writer)
650        }
651    }
652}
653
654pub fn run_ndjson_with_report<R, W>(
655    engine: &JetroEngine,
656    reader: R,
657    query: &str,
658    writer: W,
659) -> Result<NdjsonExecutionReport, JetroEngineError>
660where
661    R: BufRead,
662    W: Write,
663{
664    run_ndjson_with_report_and_options(engine, reader, query, writer, NdjsonOptions::default())
665}
666
667pub fn run_ndjson_with_report_and_options<R, W>(
668    engine: &JetroEngine,
669    reader: R,
670    query: &str,
671    writer: W,
672    options: NdjsonOptions,
673) -> Result<NdjsonExecutionReport, JetroEngineError>
674where
675    R: BufRead,
676    W: Write,
677{
678    match ndjson_route_plan(engine, NdjsonSourceMode::Reader, query, options)? {
679        NdjsonRoutePlan::Rows {
680            explain,
681            plan: NdjsonRowsFilePlan::Stream(plan),
682        } => {
683            let (_, stats) =
684                drive_ndjson_rows_stream_reader_with_stats(engine, reader, &plan, None, options, writer)?;
685            Ok(row_stream_report(explain, stats))
686        }
687        NdjsonRoutePlan::Rows { explain, .. } | NdjsonRoutePlan::Unsupported { explain } => {
688            Err(unsupported_ndjson_route_error(&explain))
689        }
690        NdjsonRoutePlan::RowLocal { explain } => {
691            let (_, stats) =
692                drive_ndjson_writer_with_stats(engine, reader, query, None, options, writer)?;
693            Ok(NdjsonExecutionReport::new(explain, stats))
694        }
695    }
696}
697
698pub fn run_ndjson_limit<R, W>(
699    engine: &JetroEngine,
700    reader: R,
701    query: &str,
702    limit: usize,
703    writer: W,
704) -> Result<usize, JetroEngineError>
705where
706    R: BufRead,
707    W: Write,
708{
709    run_ndjson_limit_with_options(
710        engine,
711        reader,
712        query,
713        limit,
714        writer,
715        NdjsonOptions::default(),
716    )
717}
718
719pub fn run_ndjson_limit_with_options<R, W>(
720    engine: &JetroEngine,
721    reader: R,
722    query: &str,
723    limit: usize,
724    writer: W,
725    options: NdjsonOptions,
726) -> Result<usize, JetroEngineError>
727where
728    R: BufRead,
729    W: Write,
730{
731    if limit == 0 {
732        return Ok(0);
733    }
734
735    match ndjson_route_plan(engine, NdjsonSourceMode::Reader, query, options)? {
736        NdjsonRoutePlan::Rows {
737            plan: NdjsonRowsFilePlan::Stream(plan),
738            ..
739        } => drive_ndjson_rows_stream_reader(engine, reader, &plan, Some(limit), options, writer),
740        NdjsonRoutePlan::Rows { explain, .. } | NdjsonRoutePlan::Unsupported { explain } => {
741            Err(unsupported_ndjson_route_error(&explain))
742        }
743        NdjsonRoutePlan::RowLocal { .. } => {
744            drive_ndjson_writer(engine, reader, query, Some(limit), options, writer)
745        }
746    }
747}
748
749pub fn run_ndjson_limit_with_report<R, W>(
750    engine: &JetroEngine,
751    reader: R,
752    query: &str,
753    limit: usize,
754    writer: W,
755) -> Result<NdjsonExecutionReport, JetroEngineError>
756where
757    R: BufRead,
758    W: Write,
759{
760    run_ndjson_limit_with_report_and_options(
761        engine,
762        reader,
763        query,
764        limit,
765        writer,
766        NdjsonOptions::default(),
767    )
768}
769
770pub fn run_ndjson_limit_with_report_and_options<R, W>(
771    engine: &JetroEngine,
772    reader: R,
773    query: &str,
774    limit: usize,
775    writer: W,
776    options: NdjsonOptions,
777) -> Result<NdjsonExecutionReport, JetroEngineError>
778where
779    R: BufRead,
780    W: Write,
781{
782    if limit == 0 {
783        let route = ndjson_route_plan(engine, NdjsonSourceMode::Reader, query, options)?
784            .explain()
785            .clone();
786        return Ok(NdjsonExecutionReport::emitted_only(route, 0));
787    }
788
789    match ndjson_route_plan(engine, NdjsonSourceMode::Reader, query, options)? {
790        NdjsonRoutePlan::Rows {
791            explain,
792            plan: NdjsonRowsFilePlan::Stream(plan),
793        } => {
794            let (_, stats) = drive_ndjson_rows_stream_reader_with_stats(
795                engine,
796                reader,
797                &plan,
798                Some(limit),
799                options,
800                writer,
801            )?;
802            Ok(row_stream_report(explain, stats))
803        }
804        NdjsonRoutePlan::Rows { explain, .. } | NdjsonRoutePlan::Unsupported { explain } => {
805            Err(unsupported_ndjson_route_error(&explain))
806        }
807        NdjsonRoutePlan::RowLocal { explain } => {
808            let (_, stats) = drive_ndjson_writer_with_stats(
809                engine,
810                reader,
811                query,
812                Some(limit),
813                options,
814                writer,
815            )?;
816            Ok(NdjsonExecutionReport::new(explain, stats))
817        }
818    }
819}
820
821pub fn run_ndjson_file_limit<P, W>(
822    engine: &JetroEngine,
823    path: P,
824    query: &str,
825    limit: usize,
826    writer: W,
827) -> Result<usize, JetroEngineError>
828where
829    P: AsRef<Path>,
830    W: Write,
831{
832    let options = NdjsonOptions::default();
833    run_ndjson_file_limit_with_options(engine, path, query, limit, writer, options)
834}
835
836pub fn run_ndjson_file_limit_with_options<P, W>(
837    engine: &JetroEngine,
838    path: P,
839    query: &str,
840    limit: usize,
841    writer: W,
842    options: NdjsonOptions,
843) -> Result<usize, JetroEngineError>
844where
845    P: AsRef<Path>,
846    W: Write,
847{
848    if limit == 0 {
849        return Ok(0);
850    }
851    let path = path.as_ref();
852    match ndjson_route_plan(engine, NdjsonSourceMode::File, query, options)? {
853        NdjsonRoutePlan::Rows { plan, .. } => {
854            return drive_ndjson_rows_file_plan(engine, path, &plan, Some(limit), options, writer);
855        }
856        NdjsonRoutePlan::Unsupported { explain } => {
857            return Err(unsupported_ndjson_route_error(&explain));
858        }
859        NdjsonRoutePlan::RowLocal { .. } => {}
860    }
861
862    let file = File::open(path)?;
863    run_ndjson_limit_with_options(
864        engine,
865        std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
866        query,
867        limit,
868        writer,
869        options,
870    )
871}
872
873pub fn run_ndjson_file_limit_with_report<P, W>(
874    engine: &JetroEngine,
875    path: P,
876    query: &str,
877    limit: usize,
878    writer: W,
879) -> Result<NdjsonExecutionReport, JetroEngineError>
880where
881    P: AsRef<Path>,
882    W: Write,
883{
884    run_ndjson_file_limit_with_report_and_options(
885        engine,
886        path,
887        query,
888        limit,
889        writer,
890        NdjsonOptions::default(),
891    )
892}
893
894pub fn run_ndjson_file_limit_with_report_and_options<P, W>(
895    engine: &JetroEngine,
896    path: P,
897    query: &str,
898    limit: usize,
899    writer: W,
900    options: NdjsonOptions,
901) -> Result<NdjsonExecutionReport, JetroEngineError>
902where
903    P: AsRef<Path>,
904    W: Write,
905{
906    if limit == 0 {
907        let route = ndjson_route_plan(engine, NdjsonSourceMode::File, query, options)?
908            .explain()
909            .clone();
910        return Ok(NdjsonExecutionReport::emitted_only(route, 0));
911    }
912
913    let path = path.as_ref();
914    match ndjson_route_plan(engine, NdjsonSourceMode::File, query, options)? {
915        NdjsonRoutePlan::Rows {
916            explain,
917            plan: NdjsonRowsFilePlan::Stream(plan),
918        } => {
919            let (_, stats) = drive_ndjson_rows_stream_file_with_stats(
920                engine,
921                path,
922                &plan,
923                Some(limit),
924                options,
925                writer,
926            )?;
927            Ok(row_stream_report(explain, stats))
928        }
929        NdjsonRoutePlan::Rows {
930            explain,
931            plan: NdjsonRowsFilePlan::Fanout(plan),
932        } => {
933            let (_, stats) =
934                drive_ndjson_rows_fanout_file_with_stats(engine, path, &plan, options, writer)?;
935            Ok(row_stream_report(explain, stats))
936        }
937        NdjsonRoutePlan::Rows {
938            explain,
939            plan: NdjsonRowsFilePlan::Subquery(plan),
940        } => {
941            let (_, stats) =
942                drive_ndjson_rows_subquery_file_with_stats(engine, path, &plan, options, writer)?;
943            Ok(row_stream_report(explain, stats))
944        }
945        NdjsonRoutePlan::Unsupported { explain } => Err(unsupported_ndjson_route_error(&explain)),
946        NdjsonRoutePlan::RowLocal { explain } => {
947            let file = File::open(path)?;
948            let (_, stats) = drive_ndjson_writer_with_stats(
949                engine,
950                std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
951                query,
952                Some(limit),
953                options,
954                writer,
955            )?;
956            Ok(NdjsonExecutionReport::new(explain, stats))
957        }
958    }
959}
960
961fn unsupported_ndjson_route_error(explain: &NdjsonRouteExplain) -> JetroEngineError {
962    let message = explain
963        .unsupported_message()
964        .unwrap_or_else(|| "unsupported NDJSON route".to_string());
965    JetroEngineError::Eval(EvalError(message))
966}
967
968fn row_stream_report(explain: NdjsonRouteExplain, stats: RowStreamStats) -> NdjsonExecutionReport {
969    NdjsonExecutionReport::new(explain, NdjsonExecutionStats::from(&stats))
970}
971
972pub fn run_ndjson_source<W>(
973    engine: &JetroEngine,
974    source: NdjsonSource,
975    query: &str,
976    writer: W,
977) -> Result<usize, JetroEngineError>
978where
979    W: Write,
980{
981    run_ndjson_source_with_options(engine, source, query, writer, NdjsonOptions::default())
982}
983
984pub fn run_ndjson_source_with_options<W>(
985    engine: &JetroEngine,
986    source: NdjsonSource,
987    query: &str,
988    writer: W,
989    options: NdjsonOptions,
990) -> Result<usize, JetroEngineError>
991where
992    W: Write,
993{
994    match source {
995        NdjsonSource::File(path) => {
996            run_ndjson_file_with_options(engine, path, query, writer, options)
997        }
998        NdjsonSource::Reader(reader) => {
999            run_ndjson_with_options(engine, reader, query, writer, options)
1000        }
1001    }
1002}
1003
1004pub fn run_ndjson_source_with_report<W>(
1005    engine: &JetroEngine,
1006    source: NdjsonSource,
1007    query: &str,
1008    writer: W,
1009) -> Result<NdjsonExecutionReport, JetroEngineError>
1010where
1011    W: Write,
1012{
1013    run_ndjson_source_with_report_and_options(engine, source, query, writer, NdjsonOptions::default())
1014}
1015
1016pub fn run_ndjson_source_with_report_and_options<W>(
1017    engine: &JetroEngine,
1018    source: NdjsonSource,
1019    query: &str,
1020    writer: W,
1021    options: NdjsonOptions,
1022) -> Result<NdjsonExecutionReport, JetroEngineError>
1023where
1024    W: Write,
1025{
1026    match source {
1027        NdjsonSource::File(path) => {
1028            run_ndjson_file_with_report_and_options(engine, path, query, writer, options)
1029        }
1030        NdjsonSource::Reader(reader) => {
1031            run_ndjson_with_report_and_options(engine, reader, query, writer, options)
1032        }
1033    }
1034}
1035
1036pub fn run_ndjson_source_limit<W>(
1037    engine: &JetroEngine,
1038    source: NdjsonSource,
1039    query: &str,
1040    limit: usize,
1041    writer: W,
1042) -> Result<usize, JetroEngineError>
1043where
1044    W: Write,
1045{
1046    run_ndjson_source_limit_with_options(
1047        engine,
1048        source,
1049        query,
1050        limit,
1051        writer,
1052        NdjsonOptions::default(),
1053    )
1054}
1055
1056pub fn run_ndjson_source_limit_with_options<W>(
1057    engine: &JetroEngine,
1058    source: NdjsonSource,
1059    query: &str,
1060    limit: usize,
1061    writer: W,
1062    options: NdjsonOptions,
1063) -> Result<usize, JetroEngineError>
1064where
1065    W: Write,
1066{
1067    match source {
1068        NdjsonSource::File(path) => {
1069            run_ndjson_file_limit_with_options(engine, path, query, limit, writer, options)
1070        }
1071        NdjsonSource::Reader(reader) => {
1072            run_ndjson_limit_with_options(engine, reader, query, limit, writer, options)
1073        }
1074    }
1075}
1076
1077pub fn run_ndjson_source_limit_with_report<W>(
1078    engine: &JetroEngine,
1079    source: NdjsonSource,
1080    query: &str,
1081    limit: usize,
1082    writer: W,
1083) -> Result<NdjsonExecutionReport, JetroEngineError>
1084where
1085    W: Write,
1086{
1087    run_ndjson_source_limit_with_report_and_options(
1088        engine,
1089        source,
1090        query,
1091        limit,
1092        writer,
1093        NdjsonOptions::default(),
1094    )
1095}
1096
1097pub fn run_ndjson_source_limit_with_report_and_options<W>(
1098    engine: &JetroEngine,
1099    source: NdjsonSource,
1100    query: &str,
1101    limit: usize,
1102    writer: W,
1103    options: NdjsonOptions,
1104) -> Result<NdjsonExecutionReport, JetroEngineError>
1105where
1106    W: Write,
1107{
1108    match source {
1109        NdjsonSource::File(path) => {
1110            run_ndjson_file_limit_with_report_and_options(engine, path, query, limit, writer, options)
1111        }
1112        NdjsonSource::Reader(reader) => {
1113            run_ndjson_limit_with_report_and_options(engine, reader, query, limit, writer, options)
1114        }
1115    }
1116}
1117
1118pub fn run_ndjson_matches<R, W>(
1119    engine: &JetroEngine,
1120    reader: R,
1121    predicate: &str,
1122    limit: usize,
1123    writer: W,
1124) -> Result<usize, JetroEngineError>
1125where
1126    R: BufRead,
1127    W: Write,
1128{
1129    run_ndjson_matches_with_options(
1130        engine,
1131        reader,
1132        predicate,
1133        limit,
1134        writer,
1135        NdjsonOptions::default(),
1136    )
1137}
1138
1139pub fn run_ndjson_matches_with_options<R, W>(
1140    engine: &JetroEngine,
1141    reader: R,
1142    predicate: &str,
1143    limit: usize,
1144    writer: W,
1145    options: NdjsonOptions,
1146) -> Result<usize, JetroEngineError>
1147where
1148    R: BufRead,
1149    W: Write,
1150{
1151    drive_ndjson_matches_writer(engine, reader, predicate, limit, options, writer)
1152}
1153
1154pub fn run_ndjson_matches_with_report<R, W>(
1155    engine: &JetroEngine,
1156    reader: R,
1157    predicate: &str,
1158    limit: usize,
1159    writer: W,
1160) -> Result<NdjsonExecutionReport, JetroEngineError>
1161where
1162    R: BufRead,
1163    W: Write,
1164{
1165    run_ndjson_matches_with_report_and_options(
1166        engine,
1167        reader,
1168        predicate,
1169        limit,
1170        writer,
1171        NdjsonOptions::default(),
1172    )
1173}
1174
1175pub fn run_ndjson_matches_with_report_and_options<R, W>(
1176    engine: &JetroEngine,
1177    reader: R,
1178    predicate: &str,
1179    limit: usize,
1180    writer: W,
1181    options: NdjsonOptions,
1182) -> Result<NdjsonExecutionReport, JetroEngineError>
1183where
1184    R: BufRead,
1185    W: Write,
1186{
1187    let (_, stats) =
1188        drive_ndjson_matches_writer_with_stats(engine, reader, predicate, limit, options, writer)?;
1189    Ok(NdjsonExecutionReport::new(
1190        NdjsonRouteExplain::matches(NdjsonSourceCaps::reader(options)),
1191        stats,
1192    ))
1193}
1194
1195pub fn run_ndjson_matches_file<P, W>(
1196    engine: &JetroEngine,
1197    path: P,
1198    predicate: &str,
1199    limit: usize,
1200    writer: W,
1201) -> Result<usize, JetroEngineError>
1202where
1203    P: AsRef<Path>,
1204    W: Write,
1205{
1206    let file = File::open(path)?;
1207    let options = NdjsonOptions::default();
1208    run_ndjson_matches_with_options(
1209        engine,
1210        std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
1211        predicate,
1212        limit,
1213        writer,
1214        options,
1215    )
1216}
1217
1218pub fn run_ndjson_matches_file_with_options<P, W>(
1219    engine: &JetroEngine,
1220    path: P,
1221    predicate: &str,
1222    limit: usize,
1223    writer: W,
1224    options: NdjsonOptions,
1225) -> Result<usize, JetroEngineError>
1226where
1227    P: AsRef<Path>,
1228    W: Write,
1229{
1230    let file = File::open(path)?;
1231    run_ndjson_matches_with_options(
1232        engine,
1233        std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
1234        predicate,
1235        limit,
1236        writer,
1237        options,
1238    )
1239}
1240
1241pub fn run_ndjson_matches_file_with_report<P, W>(
1242    engine: &JetroEngine,
1243    path: P,
1244    predicate: &str,
1245    limit: usize,
1246    writer: W,
1247) -> Result<NdjsonExecutionReport, JetroEngineError>
1248where
1249    P: AsRef<Path>,
1250    W: Write,
1251{
1252    run_ndjson_matches_file_with_report_and_options(
1253        engine,
1254        path,
1255        predicate,
1256        limit,
1257        writer,
1258        NdjsonOptions::default(),
1259    )
1260}
1261
1262pub fn run_ndjson_matches_file_with_report_and_options<P, W>(
1263    engine: &JetroEngine,
1264    path: P,
1265    predicate: &str,
1266    limit: usize,
1267    writer: W,
1268    options: NdjsonOptions,
1269) -> Result<NdjsonExecutionReport, JetroEngineError>
1270where
1271    P: AsRef<Path>,
1272    W: Write,
1273{
1274    let file = File::open(path)?;
1275    let (_, stats) = drive_ndjson_matches_writer_with_stats(
1276        engine,
1277        std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
1278        predicate,
1279        limit,
1280        options,
1281        writer,
1282    )?;
1283    Ok(NdjsonExecutionReport::new(
1284        NdjsonRouteExplain::matches(NdjsonSourceCaps::file(options)),
1285        stats,
1286    ))
1287}
1288
1289pub fn run_ndjson_matches_source<W>(
1290    engine: &JetroEngine,
1291    source: NdjsonSource,
1292    predicate: &str,
1293    limit: usize,
1294    writer: W,
1295) -> Result<usize, JetroEngineError>
1296where
1297    W: Write,
1298{
1299    run_ndjson_matches_source_with_options(
1300        engine,
1301        source,
1302        predicate,
1303        limit,
1304        writer,
1305        NdjsonOptions::default(),
1306    )
1307}
1308
1309pub fn run_ndjson_matches_source_with_report<W>(
1310    engine: &JetroEngine,
1311    source: NdjsonSource,
1312    predicate: &str,
1313    limit: usize,
1314    writer: W,
1315) -> Result<NdjsonExecutionReport, JetroEngineError>
1316where
1317    W: Write,
1318{
1319    run_ndjson_matches_source_with_report_and_options(
1320        engine,
1321        source,
1322        predicate,
1323        limit,
1324        writer,
1325        NdjsonOptions::default(),
1326    )
1327}
1328
1329pub fn run_ndjson_matches_source_with_report_and_options<W>(
1330    engine: &JetroEngine,
1331    source: NdjsonSource,
1332    predicate: &str,
1333    limit: usize,
1334    writer: W,
1335    options: NdjsonOptions,
1336) -> Result<NdjsonExecutionReport, JetroEngineError>
1337where
1338    W: Write,
1339{
1340    match source {
1341        NdjsonSource::File(path) => run_ndjson_matches_file_with_report_and_options(
1342            engine, path, predicate, limit, writer, options,
1343        ),
1344        NdjsonSource::Reader(reader) => run_ndjson_matches_with_report_and_options(
1345            engine, reader, predicate, limit, writer, options,
1346        ),
1347    }
1348}
1349
1350pub fn run_ndjson_matches_source_with_options<W>(
1351    engine: &JetroEngine,
1352    source: NdjsonSource,
1353    predicate: &str,
1354    limit: usize,
1355    writer: W,
1356    options: NdjsonOptions,
1357) -> Result<usize, JetroEngineError>
1358where
1359    W: Write,
1360{
1361    match source {
1362        NdjsonSource::File(path) => {
1363            run_ndjson_matches_file_with_options(engine, path, predicate, limit, writer, options)
1364        }
1365        NdjsonSource::Reader(reader) => {
1366            run_ndjson_matches_with_options(engine, reader, predicate, limit, writer, options)
1367        }
1368    }
1369}
1370
1371fn drive_ndjson<R, F>(
1372    engine: &JetroEngine,
1373    reader: R,
1374    query: &str,
1375    options: NdjsonOptions,
1376    mut emit: F,
1377) -> Result<usize, JetroEngineError>
1378where
1379    R: BufRead,
1380    F: FnMut(Value) -> Result<NdjsonControl, JetroEngineError>,
1381{
1382    let mut driver = NdjsonPerRowDriver::new(reader).with_options(options);
1383    let plan = engine.cached_plan(query, PlanningContext::bytes());
1384    let mut buf = Vec::with_capacity(options.initial_buffer_capacity);
1385    let mut count = 0;
1386
1387    while let Some((line_no, row)) = driver.read_next_owned(&mut buf)? {
1388        let document = parse_row(engine, line_no, row)?;
1389        let out = collect_row_val(engine, &document, &plan, line_no)?;
1390        count += 1;
1391        if matches!(emit(Value::from(out))?, NdjsonControl::Stop) {
1392            break;
1393        }
1394    }
1395
1396    Ok(count)
1397}
1398
1399fn drive_ndjson_rows_file_plan<W>(
1400    engine: &JetroEngine,
1401    path: &Path,
1402    plan: &NdjsonRowsFilePlan,
1403    limit: Option<usize>,
1404    options: NdjsonOptions,
1405    writer: W,
1406) -> Result<usize, JetroEngineError>
1407where
1408    W: Write,
1409{
1410    match plan {
1411        NdjsonRowsFilePlan::Stream(plan) => {
1412            drive_ndjson_rows_stream_file(engine, path, plan, limit, options, writer)
1413        }
1414        NdjsonRowsFilePlan::Fanout(plan) => {
1415            drive_ndjson_rows_fanout_file(engine, path, plan, options, writer)
1416        }
1417        NdjsonRowsFilePlan::Subquery(plan) => {
1418            drive_ndjson_rows_subquery_file(engine, path, plan, options, writer)
1419        }
1420    }
1421}
1422
1423fn drive_ndjson_rows_subquery_file<P, W>(
1424    engine: &JetroEngine,
1425    path: P,
1426    plan: &RowStreamSubqueryPlan,
1427    options: NdjsonOptions,
1428    writer: W,
1429) -> Result<usize, JetroEngineError>
1430where
1431    P: AsRef<Path>,
1432    W: Write,
1433{
1434    let (stream_value, _) =
1435        collect_ndjson_rows_stream_file_with_stats(engine, path, &plan.stream, options)?;
1436    let wrapper = Compiler::compile(&plan.wrapper, "<ndjson-rows-wrapper>");
1437    let env = Env::new(Val::Null).with_var(STREAM_BINDING, stream_value);
1438    let value = engine
1439        .lock_vm()
1440        .exec_in_env(&wrapper, &env)
1441        .map_err(JetroEngineError::Eval)?;
1442    let mut writer = ndjson_writer_with_options(writer, options);
1443    let emitted = write_val_line_with_options(&mut writer, &value, options)? as usize;
1444    writer.flush()?;
1445    Ok(emitted)
1446}
1447
1448fn drive_ndjson_rows_subquery_file_with_stats<P, W>(
1449    engine: &JetroEngine,
1450    path: P,
1451    plan: &RowStreamSubqueryPlan,
1452    options: NdjsonOptions,
1453    writer: W,
1454) -> Result<(usize, RowStreamStats), JetroEngineError>
1455where
1456    P: AsRef<Path>,
1457    W: Write,
1458{
1459    let (stream_value, mut stats) =
1460        collect_ndjson_rows_stream_file_with_stats(engine, path, &plan.stream, options)?;
1461    let wrapper = Compiler::compile(&plan.wrapper, "<ndjson-rows-wrapper>");
1462    let env = Env::new(Val::Null).with_var(STREAM_BINDING, stream_value);
1463    let value = engine
1464        .lock_vm()
1465        .exec_in_env(&wrapper, &env)
1466        .map_err(JetroEngineError::Eval)?;
1467    let mut writer = ndjson_writer_with_options(writer, options);
1468    let emitted = write_val_line_with_options(&mut writer, &value, options)? as usize;
1469    stats.rows_emitted = emitted;
1470    writer.flush()?;
1471    Ok((emitted, stats))
1472}
1473
1474fn collect_ndjson_rows_stream_file_with_stats<P>(
1475    engine: &JetroEngine,
1476    path: P,
1477    plan: &RowStreamPlan,
1478    options: NdjsonOptions,
1479) -> Result<(Val, RowStreamStats), JetroEngineError>
1480where
1481    P: AsRef<Path>,
1482{
1483    if let Some(result) =
1484        super::ndjson_parallel::collect_rows_stream_file_with_stats(engine, path.as_ref(), plan, options)?
1485    {
1486        return Ok((result.value, result.stats));
1487    }
1488
1489    let mut executor = CompiledRowStream::new(plan);
1490    let mut out = Vec::new();
1491
1492    if plan.direction == RowStreamDirection::Forward {
1493        let file = File::open(path)?;
1494        let mut driver = NdjsonPerRowDriver::new(std::io::BufReader::with_capacity(
1495            options.reader_buffer_capacity,
1496            file,
1497        ))
1498        .with_options(options);
1499        let mut buf = Vec::with_capacity(options.initial_buffer_capacity);
1500        while !executor.is_exhausted() {
1501            let Some((line_no, row)) = driver.read_next_owned(&mut buf)? else {
1502                break;
1503            };
1504            if collect_row_stream_result(
1505                engine,
1506                line_no,
1507                executor.apply_owned_row(engine, line_no, row)?,
1508                &mut out,
1509            )? || executor.is_exhausted()
1510            {
1511                break;
1512            }
1513        }
1514    } else {
1515        let mut driver = super::ndjson_rev::NdjsonReverseFileDriver::with_options(path, options)?;
1516        while !executor.is_exhausted() {
1517            let Some((line_no, row)) = driver.next_line_with_reverse_no()? else {
1518                break;
1519            };
1520            if collect_row_stream_result(
1521                engine,
1522                line_no,
1523                executor.apply_owned_row(engine, line_no, row)?,
1524                &mut out,
1525            )? || executor.is_exhausted()
1526            {
1527                break;
1528            }
1529        }
1530    }
1531
1532    if let Some(value) = executor.finish() {
1533        Ok((value, executor.stats().clone()))
1534    } else if plan.demand.retained_limit == Some(1) {
1535        Ok((
1536            out.into_iter().next().unwrap_or(Val::Null),
1537            executor.stats().clone(),
1538        ))
1539    } else {
1540        Ok((Val::Arr(std::sync::Arc::new(out)), executor.stats().clone()))
1541    }
1542}
1543
1544pub(super) fn collect_row_stream_result(
1545    engine: &JetroEngine,
1546    line_no: u64,
1547    result: RowStreamRowResult,
1548    out: &mut Vec<Val>,
1549) -> Result<bool, JetroEngineError> {
1550    match result {
1551        RowStreamRowResult::Emit(value) => out.push(value),
1552        RowStreamRowResult::EmitBytes(bytes) => {
1553            let document = parse_row(engine, line_no, bytes)?;
1554            let value = document
1555                .root_val_with(engine.keys())
1556                .map_err(|err| row_eval_error(line_no, err))?;
1557            out.push(value);
1558        }
1559        RowStreamRowResult::Skip => {}
1560        RowStreamRowResult::Stop => return Ok(true),
1561    }
1562    Ok(false)
1563}
1564
1565fn drive_ndjson_rows_stream_reader<R, W>(
1566    engine: &JetroEngine,
1567    reader: R,
1568    plan: &RowStreamPlan,
1569    external_limit: Option<usize>,
1570    options: NdjsonOptions,
1571    writer: W,
1572) -> Result<usize, JetroEngineError>
1573where
1574    R: BufRead,
1575    W: Write,
1576{
1577    let (emitted, _) = drive_ndjson_rows_stream_reader_with_stats(
1578        engine,
1579        reader,
1580        plan,
1581        external_limit,
1582        options,
1583        writer,
1584    )?;
1585    Ok(emitted)
1586}
1587
1588fn drive_ndjson_rows_stream_reader_with_stats<R, W>(
1589    engine: &JetroEngine,
1590    reader: R,
1591    plan: &RowStreamPlan,
1592    external_limit: Option<usize>,
1593    options: NdjsonOptions,
1594    writer: W,
1595) -> Result<(usize, RowStreamStats), JetroEngineError>
1596where
1597    R: BufRead,
1598    W: Write,
1599{
1600    if plan.direction == RowStreamDirection::Reverse {
1601        return Err(JetroEngineError::Eval(EvalError(
1602            "$.rows().reverse() requires a file-backed NDJSON source".into(),
1603        )));
1604    }
1605
1606    let mut driver = NdjsonPerRowDriver::new(reader).with_options(options);
1607    let mut executor = CompiledRowStream::new(plan);
1608    let mut writer = ndjson_writer_with_options(writer, options);
1609    let mut buf = Vec::with_capacity(options.initial_buffer_capacity);
1610    let mut emitted = 0usize;
1611
1612    while !executor.is_exhausted() {
1613        let Some((line_no, row)) = driver.read_next_owned(&mut buf)? else {
1614            break;
1615        };
1616        if emit_row_stream_result(
1617            executor.apply_owned_row(engine, line_no, row)?,
1618            &mut writer,
1619            &mut emitted,
1620            external_limit,
1621            options,
1622        )? {
1623            break;
1624        }
1625        if executor.is_exhausted() {
1626            break;
1627        }
1628    }
1629
1630    emit_row_stream_finish(
1631        &executor,
1632        &mut writer,
1633        &mut emitted,
1634        external_limit,
1635        options,
1636    )?;
1637    writer.flush()?;
1638    Ok((emitted, executor.stats().clone()))
1639}
1640
1641fn drive_ndjson_rows_stream_file<P, W>(
1642    engine: &JetroEngine,
1643    path: P,
1644    plan: &RowStreamPlan,
1645    external_limit: Option<usize>,
1646    options: NdjsonOptions,
1647    writer: W,
1648) -> Result<usize, JetroEngineError>
1649where
1650    P: AsRef<Path>,
1651    W: Write,
1652{
1653    if let Some(value) =
1654        super::ndjson_parallel::collect_rows_stream_file(engine, path.as_ref(), plan, options)?
1655    {
1656        return write_collected_rows_stream(value, external_limit, options, writer);
1657    }
1658
1659    let (emitted, _) = drive_ndjson_rows_stream_file_with_stats(
1660        engine,
1661        path,
1662        plan,
1663        external_limit,
1664        options,
1665        writer,
1666    )?;
1667    Ok(emitted)
1668}
1669
1670fn write_collected_rows_stream<W: Write>(
1671    value: Val,
1672    external_limit: Option<usize>,
1673    options: NdjsonOptions,
1674    writer: W,
1675) -> Result<usize, JetroEngineError> {
1676    let mut writer = ndjson_writer_with_options(writer, options);
1677    let mut emitted = 0usize;
1678    match value {
1679        Val::Arr(values) => {
1680            for value in values.iter() {
1681                if external_limit.is_some_and(|limit| emitted >= limit) {
1682                    break;
1683                }
1684                if write_val_line_with_options(&mut writer, value, options)? {
1685                    emitted += 1;
1686                }
1687            }
1688        }
1689        value => {
1690            if write_val_line_with_options(&mut writer, &value, options)? {
1691                emitted += 1;
1692            }
1693        }
1694    }
1695    writer.flush()?;
1696    Ok(emitted)
1697}
1698
1699fn drive_ndjson_rows_stream_file_with_stats<P, W>(
1700    engine: &JetroEngine,
1701    path: P,
1702    plan: &RowStreamPlan,
1703    external_limit: Option<usize>,
1704    options: NdjsonOptions,
1705    writer: W,
1706) -> Result<(usize, RowStreamStats), JetroEngineError>
1707where
1708    P: AsRef<Path>,
1709    W: Write,
1710{
1711    if let Some(result) = super::ndjson_parallel::collect_rows_stream_file_with_stats(
1712        engine,
1713        path.as_ref(),
1714        plan,
1715        options,
1716    )? {
1717        let mut stats = result.stats;
1718        let emitted = write_collected_rows_stream(result.value, external_limit, options, writer)?;
1719        stats.rows_emitted = emitted;
1720        return Ok((emitted, stats));
1721    }
1722
1723    if plan.direction == RowStreamDirection::Forward {
1724        let file = File::open(path)?;
1725        return drive_ndjson_rows_stream_reader_with_stats(
1726            engine,
1727            std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
1728            plan,
1729            external_limit,
1730            options,
1731            writer,
1732        );
1733    }
1734
1735    let mut driver = super::ndjson_rev::NdjsonReverseFileDriver::with_options(path, options)?;
1736    let mut executor = CompiledRowStream::new(plan);
1737    let mut writer = ndjson_writer_with_options(writer, options);
1738    let mut emitted = 0usize;
1739
1740    while !executor.is_exhausted() {
1741        let Some((line_no, row)) = driver.next_line_with_reverse_no()? else {
1742            break;
1743        };
1744        if emit_row_stream_result(
1745            executor.apply_owned_row(engine, line_no, row)?,
1746            &mut writer,
1747            &mut emitted,
1748            external_limit,
1749            options,
1750        )? {
1751            break;
1752        }
1753        if executor.is_exhausted() {
1754            break;
1755        }
1756    }
1757
1758    emit_row_stream_finish(
1759        &executor,
1760        &mut writer,
1761        &mut emitted,
1762        external_limit,
1763        options,
1764    )?;
1765    writer.flush()?;
1766    Ok((emitted, executor.stats().clone()))
1767}
1768
1769fn emit_row_stream_finish<W: Write>(
1770    executor: &CompiledRowStream,
1771    writer: &mut W,
1772    emitted: &mut usize,
1773    external_limit: Option<usize>,
1774    options: NdjsonOptions,
1775) -> Result<(), JetroEngineError> {
1776    if external_limit.is_some_and(|limit| *emitted >= limit) {
1777        return Ok(());
1778    }
1779    if let Some(value) = executor.finish() {
1780        if write_val_line_with_options(writer, &value, options)? {
1781            *emitted += 1;
1782        }
1783    }
1784    Ok(())
1785}
1786
1787fn emit_row_stream_result<W: Write>(
1788    result: RowStreamRowResult,
1789    writer: &mut W,
1790    emitted: &mut usize,
1791    external_limit: Option<usize>,
1792    options: NdjsonOptions,
1793) -> Result<bool, JetroEngineError> {
1794    let wrote = match result {
1795        RowStreamRowResult::Emit(value) => write_val_line_with_options(writer, &value, options)?,
1796        RowStreamRowResult::EmitBytes(bytes) => {
1797            write_json_bytes_line_with_options(writer, &bytes, options)?
1798        }
1799        RowStreamRowResult::Skip => return Ok(false),
1800        RowStreamRowResult::Stop => return Ok(true),
1801    };
1802    if wrote {
1803        *emitted += 1;
1804    }
1805    Ok(external_limit.is_some_and(|limit| *emitted >= limit))
1806}
1807
1808fn drive_ndjson_writer<R, W>(
1809    engine: &JetroEngine,
1810    reader: R,
1811    query: &str,
1812    limit: Option<usize>,
1813    options: NdjsonOptions,
1814    writer: W,
1815) -> Result<usize, JetroEngineError>
1816where
1817    R: BufRead,
1818    W: Write,
1819{
1820    Ok(drive_ndjson_writer_with_stats(engine, reader, query, limit, options, writer)?.0)
1821}
1822
1823fn drive_ndjson_writer_with_stats<R, W>(
1824    engine: &JetroEngine,
1825    reader: R,
1826    query: &str,
1827    limit: Option<usize>,
1828    options: NdjsonOptions,
1829    writer: W,
1830) -> Result<(usize, NdjsonExecutionStats), JetroEngineError>
1831where
1832    R: BufRead,
1833    W: Write,
1834{
1835    if let Some((byte_plan, tape_plan)) = direct_writer_plans(engine, query) {
1836        if let Some(byte_plan) = byte_plan {
1837            return drive_ndjson_byte_writer(
1838                engine, reader, &byte_plan, &tape_plan, limit, options, writer,
1839            );
1840        }
1841        if tape_plan_can_write_byte_row(&tape_plan) {
1842            return drive_ndjson_tape_byte_writer(
1843                engine, reader, &tape_plan, limit, options, writer,
1844            );
1845        }
1846        return drive_ndjson_tape_writer(engine, reader, &tape_plan, limit, options, writer);
1847    }
1848
1849    let mut driver = NdjsonPerRowDriver::new(reader).with_options(options);
1850    let mut executor = NdjsonRowExecutor::new(engine, query);
1851    let mut writer = ndjson_writer_with_options(writer, options);
1852    let mut buf = Vec::with_capacity(options.initial_buffer_capacity);
1853    let mut count = 0usize;
1854    let mut stats = NdjsonExecutionStats::default();
1855
1856    while let Some((line_no, row)) = driver.read_next_owned(&mut buf)? {
1857        stats.rows_scanned += 1;
1858        let value = executor.eval_owned_row(line_no, row)?;
1859        stats.fallback_project_rows += 1;
1860        if write_val_line_with_options(&mut writer, &value, options)? {
1861            count += 1;
1862            stats.rows_emitted += 1;
1863        }
1864        if limit.is_some_and(|limit| count >= limit) {
1865            break;
1866        }
1867    }
1868
1869    writer.flush()?;
1870    Ok((count, stats))
1871}
1872fn drive_ndjson_byte_writer<R, W>(
1873    engine: &JetroEngine,
1874    reader: R,
1875    byte_plan: &NdjsonDirectBytePlan,
1876    tape_plan: &NdjsonDirectTapePlan,
1877    limit: Option<usize>,
1878    options: NdjsonOptions,
1879    writer: W,
1880) -> Result<(usize, NdjsonExecutionStats), JetroEngineError>
1881where
1882    R: BufRead,
1883    W: Write,
1884{
1885    let mut driver = NdjsonPerRowDriver::new(reader).with_options(options);
1886    let mut writer = ndjson_writer_with_options(writer, options);
1887    let mut line = Vec::with_capacity(options.initial_buffer_capacity);
1888    let mut out = Vec::with_capacity(options.initial_buffer_capacity);
1889    let mut scratch =
1890        crate::data::tape::TapeScratch::with_capacity(options.initial_buffer_capacity);
1891    let mut tape_runner = NdjsonTapeWriterRunner::new(engine, tape_plan);
1892    let mut count = 0usize;
1893    let mut stats = NdjsonExecutionStats::default();
1894
1895    visit_ndjson_borrowed_rows(&mut driver, &mut line, |line_no, row| {
1896        stats.rows_scanned += 1;
1897        out.clear();
1898        match write_ndjson_byte_plan_row(&mut out, row, byte_plan)? {
1899            BytePlanWrite::Done => {
1900                stats.direct_project_rows += 1;
1901            }
1902            BytePlanWrite::Fallback => {
1903                stats.fallback_project_rows += 1;
1904                scratch.parse_slice(row).map_err(|message| {
1905                    row_parse_error(
1906                        line_no,
1907                        JetroEngineError::Eval(crate::EvalError(format!(
1908                            "Invalid JSON: {message}"
1909                        ))),
1910                    )
1911                })?;
1912                tape_runner.write_row(&scratch, &mut out)?;
1913            }
1914        }
1915        if write_json_bytes_line_with_options(&mut writer, &out, options)? {
1916            count += 1;
1917            stats.rows_emitted += 1;
1918        }
1919        Ok(!limit.is_some_and(|limit| count >= limit))
1920    })?;
1921
1922    writer.flush()?;
1923    Ok((count, stats))
1924}
1925fn drive_ndjson_tape_byte_writer<R, W>(
1926    engine: &JetroEngine,
1927    reader: R,
1928    tape_plan: &NdjsonDirectTapePlan,
1929    limit: Option<usize>,
1930    options: NdjsonOptions,
1931    writer: W,
1932) -> Result<(usize, NdjsonExecutionStats), JetroEngineError>
1933where
1934    R: BufRead,
1935    W: Write,
1936{
1937    let mut driver = NdjsonPerRowDriver::new(reader).with_options(options);
1938    let mut writer = ndjson_writer_with_options(writer, options);
1939    let mut line = Vec::with_capacity(options.initial_buffer_capacity);
1940    let mut out = Vec::with_capacity(options.initial_buffer_capacity);
1941    let mut scratch =
1942        crate::data::tape::TapeScratch::with_capacity(options.initial_buffer_capacity);
1943    let mut byte_scratch = Vec::with_capacity(options.initial_buffer_capacity);
1944    let mut tape_runner = NdjsonTapeWriterRunner::new(engine, tape_plan);
1945    let mut constant_stream_cache = NdjsonConstantStreamCache::default();
1946    let mut hint_state = matches!(
1947        tape_plan,
1948        NdjsonDirectTapePlan::Object(_)
1949            | NdjsonDirectTapePlan::Array(_)
1950            | NdjsonDirectTapePlan::Stream(NdjsonDirectStreamPlan {
1951                sink: NdjsonDirectStreamSink::Collect(_),
1952                ..
1953            })
1954            | NdjsonDirectTapePlan::Stream(NdjsonDirectStreamPlan {
1955                sink: NdjsonDirectStreamSink::First(_),
1956                ..
1957            })
1958            | NdjsonDirectTapePlan::Stream(NdjsonDirectStreamPlan {
1959                sink: NdjsonDirectStreamSink::Last(_),
1960                ..
1961            })
1962            | NdjsonDirectTapePlan::Stream(NdjsonDirectStreamPlan {
1963                sink: NdjsonDirectStreamSink::Count,
1964                ..
1965            })
1966            | NdjsonDirectTapePlan::Stream(NdjsonDirectStreamPlan {
1967                sink: NdjsonDirectStreamSink::Numeric { .. },
1968                ..
1969            })
1970            | NdjsonDirectTapePlan::Stream(NdjsonDirectStreamPlan {
1971                sink: NdjsonDirectStreamSink::Extreme { .. },
1972                ..
1973            })
1974    )
1975    .then(|| {
1976        NdjsonHintState::new(
1977            NdjsonHintConfig::default(),
1978            NdjsonHintAccessPlan::from_direct_plans(None, tape_plan),
1979        )
1980    });
1981    let mut count = 0usize;
1982    let mut stats = NdjsonExecutionStats::default();
1983
1984    visit_ndjson_borrowed_rows(&mut driver, &mut line, |line_no, row| {
1985        stats.rows_scanned += 1;
1986        out.clear();
1987        if let Some(write) = constant_stream_cache.write_row(&mut out, row, tape_plan)? {
1988            if matches!(write, BytePlanWrite::Done) {
1989                stats.direct_project_rows += 1;
1990                if write_json_bytes_line_with_options(&mut writer, &out, options)? {
1991                    count += 1;
1992                    stats.rows_emitted += 1;
1993                }
1994                return Ok(!limit.is_some_and(|limit| count >= limit));
1995            }
1996        }
1997        let hinted = if let Some(state) = hint_state.as_mut() {
1998            if state.observe_row(row) == NdjsonHintDecision::UseHints {
1999                byte_scratch.clear();
2000                let write = state
2001                    .with_root_layout_match(row, |root, matched| {
2002                        write_ndjson_hinted_tape_plan_row(
2003                            &mut byte_scratch,
2004                            tape_plan,
2005                            root,
2006                            matched,
2007                        )
2008                    })
2009                    .transpose()?
2010                    .unwrap_or(BytePlanWrite::Fallback);
2011                if matches!(write, BytePlanWrite::Done) {
2012                    out.extend_from_slice(&byte_scratch);
2013                }
2014                Some(write)
2015            } else {
2016                None
2017            }
2018        } else {
2019            None
2020        };
2021        let write = match hinted {
2022            Some(write) => Ok(write),
2023            None => write_ndjson_byte_tape_plan_row(&mut out, row, tape_plan, &mut byte_scratch),
2024        };
2025        match write? {
2026            BytePlanWrite::Done => {
2027                stats.direct_project_rows += 1;
2028            }
2029            BytePlanWrite::Fallback => {
2030                stats.fallback_project_rows += 1;
2031                scratch.parse_slice(row).map_err(|message| {
2032                    row_parse_error(
2033                        line_no,
2034                        JetroEngineError::Eval(crate::EvalError(format!(
2035                            "Invalid JSON: {message}"
2036                        ))),
2037                    )
2038                })?;
2039                tape_runner.write_row(&scratch, &mut out)?;
2040            }
2041        }
2042        if write_json_bytes_line_with_options(&mut writer, &out, options)? {
2043            count += 1;
2044            stats.rows_emitted += 1;
2045        }
2046        Ok(!limit.is_some_and(|limit| count >= limit))
2047    })?;
2048
2049    if let Some(state) = hint_state.as_ref() {
2050        let hint_stats = state.stats();
2051        stats.hint_learned_rows = hint_stats.learned_rows;
2052        stats.hint_rejected_rows = hint_stats.rejected_rows;
2053        stats.hint_rows = hint_stats.hinted_rows;
2054        stats.hint_layout_misses = hint_stats.layout_misses;
2055        stats.hint_disabled = hint_stats.disabled;
2056    }
2057
2058    writer.flush()?;
2059    Ok((count, stats))
2060}
/// Walks the driver's input line by line and hands each non-empty payload to
/// `visit` as a borrowed byte slice.
///
/// `visit` receives the 1-based physical line number and the payload bytes and
/// returns `Ok(true)` to continue or `Ok(false)` to stop the walk early.
/// `spill` is a caller-owned scratch buffer used only when the reader's
/// current buffer does not contain a complete line.
///
/// # Errors
///
/// Propagates reader I/O errors, errors from `frame_payload` and `visit`, and
/// returns `RowError::LineTooLarge` when a trimmed line exceeds
/// `driver.max_line_len`.
fn visit_ndjson_borrowed_rows<R, F>(
    driver: &mut NdjsonPerRowDriver<R>,
    spill: &mut Vec<u8>,
    mut visit: F,
) -> Result<(), JetroEngineError>
where
    R: BufRead,
    F: FnMut(u64, &[u8]) -> Result<bool, JetroEngineError>,
{
    loop {
        spill.clear();
        let available = driver.reader.fill_buf()?;
        if available.is_empty() {
            // Nothing buffered and nothing more to read: end of input.
            return Ok(());
        }
        if let Some(pos) = memchr(b'\n', available) {
            // Fast path: the whole line is already inside the reader's buffer,
            // so it can be visited in place without copying.
            driver.line_no += 1;
            let line_no = driver.line_no;
            let mut row = &available[..pos];
            // Drop a trailing carriage return (CRLF line endings).
            if row.last() == Some(&b'\r') {
                row = &row[..row.len() - 1];
            }
            // Drop a UTF-8 byte-order mark, but only on the very first line.
            if line_no == 1 && row.starts_with(&[0xef, 0xbb, 0xbf]) {
                row = &row[3..];
            }
            let (start, end) = non_ws_range(row);
            let keep_going = if start == end {
                // Blank line: nothing to visit, keep reading.
                true
            } else {
                let trimmed = &row[start..end];
                if trimmed.len() > driver.max_line_len {
                    return Err(RowError::LineTooLarge {
                        line_no,
                        len: trimmed.len(),
                        max: driver.max_line_len,
                    }
                    .into());
                }
                match frame_payload(driver.row_frame, line_no, trimmed)? {
                    FramePayload::Data(range) => visit(line_no, &trimmed[range])?,
                    FramePayload::Skip => true,
                }
            };
            // Consume only after the visit: `row` borrows the reader's buffer.
            driver.reader.consume(pos + 1);
            if !keep_going {
                return Ok(());
            }
        } else {
            // Slow path: no newline in the buffered bytes, so the line is
            // gathered into `spill` instead.
            // NOTE(review): presumably `read_physical_line` reads up to and
            // including the next newline (or EOF) — confirm against the driver.
            let read = driver.read_physical_line(spill)?;
            if read == 0 {
                return Ok(());
            }
            driver.line_no += 1;
            strip_initial_bom(driver.line_no, spill);
            trim_line_ending(spill);
            let (start, end) = non_ws_range(spill);
            if start == end {
                continue;
            }
            let len = end - start;
            if len > driver.max_line_len {
                return Err(RowError::LineTooLarge {
                    line_no: driver.line_no,
                    len,
                    max: driver.max_line_len,
                }
                .into());
            }
            match frame_payload(driver.row_frame, driver.line_no, &spill[start..end])? {
                FramePayload::Data(range) => {
                    // `range` is relative to the trimmed slice, so shift it by
                    // `start` to index into `spill`.
                    if !visit(
                        driver.line_no,
                        &spill[start + range.start..start + range.end],
                    )? {
                        return Ok(());
                    }
                }
                FramePayload::Skip => {}
            }
        }
    }
}
2143fn drive_ndjson_tape_writer<R, W>(
2144    engine: &JetroEngine,
2145    reader: R,
2146    plan: &NdjsonDirectTapePlan,
2147    limit: Option<usize>,
2148    options: NdjsonOptions,
2149    writer: W,
2150) -> Result<(usize, NdjsonExecutionStats), JetroEngineError>
2151where
2152    R: BufRead,
2153    W: Write,
2154{
2155    let mut driver = NdjsonPerRowDriver::new(reader).with_options(options);
2156    let mut writer = ndjson_writer_with_options(writer, options);
2157    let mut line = Vec::with_capacity(options.initial_buffer_capacity);
2158    let mut scratch =
2159        crate::data::tape::TapeScratch::with_capacity(options.initial_buffer_capacity);
2160    let mut out = Vec::with_capacity(options.initial_buffer_capacity);
2161    let mut count = 0usize;
2162    let mut runner = NdjsonTapeWriterRunner::new(engine, plan);
2163    let mut stats = NdjsonExecutionStats::default();
2164
2165    while let Some((line_no, row)) = driver.read_next_nonempty(&mut line)? {
2166        stats.rows_scanned += 1;
2167        out.clear();
2168        scratch.parse_slice(row).map_err(|message| {
2169            row_parse_error(
2170                line_no,
2171                JetroEngineError::Eval(crate::EvalError(format!("Invalid JSON: {message}"))),
2172            )
2173        })?;
2174        runner.write_row(&scratch, &mut out)?;
2175        stats.fallback_project_rows += 1;
2176        if write_json_bytes_line_with_options(&mut writer, &out, options)? {
2177            count += 1;
2178            stats.rows_emitted += 1;
2179        }
2180        if limit.is_some_and(|limit| count >= limit) {
2181            break;
2182        }
2183    }
2184
2185    writer.flush()?;
2186    Ok((count, stats))
2187}
/// Per-stream state for evaluating one `NdjsonDirectTapePlan` against many
/// parsed rows.
pub(super) struct NdjsonTapeWriterRunner<'a, 'p> {
    /// The projection plan evaluated for every row.
    plan: &'p NdjsonDirectTapePlan,
    /// VM lock guard; populated only when the plan reports `needs_vm()`.
    vm: Option<MutexGuard<'a, VM>>,
    /// Evaluation environment; populated only when the plan reports `needs_vm()`.
    env: Option<crate::data::context::Env>,
    /// Field-offset cache for paths resolved from the row root.
    root_path: NdjsonPathCache,
    /// Field-offset cache for the source path of array-element and stream plans.
    source_path: NdjsonPathCache,
    /// Field-offset cache for the suffix path applied after element selection.
    suffix_path: NdjsonPathCache,
    /// Field-offset cache handed to stream plans for predicate paths.
    predicate_path: NdjsonPathCache,
    /// Caches handed to object/array projections and stream plans.
    object_paths: Vec<NdjsonPathCache>,
}
2198impl<'a, 'p> NdjsonTapeWriterRunner<'a, 'p> {
2199    pub(super) fn new(engine: &'a JetroEngine, plan: &'p NdjsonDirectTapePlan) -> Self {
2200        let needs_vm = plan.needs_vm();
2201        Self {
2202            plan,
2203            vm: needs_vm.then(|| engine.lock_vm()),
2204            env: needs_vm.then(|| crate::data::context::Env::new(Val::Null)),
2205            root_path: NdjsonPathCache::default(),
2206            source_path: NdjsonPathCache::default(),
2207            suffix_path: NdjsonPathCache::default(),
2208            predicate_path: NdjsonPathCache::default(),
2209            object_paths: Vec::new(),
2210        }
2211    }
2212
2213    pub(super) fn write_row<W: Write>(
2214        &mut self,
2215        scratch: &crate::data::tape::TapeScratch,
2216        writer: &mut W,
2217    ) -> Result<(), JetroEngineError> {
2218        match self.plan {
2219            NdjsonDirectTapePlan::RootPath(steps) => {
2220                if let Some(idx) = self.root_path.index(scratch, 0, steps) {
2221                    write_json_tape_at(writer, scratch, idx)?;
2222                } else {
2223                    writer.write_all(b"null")?;
2224                }
2225            }
2226            NdjsonDirectTapePlan::ViewScalarCall {
2227                steps,
2228                call,
2229                optional,
2230            } => {
2231                let idx = self.root_path.index(scratch, 0, steps);
2232                let value = idx
2233                    .map(|idx| json_tape_scalar(scratch, idx))
2234                    .unwrap_or(crate::util::JsonView::Null);
2235                if *optional && matches!(value, crate::util::JsonView::Null) {
2236                    writer.write_all(b"null")?;
2237                } else if let Some(value) = call.try_apply_json_view(value) {
2238                    write_val_json(writer, &value)?;
2239                } else if let Some(idx) = idx {
2240                    write_json_tape_at(writer, scratch, idx)?;
2241                } else {
2242                    writer.write_all(b"null")?;
2243                }
2244            }
2245            NdjsonDirectTapePlan::ArrayElementViewScalarCall {
2246                source_steps,
2247                element,
2248                suffix_steps,
2249                call,
2250            } => {
2251                let idx = self
2252                    .source_path
2253                    .index(scratch, 0, source_steps)
2254                    .and_then(|idx| json_tape_array_element(scratch, idx, *element))
2255                    .and_then(|idx| self.suffix_path.index(scratch, idx, suffix_steps));
2256                if let Some(value) = idx
2257                    .map(|idx| json_tape_scalar(scratch, idx))
2258                    .and_then(|value| call.try_apply_json_view(value))
2259                {
2260                    write_val_json(writer, &value)?;
2261                } else if let Some(idx) = idx {
2262                    write_json_tape_at(writer, scratch, idx)?;
2263                } else {
2264                    writer.write_all(b"null")?;
2265                }
2266            }
2267            NdjsonDirectTapePlan::ObjectItems { steps, method } => {
2268                let idx = self.root_path.index(scratch, 0, steps);
2269                write_json_tape_object_items(writer, scratch, idx, *method)?;
2270            }
2271            NdjsonDirectTapePlan::ArrayElementPath {
2272                source_steps,
2273                element,
2274                suffix_steps,
2275            } => {
2276                let idx = self
2277                    .source_path
2278                    .index(scratch, 0, source_steps)
2279                    .and_then(|idx| json_tape_array_element(scratch, idx, *element))
2280                    .and_then(|idx| self.suffix_path.index(scratch, idx, suffix_steps));
2281                if let Some(idx) = idx {
2282                    write_json_tape_at(writer, scratch, idx)?;
2283                } else {
2284                    writer.write_all(b"null")?;
2285                }
2286            }
2287            NdjsonDirectTapePlan::Stream(plan) => {
2288                write_json_tape_stream(
2289                    writer,
2290                    scratch,
2291                    plan,
2292                    &mut self.source_path,
2293                    &mut self.suffix_path,
2294                    &mut self.predicate_path,
2295                    &mut self.object_paths,
2296                )?;
2297            }
2298            NdjsonDirectTapePlan::Object(fields) => {
2299                write_json_tape_object_projection(writer, scratch, fields, &mut self.object_paths)?;
2300            }
2301            NdjsonDirectTapePlan::Array(items) => {
2302                write_json_tape_array_projection(writer, scratch, items, &mut self.object_paths)?;
2303            }
2304            NdjsonDirectTapePlan::ViewPipeline { source_steps, body } => {
2305                let (Some(vm), Some(env)) = (self.vm.as_deref_mut(), self.env.as_ref()) else {
2306                    return Err(JetroEngineError::Eval(crate::EvalError(
2307                        "NDJSON view pipeline requires VM state".to_string(),
2308                    )));
2309                };
2310                let source = json_tape_path_index(scratch, source_steps)
2311                    .map(|idx| crate::data::view::TapeScratchView::Node { tape: scratch, idx })
2312                    .unwrap_or(crate::data::view::TapeScratchView::Missing);
2313                let Some(result) =
2314                    crate::exec::view::run_with_env_and_vm(source, body, None, &env, vm)
2315                else {
2316                    writer.write_all(b"null")?;
2317                    return Ok(());
2318                };
2319                write_val_json(writer, &result.map_err(JetroEngineError::Eval)?)?;
2320            }
2321        }
2322        Ok(())
2323    }
2324}
/// Remembers where path fields were found in previous rows so that later rows
/// with the same layout can jump straight to the value node.
#[derive(Default)]
pub(super) struct NdjsonPathCache {
    // Per-depth verified field deltas for stable object layouts. Every use
    // checks the cached key before returning the value node, so mixed-schema
    // rows safely fall back to a normal object scan.
    // Indexed by path depth; `None` means nothing has been learned yet for
    // that depth.
    fields: Vec<Option<NdjsonFieldCache>>,
}
/// Position of one object member, stored relative to its object's tape index.
#[derive(Clone, Copy)]
struct NdjsonFieldCache {
    /// Tape index of the member's key node minus the object's tape index.
    key_delta: usize,
    /// Tape index of the member's value node minus the object's tape index.
    value_delta: usize,
}
/// Bundle of mutable borrows of three path caches, so they can be passed
/// around as a single value.
// NOTE(review): the consumers of this struct are outside this section;
// presumably the stream-writing helpers — confirm.
struct NdjsonPathCaches<'a> {
    /// Cache for source paths.
    source: &'a mut NdjsonPathCache,
    /// Cache for suffix paths.
    suffix: &'a mut NdjsonPathCache,
    /// Cache for predicate paths.
    predicate: &'a mut NdjsonPathCache,
}
2342impl NdjsonPathCache {
2343    fn index<T: JsonTape>(
2344        &mut self,
2345        tape: &T,
2346        start: usize,
2347        steps: &[crate::ir::physical::PhysicalPathStep],
2348    ) -> Option<usize> {
2349        if let Some(idx) = self.index_cached(tape, start, steps) {
2350            return Some(idx);
2351        }
2352        self.index_uncached(tape, start, steps)
2353    }
2354
2355    fn index_cached<T: JsonTape>(
2356        &self,
2357        tape: &T,
2358        start: usize,
2359        steps: &[crate::ir::physical::PhysicalPathStep],
2360    ) -> Option<usize> {
2361        use crate::ir::physical::PhysicalPathStep;
2362
2363        let [PhysicalPathStep::Field(key), rest @ ..] = steps else {
2364            return None;
2365        };
2366        if rest
2367            .iter()
2368            .any(|step| matches!(step, PhysicalPathStep::Field(_)))
2369        {
2370            return None;
2371        }
2372        let Some(field) = self
2373            .fields
2374            .first()
2375            .copied()
2376            .flatten()
2377            .filter(|field| field.key_delta > 1)
2378        else {
2379            return None;
2380        };
2381        let idx = json_tape_object_cached_field(tape, start, field, key.as_ref())?;
2382        let mut cur = idx;
2383        for step in rest {
2384            cur = json_tape_step_index(tape, cur, step)?;
2385        }
2386        Some(cur)
2387    }
2388
2389    fn index_uncached<T: JsonTape>(
2390        &mut self,
2391        tape: &T,
2392        start: usize,
2393        steps: &[crate::ir::physical::PhysicalPathStep],
2394    ) -> Option<usize> {
2395        self.index_from_depth(tape, start, steps, 0)
2396    }
2397
2398    fn index_from_depth<T: JsonTape>(
2399        &mut self,
2400        tape: &T,
2401        start: usize,
2402        steps: &[crate::ir::physical::PhysicalPathStep],
2403        depth: usize,
2404    ) -> Option<usize> {
2405        use crate::ir::physical::PhysicalPathStep;
2406
2407        match steps {
2408            [] => Some(start),
2409            [PhysicalPathStep::Field(key), rest @ ..] => {
2410                if self.fields.len() <= depth {
2411                    self.fields.resize(depth + 1, None);
2412                }
2413
2414                if let Some(field) = self.fields[depth].filter(|field| field.key_delta > 1) {
2415                    if let Some(idx) =
2416                        json_tape_object_cached_field(tape, start, field, key.as_ref())
2417                    {
2418                        return self.index_from_depth(tape, idx, rest, depth + 1);
2419                    }
2420                }
2421
2422                let (idx, field) =
2423                    json_tape_object_field_index_and_cache(tape, start, key.as_ref())?;
2424                self.fields[depth] = Some(field);
2425                self.index_from_depth(tape, idx, rest, depth + 1)
2426            }
2427            [step, rest @ ..] => {
2428                let idx = json_tape_step_index(tape, start, step)?;
2429                self.index_from_depth(tape, idx, rest, depth + 1)
2430            }
2431        }
2432    }
2433}
2434fn drive_ndjson_tape_matches_writer_with_stats<R, W>(
2435    engine: &JetroEngine,
2436    reader: R,
2437    predicate: &NdjsonDirectPredicate,
2438    limit: usize,
2439    options: NdjsonOptions,
2440    writer: W,
2441) -> Result<(usize, NdjsonExecutionStats), JetroEngineError>
2442where
2443    R: BufRead,
2444    W: Write,
2445{
2446    let mut driver = NdjsonPerRowDriver::new(reader).with_options(options);
2447    let mut writer = ndjson_writer_with_options(writer, options);
2448    let mut line = Vec::with_capacity(options.initial_buffer_capacity);
2449    let mut scratch =
2450        crate::data::tape::TapeScratch::with_capacity(options.initial_buffer_capacity);
2451    let mut emitted = 0usize;
2452    let needs_vm = predicate_needs_vm(predicate);
2453    let mut vm = needs_vm.then(|| engine.lock_vm());
2454    let env = needs_vm.then(|| crate::data::context::Env::new(Val::Null));
2455    let mut predicate_path = NdjsonPathCache::default();
2456    let mut stats = NdjsonExecutionStats::default();
2457
2458    while let Some((line_no, row)) = driver.read_next_owned(&mut line)? {
2459        stats.rows_scanned += 1;
2460        if let Some(matched) = eval_ndjson_byte_predicate_row(&row, predicate)? {
2461            stats.direct_filter_rows += 1;
2462            if !matched {
2463                stats.rows_filtered += 1;
2464                continue;
2465            }
2466            writer.write_all(&row)?;
2467            writer.write_all(b"\n")?;
2468            emitted += 1;
2469            stats.rows_emitted += 1;
2470            if emitted >= limit {
2471                break;
2472            }
2473            continue;
2474        }
2475
2476        scratch.parse_slice(&row).map_err(|message| {
2477            row_parse_error(
2478                line_no,
2479                JetroEngineError::Eval(crate::EvalError(format!("Invalid JSON: {message}"))),
2480            )
2481        })?;
2482        if !eval_tape_predicate(
2483            &scratch,
2484            predicate,
2485            env.as_ref(),
2486            &mut vm,
2487            &mut predicate_path,
2488        )
2489        .map_err(JetroEngineError::Eval)?
2490        {
2491            stats.fallback_filter_rows += 1;
2492            stats.rows_filtered += 1;
2493            continue;
2494        }
2495        stats.fallback_filter_rows += 1;
2496        writer.write_all(&row)?;
2497        writer.write_all(b"\n")?;
2498        emitted += 1;
2499        stats.rows_emitted += 1;
2500        if emitted >= limit {
2501            break;
2502        }
2503    }
2504
2505    writer.flush()?;
2506    Ok((emitted, stats))
2507}
2508
2509fn drive_ndjson_matches<R, F>(
2510    engine: &JetroEngine,
2511    reader: R,
2512    predicate: &str,
2513    limit: usize,
2514    options: NdjsonOptions,
2515    mut emit: F,
2516) -> Result<usize, JetroEngineError>
2517where
2518    R: BufRead,
2519    F: FnMut(Val) -> Result<NdjsonControl, JetroEngineError>,
2520{
2521    if limit == 0 {
2522        return Ok(0);
2523    }
2524
2525    let mut driver = NdjsonPerRowDriver::new(reader).with_options(options);
2526    let direct_predicate = direct_tape_predicate(engine, predicate);
2527    let mut executor = NdjsonRowExecutor::new(engine, predicate);
2528    let mut buf = Vec::with_capacity(options.initial_buffer_capacity);
2529    let mut emitted = 0usize;
2530
2531    while let Some((line_no, row)) = driver.read_next_owned(&mut buf)? {
2532        if let Some(predicate) = direct_predicate.as_ref() {
2533            if let Some(false) = eval_ndjson_byte_predicate_row(&row, predicate)? {
2534                continue;
2535            }
2536        }
2537
2538        let document = executor.parse_owned_row(line_no, row)?;
2539        let matched = executor.eval_document(line_no, &document)?;
2540        if !is_truthy(&matched) {
2541            continue;
2542        }
2543
2544        let root = document
2545            .root_val_with(engine.keys())
2546            .map_err(|err| row_eval_error(line_no, err))?;
2547        emitted += 1;
2548        if matches!(emit(root)?, NdjsonControl::Stop) || emitted >= limit {
2549            break;
2550        }
2551    }
2552
2553    Ok(emitted)
2554}
2555
2556fn drive_ndjson_matches_writer<R, W>(
2557    engine: &JetroEngine,
2558    reader: R,
2559    predicate: &str,
2560    limit: usize,
2561    options: NdjsonOptions,
2562    writer: W,
2563) -> Result<usize, JetroEngineError>
2564where
2565    R: BufRead,
2566    W: Write,
2567{
2568    Ok(drive_ndjson_matches_writer_with_stats(
2569        engine, reader, predicate, limit, options, writer,
2570    )?
2571    .0)
2572}
2573
2574fn drive_ndjson_matches_writer_with_stats<R, W>(
2575    engine: &JetroEngine,
2576    reader: R,
2577    predicate: &str,
2578    limit: usize,
2579    options: NdjsonOptions,
2580    writer: W,
2581) -> Result<(usize, NdjsonExecutionStats), JetroEngineError>
2582where
2583    R: BufRead,
2584    W: Write,
2585{
2586    if limit == 0 {
2587        return Ok((0, NdjsonExecutionStats::default()));
2588    }
2589    if let Some(predicate) = direct_tape_predicate(engine, predicate) {
2590        return drive_ndjson_tape_matches_writer_with_stats(
2591            engine, reader, &predicate, limit, options, writer,
2592        );
2593    }
2594
2595    let mut driver = NdjsonPerRowDriver::new(reader).with_options(options);
2596    let mut executor = NdjsonRowExecutor::new(engine, predicate);
2597    let mut writer = ndjson_writer_with_options(writer, options);
2598    let mut buf = Vec::with_capacity(options.initial_buffer_capacity);
2599    let mut emitted = 0usize;
2600    let mut stats = NdjsonExecutionStats::default();
2601
2602    while let Some((line_no, row)) = driver.read_next_owned(&mut buf)? {
2603        stats.rows_scanned += 1;
2604        let document = executor.parse_owned_row(line_no, row)?;
2605        let matched = executor.eval_document(line_no, &document)?;
2606        stats.fallback_filter_rows += 1;
2607        if !is_truthy(&matched) {
2608            stats.rows_filtered += 1;
2609            continue;
2610        }
2611
2612        write_document_line(&mut writer, &document, line_no, executor.engine())?;
2613        emitted += 1;
2614        stats.rows_emitted += 1;
2615        if emitted >= limit {
2616            break;
2617        }
2618    }
2619
2620    writer.flush()?;
2621    Ok((emitted, stats))
2622}
2623
/// Evaluates one query against individual NDJSON rows, reusing a single
/// plan and VM lock across rows.
pub(super) struct NdjsonRowExecutor<'a> {
    /// Engine used for row parsing and handed back through `engine()`.
    engine: &'a JetroEngine,
    /// Query plan obtained from the engine's plan cache.
    plan: crate::ir::physical::QueryPlan,
    /// Guard on the engine's VM, held for the lifetime of the executor.
    vm: MutexGuard<'a, VM>,
}
2629
impl<'a> NdjsonRowExecutor<'a> {
    /// Builds an executor for `query`: fetches the plan via
    /// `engine.cached_plan` with a bytes planning context and locks the VM.
    pub(super) fn new(engine: &'a JetroEngine, query: &str) -> Self {
        Self {
            engine,
            plan: engine.cached_plan(query, PlanningContext::bytes()),
            vm: engine.lock_vm(),
        }
    }

    /// Parses `row` and evaluates the query against it in one step.
    pub(super) fn eval_owned_row(
        &mut self,
        line_no: u64,
        row: Vec<u8>,
    ) -> Result<Val, JetroEngineError> {
        let document = self.parse_owned_row(line_no, row)?;
        self.eval_document(line_no, &document)
    }

    /// Parses an owned row into a document by delegating to `parse_row`.
    pub(super) fn parse_owned_row(
        &self,
        line_no: u64,
        row: Vec<u8>,
    ) -> Result<Jetro, JetroEngineError> {
        parse_row(self.engine, line_no, row)
    }

    /// Runs the plan against an already parsed document; evaluation errors
    /// are wrapped with the line number via `row_eval_error`.
    pub(super) fn eval_document(
        &mut self,
        line_no: u64,
        document: &Jetro,
    ) -> Result<Val, JetroEngineError> {
        crate::exec::router::collect_plan_val_with_vm(document, &self.plan, &mut self.vm)
            .map_err(|err| row_eval_error(line_no, err))
    }

    /// Returns the engine this executor was created with.
    pub(super) fn engine(&self) -> &'a JetroEngine {
        self.engine
    }
}
/// Minimal read-only view of a parsed JSON tape, letting the path helpers in
/// this file work over more than one tape type.
trait JsonTape {
    /// All tape nodes in document order.
    fn nodes(&self) -> &[crate::data::tape::TapeNode];
    /// String content of the node at `idx`; the helpers in this file use it to
    /// read object keys.
    fn str_at(&self, idx: usize) -> &str;
    /// Amount the helpers in this file add to a cursor at `idx` to move past
    /// the value starting there.
    fn span(&self, idx: usize) -> usize;
}
/// Forwards the `JsonTape` accessors to `TapeData`'s own field and methods.
impl JsonTape for crate::data::tape::TapeData {
    #[inline]
    fn nodes(&self) -> &[crate::data::tape::TapeNode] {
        &self.nodes
    }

    #[inline]
    fn str_at(&self, idx: usize) -> &str {
        // Resolves to the inherent `TapeData::str_at`, not this trait method.
        self.str_at(idx)
    }

    #[inline]
    fn span(&self, idx: usize) -> usize {
        // Resolves to the inherent `TapeData::span`, not this trait method.
        self.span(idx)
    }
}
/// Forwards the `JsonTape` accessors to `TapeScratch`'s own field and methods.
impl JsonTape for crate::data::tape::TapeScratch {
    #[inline]
    fn nodes(&self) -> &[crate::data::tape::TapeNode] {
        &self.nodes
    }

    #[inline]
    fn str_at(&self, idx: usize) -> &str {
        // Resolves to the inherent `TapeScratch::str_at`, not this trait method.
        self.str_at(idx)
    }

    #[inline]
    fn span(&self, idx: usize) -> usize {
        // Resolves to the inherent `TapeScratch::span`, not this trait method.
        self.span(idx)
    }
}
/// Resolves `steps` from the tape root (node 0), returning the tape index of
/// the final node or `None` when the path does not resolve.
fn json_tape_path_index<T: JsonTape>(
    tape: &T,
    steps: &[crate::ir::physical::PhysicalPathStep],
) -> Option<usize> {
    json_tape_path_index_from(tape, 0, steps)
}
2712fn json_tape_path_index_from<T: JsonTape>(
2713    tape: &T,
2714    start: usize,
2715    steps: &[crate::ir::physical::PhysicalPathStep],
2716) -> Option<usize> {
2717    if tape.nodes().is_empty() {
2718        return None;
2719    }
2720
2721    return match steps {
2722        [] => Some(start),
2723        [step] => json_tape_step_index(tape, start, step),
2724        [first, second] => json_tape_step_index(tape, start, first)
2725            .and_then(|idx| json_tape_step_index(tape, idx, second)),
2726        _ => json_tape_path_index_slow(tape, start, steps),
2727    };
2728}
2729fn json_tape_path_index_slow<T: JsonTape>(
2730    tape: &T,
2731    start: usize,
2732    steps: &[crate::ir::physical::PhysicalPathStep],
2733) -> Option<usize> {
2734    let mut idx = start;
2735    for step in steps {
2736        idx = json_tape_step_index(tape, idx, step)?;
2737    }
2738    Some(idx)
2739}
2740fn json_tape_step_index<T: JsonTape>(
2741    tape: &T,
2742    start: usize,
2743    step: &crate::ir::physical::PhysicalPathStep,
2744) -> Option<usize> {
2745    use crate::data::tape::TapeNode;
2746    use crate::ir::physical::PhysicalPathStep;
2747
2748    match step {
2749        PhysicalPathStep::Field(key) => {
2750            let TapeNode::Object { len, .. } = tape.nodes()[start] else {
2751                return None;
2752            };
2753            let mut cur = start + 1;
2754            for _ in 0..len {
2755                if tape.str_at(cur) == key.as_ref() {
2756                    return Some(cur + 1);
2757                }
2758                cur += 1;
2759                cur += tape.span(cur);
2760            }
2761            None
2762        }
2763        PhysicalPathStep::Index(wanted) => {
2764            let TapeNode::Array { len, .. } = tape.nodes()[start] else {
2765                return None;
2766            };
2767            let wanted = if *wanted < 0 {
2768                len.checked_sub(wanted.unsigned_abs() as usize)?
2769            } else {
2770                *wanted as usize
2771            };
2772            if wanted >= len {
2773                return None;
2774            }
2775            let mut cur = start + 1;
2776            for _ in 0..wanted {
2777                cur += tape.span(cur);
2778            }
2779            Some(cur)
2780        }
2781    }
2782}
2783fn json_tape_object_cached_field<T: JsonTape>(
2784    tape: &T,
2785    obj_idx: usize,
2786    cache: NdjsonFieldCache,
2787    key: &str,
2788) -> Option<usize> {
2789    let crate::data::tape::TapeNode::Object { .. } = tape.nodes().get(obj_idx).copied()? else {
2790        return None;
2791    };
2792    let key_idx = obj_idx.checked_add(cache.key_delta)?;
2793    let value_idx = obj_idx.checked_add(cache.value_delta)?;
2794    if value_idx >= tape.nodes().len() {
2795        return None;
2796    }
2797    if !matches!(
2798        tape.nodes().get(key_idx),
2799        Some(crate::data::tape::TapeNode::String(_))
2800    ) {
2801        return None;
2802    }
2803    (tape.str_at(key_idx) == key).then_some(value_idx)
2804}
2805fn json_tape_object_field_index_and_cache<T: JsonTape>(
2806    tape: &T,
2807    obj_idx: usize,
2808    key: &str,
2809) -> Option<(usize, NdjsonFieldCache)> {
2810    let crate::data::tape::TapeNode::Object { len, .. } = tape.nodes()[obj_idx] else {
2811        return None;
2812    };
2813    let mut cur = obj_idx + 1;
2814    for _ in 0..len {
2815        if tape.str_at(cur) == key {
2816            return Some((
2817                cur + 1,
2818                NdjsonFieldCache {
2819                    key_delta: cur - obj_idx,
2820                    value_delta: cur + 1 - obj_idx,
2821                },
2822            ));
2823        }
2824        cur += 1;
2825        cur += tape.span(cur);
2826    }
2827    None
2828}
2829fn json_tape_array_element<T: JsonTape>(
2830    tape: &T,
2831    idx: usize,
2832    element: NdjsonDirectElement,
2833) -> Option<usize> {
2834    let crate::data::tape::TapeNode::Array { len, .. } = tape.nodes().get(idx).copied()? else {
2835        return None;
2836    };
2837    let wanted = match element {
2838        NdjsonDirectElement::First => 0,
2839        NdjsonDirectElement::Last => len.checked_sub(1)?,
2840        NdjsonDirectElement::Nth(n) => n,
2841    };
2842    if wanted >= len {
2843        return None;
2844    }
2845    let mut cur = idx + 1;
2846    for _ in 0..wanted {
2847        cur += tape.span(cur);
2848    }
2849    Some(cur)
2850}
2851pub(super) fn eval_tape_predicate(
2852    tape: &crate::data::tape::TapeScratch,
2853    predicate: &NdjsonDirectPredicate,
2854    env: Option<&crate::data::context::Env>,
2855    vm: &mut Option<std::sync::MutexGuard<'_, crate::vm::exec::VM>>,
2856    cache: &mut NdjsonPathCache,
2857) -> Result<bool, crate::EvalError> {
2858    use crate::parse::ast::BinOp;
2859
2860    Ok(match predicate {
2861        NdjsonDirectPredicate::Path(steps) => cache
2862            .index(tape, 0, steps)
2863            .map(|idx| json_view_truthy(json_tape_scalar(tape, idx)))
2864            .unwrap_or(false),
2865        NdjsonDirectPredicate::Literal(value) => crate::util::is_truthy(value),
2866        NdjsonDirectPredicate::Not(inner) => !eval_tape_predicate(tape, inner, env, vm, cache)?,
2867        NdjsonDirectPredicate::Binary { lhs, op, rhs } if *op == BinOp::And => {
2868            eval_tape_predicate(tape, lhs, env, vm, cache)?
2869                && eval_tape_predicate(tape, rhs, env, vm, cache)?
2870        }
2871        NdjsonDirectPredicate::Binary { lhs, op, rhs } if *op == BinOp::Or => {
2872            eval_tape_predicate(tape, lhs, env, vm, cache)?
2873                || eval_tape_predicate(tape, rhs, env, vm, cache)?
2874        }
2875        NdjsonDirectPredicate::Binary { lhs, op, rhs } => {
2876            let Some(lhs) = eval_tape_scalar(tape, lhs, cache) else {
2877                return Ok(false);
2878            };
2879            let Some(rhs) = eval_tape_scalar(tape, rhs, cache) else {
2880                return Ok(false);
2881            };
2882            crate::util::json_cmp_binop(lhs, *op, rhs)
2883        }
2884        NdjsonDirectPredicate::ViewScalarCall { steps, call } => cache
2885            .index(tape, 0, steps)
2886            .map(|idx| json_tape_scalar(tape, idx))
2887            .and_then(|value| call.try_apply_json_view(value))
2888            .is_some_and(|value| crate::util::is_truthy(&value)),
2889        NdjsonDirectPredicate::ArrayElementViewScalarCall {
2890            source_steps,
2891            element,
2892            suffix_steps,
2893            call,
2894        } => json_tape_path_index(tape, source_steps)
2895            .and_then(|idx| json_tape_array_element(tape, idx, *element))
2896            .and_then(|idx| json_tape_path_index_from(tape, idx, suffix_steps))
2897            .map(|idx| json_tape_scalar(tape, idx))
2898            .and_then(|value| call.try_apply_json_view(value))
2899            .is_some_and(|value| crate::util::is_truthy(&value)),
2900        NdjsonDirectPredicate::ArrayAny { .. } => {
2901            return Err(crate::EvalError(
2902                "array-any predicate requires VM state".to_string(),
2903            ));
2904        }
2905        NdjsonDirectPredicate::ViewPipeline { source_steps, body } => {
2906            let (Some(vm), Some(env)) = (vm.as_deref_mut(), env) else {
2907                return Err(crate::EvalError(
2908                    "view pipeline predicate requires VM state".to_string(),
2909                ));
2910            };
2911            let source = json_tape_path_index(tape, source_steps)
2912                .map(|idx| crate::data::view::TapeScratchView::Node { tape, idx })
2913                .unwrap_or(crate::data::view::TapeScratchView::Missing);
2914            crate::exec::view::run_with_env_and_vm(source, body, None, env, vm)
2915                .transpose()?
2916                .is_some_and(|value| crate::util::is_truthy(&value))
2917        }
2918    })
2919}
2920pub(super) fn predicate_needs_vm(predicate: &NdjsonDirectPredicate) -> bool {
2921    match predicate {
2922        NdjsonDirectPredicate::Not(inner) => predicate_needs_vm(inner),
2923        NdjsonDirectPredicate::Binary { lhs, rhs, .. } => {
2924            predicate_needs_vm(lhs) || predicate_needs_vm(rhs)
2925        }
2926        NdjsonDirectPredicate::ArrayAny { .. } | NdjsonDirectPredicate::ViewPipeline { .. } => true,
2927        NdjsonDirectPredicate::Path(_)
2928        | NdjsonDirectPredicate::Literal(_)
2929        | NdjsonDirectPredicate::ViewScalarCall { .. }
2930        | NdjsonDirectPredicate::ArrayElementViewScalarCall { .. } => false,
2931    }
2932}
2933fn eval_tape_scalar<'a>(
2934    tape: &'a crate::data::tape::TapeScratch,
2935    predicate: &'a NdjsonDirectPredicate,
2936    cache: &mut NdjsonPathCache,
2937) -> Option<crate::util::JsonView<'a>> {
2938    match predicate {
2939        NdjsonDirectPredicate::Path(steps) => cache
2940            .index(tape, 0, steps)
2941            .map(|idx| json_tape_scalar(tape, idx)),
2942        NdjsonDirectPredicate::Literal(value) => Some(crate::util::JsonView::from_val(value)),
2943        _ => None,
2944    }
2945}
2946fn json_view_truthy(value: crate::util::JsonView<'_>) -> bool {
2947    match value {
2948        crate::util::JsonView::Null => false,
2949        crate::util::JsonView::Bool(value) => value,
2950        crate::util::JsonView::Int(value) => value != 0,
2951        crate::util::JsonView::UInt(value) => value != 0,
2952        crate::util::JsonView::Float(value) => value != 0.0,
2953        crate::util::JsonView::Str(value) => !value.is_empty(),
2954        crate::util::JsonView::ArrayLen(len) | crate::util::JsonView::ObjectLen(len) => len > 0,
2955    }
2956}
2957fn json_tape_scalar<T: JsonTape>(tape: &T, idx: usize) -> crate::util::JsonView<'_> {
2958    use crate::data::tape::TapeNode;
2959    use simd_json::StaticNode as SN;
2960
2961    let Some(node) = tape.nodes().get(idx).copied() else {
2962        return crate::util::JsonView::Null;
2963    };
2964    match node {
2965        TapeNode::Static(SN::Null) => crate::util::JsonView::Null,
2966        TapeNode::Static(SN::Bool(value)) => crate::util::JsonView::Bool(value),
2967        TapeNode::Static(SN::I64(value)) => crate::util::JsonView::Int(value),
2968        TapeNode::Static(SN::U64(value)) => crate::util::JsonView::UInt(value),
2969        TapeNode::Static(SN::F64(value)) => crate::util::JsonView::Float(value),
2970        TapeNode::String(_) => crate::util::JsonView::Str(tape.str_at(idx)),
2971        TapeNode::Array { len, .. } => crate::util::JsonView::ArrayLen(len),
2972        TapeNode::Object { len, .. } => crate::util::JsonView::ObjectLen(len),
2973    }
2974}
2975
2976pub(super) fn write_document_line<W: Write>(
2977    writer: &mut W,
2978    document: &Jetro,
2979    line_no: u64,
2980    engine: &JetroEngine,
2981) -> Result<(), JetroEngineError> {
2982    if let Some(bytes) = document.raw_bytes() {
2983        writer.write_all(bytes)?;
2984        writer.write_all(b"\n")?;
2985        return Ok(());
2986    }
2987
2988    let root = document
2989        .root_val_with(engine.keys())
2990        .map_err(|err| row_eval_error(line_no, err))?;
2991    write_val_line(writer, &root)
2992}
2993
2994pub(super) fn write_val_json<W: Write>(
2995    writer: &mut W,
2996    value: &Val,
2997) -> Result<(), JetroEngineError> {
2998    match value {
2999        Val::Null => writer.write_all(b"null")?,
3000        Val::Bool(true) => writer.write_all(b"true")?,
3001        Val::Bool(false) => writer.write_all(b"false")?,
3002        Val::Int(n) => write_i64(writer, *n)?,
3003        Val::Float(n) => write_f64(writer, *n)?,
3004        Val::Str(s) => write_json_str(writer, s.as_ref())?,
3005        Val::StrSlice(s) => write_json_str(writer, s.as_str())?,
3006        Val::Arr(items) => write_json_array(writer, items.iter())?,
3007        Val::IntVec(items) => write_json_int_array(writer, items.iter().copied())?,
3008        Val::FloatVec(items) => write_json_float_array(writer, items.iter().copied())?,
3009        Val::StrVec(items) => write_json_str_array(writer, items.iter().map(|s| s.as_ref()))?,
3010        Val::StrSliceVec(items) => write_json_str_array(writer, items.iter().map(|s| s.as_str()))?,
3011        Val::Obj(entries) => write_json_object(
3012            writer,
3013            entries.iter().map(|(key, value)| (key.as_ref(), value)),
3014        )?,
3015        Val::ObjSmall(entries) => write_json_object(
3016            writer,
3017            entries.iter().map(|(key, value)| (key.as_ref(), value)),
3018        )?,
3019        Val::ObjVec(data) => write_json_objvec(writer, data)?,
3020    }
3021    Ok(())
3022}
3023fn write_json_tape_at<W: Write, T: JsonTape>(
3024    writer: &mut W,
3025    tape: &T,
3026    idx: usize,
3027) -> Result<usize, JetroEngineError> {
3028    use crate::data::tape::TapeNode;
3029    use simd_json::StaticNode as SN;
3030
3031    let Some(node) = tape.nodes().get(idx).copied() else {
3032        writer.write_all(b"null")?;
3033        return Ok(idx);
3034    };
3035
3036    match node {
3037        TapeNode::Static(SN::Null) => {
3038            writer.write_all(b"null")?;
3039            Ok(idx + 1)
3040        }
3041        TapeNode::Static(SN::Bool(true)) => {
3042            writer.write_all(b"true")?;
3043            Ok(idx + 1)
3044        }
3045        TapeNode::Static(SN::Bool(false)) => {
3046            writer.write_all(b"false")?;
3047            Ok(idx + 1)
3048        }
3049        TapeNode::Static(SN::I64(value)) => {
3050            write_i64(writer, value)?;
3051            Ok(idx + 1)
3052        }
3053        TapeNode::Static(SN::U64(value)) => {
3054            write_u64(writer, value)?;
3055            Ok(idx + 1)
3056        }
3057        TapeNode::Static(SN::F64(value)) => {
3058            write_f64(writer, value)?;
3059            Ok(idx + 1)
3060        }
3061        TapeNode::String(_) => {
3062            write_json_str(writer, tape.str_at(idx))?;
3063            Ok(idx + 1)
3064        }
3065        TapeNode::Array { len, .. } => {
3066            writer.write_all(b"[")?;
3067            let mut cur = idx + 1;
3068            for item_idx in 0..len {
3069                if item_idx > 0 {
3070                    writer.write_all(b",")?;
3071                }
3072                cur = write_json_tape_at(writer, tape, cur)?;
3073            }
3074            writer.write_all(b"]")?;
3075            Ok(cur)
3076        }
3077        TapeNode::Object { len, .. } => {
3078            writer.write_all(b"{")?;
3079            let mut cur = idx + 1;
3080            for field_idx in 0..len {
3081                if field_idx > 0 {
3082                    writer.write_all(b",")?;
3083                }
3084                write_json_str(writer, tape.str_at(cur))?;
3085                writer.write_all(b":")?;
3086                cur = write_json_tape_at(writer, tape, cur + 1)?;
3087            }
3088            writer.write_all(b"}")?;
3089            Ok(cur)
3090        }
3091    }
3092}
3093fn visit_json_tape_source_items<T, E, F>(tape: &T, source_idx: usize, mut visit: F) -> Result<(), E>
3094where
3095    T: JsonTape,
3096    F: FnMut(usize) -> Result<(), E>,
3097{
3098    use crate::data::tape::TapeNode;
3099
3100    match tape.nodes().get(source_idx).copied() {
3101        Some(TapeNode::Array { len, .. }) => {
3102            let mut cur = source_idx + 1;
3103            for _ in 0..len {
3104                visit(cur)?;
3105                cur += tape.span(cur);
3106            }
3107        }
3108        Some(_) => visit(source_idx)?,
3109        None => {}
3110    }
3111    Ok(())
3112}
3113fn find_json_tape_source_item<T, F>(tape: &T, source_idx: usize, mut matches: F) -> Option<usize>
3114where
3115    T: JsonTape,
3116    F: FnMut(usize) -> bool,
3117{
3118    use crate::data::tape::TapeNode;
3119
3120    match tape.nodes().get(source_idx).copied()? {
3121        TapeNode::Array { len, .. } => {
3122            let mut cur = source_idx + 1;
3123            for _ in 0..len {
3124                if matches(cur) {
3125                    return Some(cur);
3126                }
3127                cur += tape.span(cur);
3128            }
3129            None
3130        }
3131        _ => matches(source_idx).then_some(source_idx),
3132    }
3133}
3134fn write_json_tape_stream<W: Write, T: JsonTape>(
3135    writer: &mut W,
3136    tape: &T,
3137    plan: &NdjsonDirectStreamPlan,
3138    source_cache: &mut NdjsonPathCache,
3139    suffix_cache: &mut NdjsonPathCache,
3140    predicate_cache: &mut NdjsonPathCache,
3141    projection_caches: &mut Vec<NdjsonPathCache>,
3142) -> Result<(), JetroEngineError> {
3143    let Some(source_idx) = source_cache.index(tape, 0, &plan.source_steps) else {
3144        write_json_tape_empty_stream_result(writer, &plan.sink)?;
3145        return Ok(());
3146    };
3147
3148    match &plan.sink {
3149        NdjsonDirectStreamSink::Collect(map) => {
3150            writer.write_all(b"[")?;
3151            let mut wrote_row = false;
3152            visit_json_tape_source_items(tape, source_idx, |item_idx| {
3153                if !plan.predicate.as_ref().is_none_or(|predicate| {
3154                    eval_json_tape_item_predicate_cached(tape, item_idx, predicate, predicate_cache)
3155                }) {
3156                    return Ok::<(), JetroEngineError>(());
3157                }
3158                if wrote_row {
3159                    writer.write_all(b",")?;
3160                }
3161                write_json_tape_stream_map(
3162                    writer,
3163                    tape,
3164                    item_idx,
3165                    map,
3166                    suffix_cache,
3167                    projection_caches,
3168                )?;
3169                wrote_row = true;
3170                Ok(())
3171            })?;
3172            writer.write_all(b"]")?;
3173        }
3174        NdjsonDirectStreamSink::Count => {
3175            let mut count = 0usize;
3176            let _: Result<(), ()> = visit_json_tape_source_items(tape, source_idx, |item_idx| {
3177                if plan.predicate.as_ref().is_none_or(|predicate| {
3178                    eval_json_tape_item_predicate_cached(tape, item_idx, predicate, predicate_cache)
3179                }) {
3180                    count += 1;
3181                }
3182                Ok(())
3183            });
3184            write_i64(writer, count as i64)?;
3185        }
3186        NdjsonDirectStreamSink::First(map) => {
3187            let selected = find_json_tape_source_item(tape, source_idx, |item_idx| {
3188                plan.predicate.as_ref().is_none_or(|predicate| {
3189                    eval_json_tape_item_predicate_cached(tape, item_idx, predicate, predicate_cache)
3190                })
3191            });
3192            if let Some(item_idx) = selected {
3193                write_json_tape_stream_map(
3194                    writer,
3195                    tape,
3196                    item_idx,
3197                    map,
3198                    suffix_cache,
3199                    projection_caches,
3200                )?;
3201            } else {
3202                writer.write_all(b"null")?;
3203            }
3204        }
3205        NdjsonDirectStreamSink::Last(map) => {
3206            let mut selected = None;
3207            visit_json_tape_source_items(tape, source_idx, |item_idx| {
3208                if plan.predicate.as_ref().is_none_or(|predicate| {
3209                    eval_json_tape_item_predicate_cached(tape, item_idx, predicate, predicate_cache)
3210                }) {
3211                    selected = Some(item_idx);
3212                }
3213                Ok::<(), JetroEngineError>(())
3214            })?;
3215            if let Some(item_idx) = selected {
3216                write_json_tape_stream_map(
3217                    writer,
3218                    tape,
3219                    item_idx,
3220                    map,
3221                    suffix_cache,
3222                    projection_caches,
3223                )?;
3224            } else {
3225                writer.write_all(b"null")?;
3226            }
3227        }
3228        NdjsonDirectStreamSink::Numeric { suffix_steps, op } => {
3229            let caches = NdjsonPathCaches {
3230                source: source_cache,
3231                suffix: suffix_cache,
3232                predicate: predicate_cache,
3233            };
3234            let value = reduce_json_tape_numeric_path(
3235                tape,
3236                &plan.source_steps,
3237                plan.predicate.as_ref(),
3238                suffix_steps,
3239                *op,
3240                caches,
3241            );
3242            write_val_json(writer, &value)?;
3243        }
3244        NdjsonDirectStreamSink::Extreme {
3245            key_steps,
3246            want_max,
3247            value,
3248        } => {
3249            let mut best_idx = None;
3250            visit_json_tape_source_items(tape, source_idx, |item_idx| {
3251                let Some(key_idx) = suffix_cache.index(tape, item_idx, key_steps) else {
3252                    return Ok::<(), JetroEngineError>(());
3253                };
3254                let key = json_tape_scalar(tape, key_idx);
3255                let replace = best_idx
3256                    .and_then(|idx| suffix_cache.index(tape, idx, key_steps))
3257                    .map(|idx| {
3258                        let order = crate::util::json_cmp_vals(key, json_tape_scalar(tape, idx));
3259                        (*want_max && order.is_gt()) || (!*want_max && order.is_lt())
3260                    })
3261                    .unwrap_or(true);
3262                if replace {
3263                    best_idx = Some(item_idx);
3264                }
3265                Ok(())
3266            })?;
3267            if let Some(item_idx) = best_idx {
3268                let path_idx = match value {
3269                    NdjsonDirectProjectionValue::Path(steps)
3270                    | NdjsonDirectProjectionValue::ViewScalarCall { steps, .. } => {
3271                        suffix_cache.index(tape, item_idx, steps)
3272                    }
3273                    NdjsonDirectProjectionValue::Nested(_)
3274                    | NdjsonDirectProjectionValue::Literal(_) => None,
3275                };
3276                write_json_tape_direct_value(writer, tape, value, path_idx)?;
3277            } else {
3278                writer.write_all(b"null")?;
3279            }
3280        }
3281    }
3282
3283    Ok(())
3284}
3285fn write_json_tape_empty_stream_result<W: Write>(
3286    writer: &mut W,
3287    sink: &NdjsonDirectStreamSink,
3288) -> Result<(), JetroEngineError> {
3289    match sink {
3290        NdjsonDirectStreamSink::Collect(_) => writer.write_all(b"[]")?,
3291        NdjsonDirectStreamSink::First(_) | NdjsonDirectStreamSink::Last(_) => {
3292            writer.write_all(b"null")?
3293        }
3294        NdjsonDirectStreamSink::Count => writer.write_all(b"0")?,
3295        NdjsonDirectStreamSink::Numeric { op, .. } => {
3296            let value = crate::exec::pipeline::num_finalise(
3297                *op,
3298                0,
3299                0.0,
3300                false,
3301                f64::INFINITY,
3302                f64::NEG_INFINITY,
3303                0,
3304            );
3305            write_val_json(writer, &value)?;
3306        }
3307        NdjsonDirectStreamSink::Extreme { .. } => writer.write_all(b"null")?,
3308    }
3309    Ok(())
3310}
3311fn write_json_tape_stream_map<W: Write, T: JsonTape>(
3312    writer: &mut W,
3313    tape: &T,
3314    item_idx: usize,
3315    map: &NdjsonDirectStreamMap,
3316    suffix_cache: &mut NdjsonPathCache,
3317    projection_caches: &mut Vec<NdjsonPathCache>,
3318) -> Result<(), JetroEngineError> {
3319    match map {
3320        NdjsonDirectStreamMap::Value(value) => {
3321            let path_idx = match value {
3322                NdjsonDirectProjectionValue::Path(steps)
3323                | NdjsonDirectProjectionValue::ViewScalarCall { steps, .. } => {
3324                    suffix_cache.index(tape, item_idx, steps)
3325                }
3326                NdjsonDirectProjectionValue::Nested(_) => None,
3327                NdjsonDirectProjectionValue::Literal(_) => None,
3328            };
3329            write_json_tape_direct_value(writer, tape, value, path_idx)?;
3330        }
3331        NdjsonDirectStreamMap::Array(items) => {
3332            write_json_tape_array_projection_from(
3333                writer,
3334                tape,
3335                item_idx,
3336                items,
3337                projection_caches,
3338            )?;
3339        }
3340        NdjsonDirectStreamMap::Object(fields) => {
3341            write_json_tape_object_projection_from(
3342                writer,
3343                tape,
3344                item_idx,
3345                fields,
3346                projection_caches,
3347            )?;
3348        }
3349    }
3350    Ok(())
3351}
3352fn write_json_tape_object_projection<W: Write, T: JsonTape>(
3353    writer: &mut W,
3354    tape: &T,
3355    fields: &[super::ndjson_direct::NdjsonDirectObjectField],
3356    path_caches: &mut Vec<NdjsonPathCache>,
3357) -> Result<(), JetroEngineError> {
3358    write_json_tape_object_projection_from(writer, tape, 0, fields, path_caches)
3359}
3360fn write_json_tape_object_projection_from<W: Write, T: JsonTape>(
3361    writer: &mut W,
3362    tape: &T,
3363    start: usize,
3364    fields: &[super::ndjson_direct::NdjsonDirectObjectField],
3365    path_caches: &mut Vec<NdjsonPathCache>,
3366) -> Result<(), JetroEngineError> {
3367    if path_caches.len() < fields.len() {
3368        path_caches.resize_with(fields.len(), NdjsonPathCache::default);
3369    }
3370    writer.write_all(b"{")?;
3371    let mut wrote = false;
3372    for (field_idx, field) in fields.iter().enumerate() {
3373        let path_cache = &mut path_caches[field_idx];
3374        let mut path_idx = None;
3375        match &field.value {
3376            NdjsonDirectProjectionValue::Path(steps) => {
3377                let idx = path_cache.index(tape, start, steps);
3378                path_idx = idx;
3379                if field.optional
3380                    && idx
3381                        .map(|idx| {
3382                            matches!(json_tape_scalar(tape, idx), crate::util::JsonView::Null)
3383                        })
3384                        .unwrap_or(true)
3385                {
3386                    continue;
3387                }
3388            }
3389            NdjsonDirectProjectionValue::ViewScalarCall {
3390                steps,
3391                call,
3392                optional,
3393            } => {
3394                let idx = path_cache.index(tape, start, steps);
3395                path_idx = idx;
3396                if (*optional || field.optional)
3397                    && idx
3398                        .map(|idx| {
3399                            matches!(json_tape_scalar(tape, idx), crate::util::JsonView::Null)
3400                        })
3401                        .unwrap_or(true)
3402                {
3403                    continue;
3404                }
3405                if field.optional
3406                    && idx
3407                        .map(|idx| json_tape_scalar(tape, idx))
3408                        .and_then(|value| call.try_apply_json_view(value))
3409                        .is_some_and(|value| matches!(value, Val::Null))
3410                {
3411                    continue;
3412                }
3413            }
3414            NdjsonDirectProjectionValue::Literal(Val::Null) if field.optional => {
3415                continue;
3416            }
3417            NdjsonDirectProjectionValue::Nested(_) => {}
3418            NdjsonDirectProjectionValue::Literal(_) => {}
3419        }
3420        if wrote {
3421            writer.write_all(b",")?;
3422        }
3423        write_json_str(writer, field.key.as_ref())?;
3424        writer.write_all(b":")?;
3425        write_json_tape_direct_value(writer, tape, &field.value, path_idx)?;
3426        wrote = true;
3427    }
3428    writer.write_all(b"}")?;
3429    Ok(())
3430}
3431fn write_json_tape_array_projection<W: Write, T: JsonTape>(
3432    writer: &mut W,
3433    tape: &T,
3434    items: &[NdjsonDirectProjectionValue],
3435    path_caches: &mut Vec<NdjsonPathCache>,
3436) -> Result<(), JetroEngineError> {
3437    write_json_tape_array_projection_from(writer, tape, 0, items, path_caches)
3438}
3439fn write_json_tape_array_projection_from<W: Write, T: JsonTape>(
3440    writer: &mut W,
3441    tape: &T,
3442    start: usize,
3443    items: &[NdjsonDirectProjectionValue],
3444    path_caches: &mut Vec<NdjsonPathCache>,
3445) -> Result<(), JetroEngineError> {
3446    if path_caches.len() < items.len() {
3447        path_caches.resize_with(items.len(), NdjsonPathCache::default);
3448    }
3449    writer.write_all(b"[")?;
3450    for (idx, item) in items.iter().enumerate() {
3451        if idx > 0 {
3452            writer.write_all(b",")?;
3453        }
3454        let path_idx = match item {
3455            NdjsonDirectProjectionValue::Path(steps)
3456            | NdjsonDirectProjectionValue::ViewScalarCall { steps, .. } => {
3457                path_caches[idx].index(tape, start, steps)
3458            }
3459            NdjsonDirectProjectionValue::Nested(_) => None,
3460            NdjsonDirectProjectionValue::Literal(_) => None,
3461        };
3462        write_json_tape_direct_value(writer, tape, item, path_idx)?;
3463    }
3464    writer.write_all(b"]")?;
3465    Ok(())
3466}
3467fn write_json_tape_direct_value<W: Write, T: JsonTape>(
3468    writer: &mut W,
3469    tape: &T,
3470    value: &NdjsonDirectProjectionValue,
3471    path_idx: Option<usize>,
3472) -> Result<(), JetroEngineError> {
3473    match value {
3474        NdjsonDirectProjectionValue::Path(_) => {
3475            if let Some(idx) = path_idx {
3476                write_json_tape_at(writer, tape, idx)?;
3477            } else {
3478                writer.write_all(b"null")?;
3479            }
3480        }
3481        NdjsonDirectProjectionValue::ViewScalarCall { call, .. } => {
3482            if let Some(idx) = path_idx {
3483                let value = json_tape_scalar(tape, idx);
3484                if let Some(value) = call.try_apply_json_view(value) {
3485                    write_val_json(writer, &value)?;
3486                } else {
3487                    write_json_tape_at(writer, tape, idx)?;
3488                }
3489            } else {
3490                writer.write_all(b"null")?;
3491            }
3492        }
3493        NdjsonDirectProjectionValue::Literal(value) => write_val_json(writer, value)?,
3494        NdjsonDirectProjectionValue::Nested(plan) => {
3495            write_json_tape_nested_plan(writer, tape, plan)?;
3496        }
3497    }
3498    Ok(())
3499}
3500fn write_json_tape_nested_plan<W: Write, T: JsonTape>(
3501    writer: &mut W,
3502    tape: &T,
3503    plan: &NdjsonDirectTapePlan,
3504) -> Result<(), JetroEngineError> {
3505    let mut root_cache = NdjsonPathCache::default();
3506    let mut source_cache = NdjsonPathCache::default();
3507    let mut suffix_cache = NdjsonPathCache::default();
3508    let mut predicate_cache = NdjsonPathCache::default();
3509    let mut projection_caches = Vec::new();
3510    match plan {
3511        NdjsonDirectTapePlan::RootPath(steps) => {
3512            if let Some(idx) = root_cache.index(tape, 0, steps) {
3513                write_json_tape_at(writer, tape, idx)?;
3514            } else {
3515                writer.write_all(b"null")?;
3516            }
3517        }
3518        NdjsonDirectTapePlan::ViewScalarCall {
3519            steps,
3520            call,
3521            optional,
3522        } => {
3523            let idx = root_cache.index(tape, 0, steps);
3524            let value = idx
3525                .map(|idx| json_tape_scalar(tape, idx))
3526                .unwrap_or(crate::util::JsonView::Null);
3527            if *optional && matches!(value, crate::util::JsonView::Null) {
3528                writer.write_all(b"null")?;
3529            } else if let Some(value) = call.try_apply_json_view(value) {
3530                write_val_json(writer, &value)?;
3531            } else if let Some(idx) = idx {
3532                write_json_tape_at(writer, tape, idx)?;
3533            } else {
3534                writer.write_all(b"null")?;
3535            }
3536        }
3537        NdjsonDirectTapePlan::ArrayElementPath {
3538            source_steps,
3539            element,
3540            suffix_steps,
3541        } => {
3542            write_json_tape_array_element_path(
3543                writer,
3544                tape,
3545                source_steps,
3546                *element,
3547                suffix_steps,
3548                &mut source_cache,
3549                &mut suffix_cache,
3550            )?;
3551        }
3552        NdjsonDirectTapePlan::ArrayElementViewScalarCall {
3553            source_steps,
3554            element,
3555            suffix_steps,
3556            call,
3557        } => {
3558            write_json_tape_array_element_scalar(
3559                writer,
3560                tape,
3561                source_steps,
3562                *element,
3563                suffix_steps,
3564                call,
3565                &mut source_cache,
3566                &mut suffix_cache,
3567            )?;
3568        }
3569        NdjsonDirectTapePlan::Stream(stream) => {
3570            write_json_tape_stream(
3571                writer,
3572                tape,
3573                stream,
3574                &mut source_cache,
3575                &mut suffix_cache,
3576                &mut predicate_cache,
3577                &mut projection_caches,
3578            )?;
3579        }
3580        NdjsonDirectTapePlan::Object(fields) => {
3581            write_json_tape_object_projection(writer, tape, fields, &mut projection_caches)?;
3582        }
3583        NdjsonDirectTapePlan::Array(items) => {
3584            write_json_tape_array_projection(writer, tape, items, &mut projection_caches)?;
3585        }
3586        NdjsonDirectTapePlan::ObjectItems { steps, method } => {
3587            let idx = root_cache.index(tape, 0, steps);
3588            write_json_tape_object_items(writer, tape, idx, *method)?;
3589        }
3590        NdjsonDirectTapePlan::ViewPipeline { .. } => {
3591            writer.write_all(b"null")?;
3592        }
3593    }
3594    Ok(())
3595}
3596fn write_json_tape_array_element_path<W: Write, T: JsonTape>(
3597    writer: &mut W,
3598    tape: &T,
3599    source_steps: &[crate::ir::physical::PhysicalPathStep],
3600    element: super::ndjson_direct::NdjsonDirectElement,
3601    suffix_steps: &[crate::ir::physical::PhysicalPathStep],
3602    source_cache: &mut NdjsonPathCache,
3603    suffix_cache: &mut NdjsonPathCache,
3604) -> Result<(), JetroEngineError> {
3605    let idx = source_cache
3606        .index(tape, 0, source_steps)
3607        .and_then(|idx| json_tape_array_element(tape, idx, element))
3608        .and_then(|idx| suffix_cache.index(tape, idx, suffix_steps));
3609    if let Some(idx) = idx {
3610        write_json_tape_at(writer, tape, idx)?;
3611    } else {
3612        writer.write_all(b"null")?;
3613    }
3614    Ok(())
3615}
3616fn write_json_tape_array_element_scalar<W: Write, T: JsonTape>(
3617    writer: &mut W,
3618    tape: &T,
3619    source_steps: &[crate::ir::physical::PhysicalPathStep],
3620    element: super::ndjson_direct::NdjsonDirectElement,
3621    suffix_steps: &[crate::ir::physical::PhysicalPathStep],
3622    call: &crate::builtins::BuiltinCall,
3623    source_cache: &mut NdjsonPathCache,
3624    suffix_cache: &mut NdjsonPathCache,
3625) -> Result<(), JetroEngineError> {
3626    let idx = source_cache
3627        .index(tape, 0, source_steps)
3628        .and_then(|idx| json_tape_array_element(tape, idx, element))
3629        .and_then(|idx| suffix_cache.index(tape, idx, suffix_steps));
3630    if let Some(value) = idx
3631        .map(|idx| json_tape_scalar(tape, idx))
3632        .and_then(|value| call.try_apply_json_view(value))
3633    {
3634        write_val_json(writer, &value)?;
3635    } else if let Some(idx) = idx {
3636        write_json_tape_at(writer, tape, idx)?;
3637    } else {
3638        writer.write_all(b"null")?;
3639    }
3640    Ok(())
3641}
3642fn write_json_tape_object_items<W: Write, T: JsonTape>(
3643    writer: &mut W,
3644    tape: &T,
3645    obj_idx: Option<usize>,
3646    method: crate::builtins::BuiltinMethod,
3647) -> Result<(), JetroEngineError> {
3648    let Some(obj_idx) = obj_idx else {
3649        writer.write_all(b"[]")?;
3650        return Ok(());
3651    };
3652    let Some(crate::data::tape::TapeNode::Object { len, .. }) = tape.nodes().get(obj_idx).copied()
3653    else {
3654        writer.write_all(b"[]")?;
3655        return Ok(());
3656    };
3657
3658    writer.write_all(b"[")?;
3659    let mut cur = obj_idx + 1;
3660    for field_idx in 0..len {
3661        if field_idx > 0 {
3662            writer.write_all(b",")?;
3663        }
3664        match method {
3665            crate::builtins::BuiltinMethod::Keys => {
3666                write_json_str(writer, tape.str_at(cur))?;
3667                cur += 1;
3668                cur += tape.span(cur);
3669            }
3670            crate::builtins::BuiltinMethod::Values => {
3671                cur = write_json_tape_at(writer, tape, cur + 1)?;
3672            }
3673            crate::builtins::BuiltinMethod::Entries => {
3674                writer.write_all(b"[")?;
3675                write_json_str(writer, tape.str_at(cur))?;
3676                writer.write_all(b",")?;
3677                cur = write_json_tape_at(writer, tape, cur + 1)?;
3678                writer.write_all(b"]")?;
3679            }
3680            _ => unreachable!("non-object-items builtin"),
3681        }
3682    }
3683    writer.write_all(b"]")?;
3684    Ok(())
3685}
3686fn reduce_json_tape_numeric_path<T: JsonTape>(
3687    tape: &T,
3688    source_steps: &[crate::ir::physical::PhysicalPathStep],
3689    predicate: Option<&NdjsonDirectItemPredicate>,
3690    suffix_steps: &[crate::ir::physical::PhysicalPathStep],
3691    op: crate::exec::pipeline::NumOp,
3692    caches: NdjsonPathCaches<'_>,
3693) -> Val {
3694    let mut acc_i = 0i64;
3695    let mut acc_f = 0.0f64;
3696    let mut floated = false;
3697    let mut min_f = f64::INFINITY;
3698    let mut max_f = f64::NEG_INFINITY;
3699    let mut n_obs = 0usize;
3700
3701    let Some(source_idx) = caches.source.index(tape, 0, source_steps) else {
3702        return crate::exec::pipeline::num_finalise(op, acc_i, acc_f, floated, min_f, max_f, n_obs);
3703    };
3704
3705    let suffix_cache = caches.suffix;
3706    let predicate_cache = caches.predicate;
3707    let _: Result<(), ()> = visit_json_tape_source_items(tape, source_idx, |item_idx| {
3708        if !predicate.is_none_or(|predicate| {
3709            eval_json_tape_item_predicate_cached(tape, item_idx, predicate, predicate_cache)
3710        }) {
3711            return Ok(());
3712        }
3713        if let Some(idx) = suffix_cache.index(tape, item_idx, suffix_steps) {
3714            fold_json_tape_numeric(
3715                json_tape_scalar(tape, idx),
3716                op,
3717                &mut acc_i,
3718                &mut acc_f,
3719                &mut floated,
3720                &mut min_f,
3721                &mut max_f,
3722                &mut n_obs,
3723            );
3724        }
3725        Ok(())
3726    });
3727
3728    crate::exec::pipeline::num_finalise(op, acc_i, acc_f, floated, min_f, max_f, n_obs)
3729}
3730#[allow(clippy::too_many_arguments)]
3731fn fold_json_tape_numeric(
3732    value: crate::util::JsonView<'_>,
3733    op: crate::exec::pipeline::NumOp,
3734    acc_i: &mut i64,
3735    acc_f: &mut f64,
3736    floated: &mut bool,
3737    min_f: &mut f64,
3738    max_f: &mut f64,
3739    n_obs: &mut usize,
3740) {
3741    match value {
3742        crate::util::JsonView::Int(value) => crate::exec::pipeline::num_fold_i64(
3743            acc_i, acc_f, floated, min_f, max_f, n_obs, op, value,
3744        ),
3745        crate::util::JsonView::UInt(value) if value <= i64::MAX as u64 => {
3746            crate::exec::pipeline::num_fold_i64(
3747                acc_i,
3748                acc_f,
3749                floated,
3750                min_f,
3751                max_f,
3752                n_obs,
3753                op,
3754                value as i64,
3755            )
3756        }
3757        crate::util::JsonView::UInt(value) => crate::exec::pipeline::num_fold_f64(
3758            acc_i,
3759            acc_f,
3760            floated,
3761            min_f,
3762            max_f,
3763            n_obs,
3764            op,
3765            value as f64,
3766        ),
3767        crate::util::JsonView::Float(value) => crate::exec::pipeline::num_fold_f64(
3768            acc_i, acc_f, floated, min_f, max_f, n_obs, op, value,
3769        ),
3770        _ => {}
3771    }
3772}
3773fn eval_json_tape_item_predicate_cached<T: JsonTape>(
3774    tape: &T,
3775    item_idx: usize,
3776    predicate: &NdjsonDirectItemPredicate,
3777    cache: &mut NdjsonPathCache,
3778) -> bool {
3779    use crate::parse::ast::BinOp;
3780
3781    match predicate {
3782        NdjsonDirectItemPredicate::Path(steps) => cache
3783            .index(tape, item_idx, steps)
3784            .map(|idx| json_view_truthy(json_tape_scalar(tape, idx)))
3785            .unwrap_or(false),
3786        NdjsonDirectItemPredicate::Literal(value) => crate::util::is_truthy(value),
3787        NdjsonDirectItemPredicate::Binary { lhs, op, rhs } if *op == BinOp::And => {
3788            eval_json_tape_item_predicate_cached(tape, item_idx, lhs, cache)
3789                && eval_json_tape_item_predicate_cached(tape, item_idx, rhs, cache)
3790        }
3791        NdjsonDirectItemPredicate::Binary { lhs, op, rhs } if *op == BinOp::Or => {
3792            eval_json_tape_item_predicate_cached(tape, item_idx, lhs, cache)
3793                || eval_json_tape_item_predicate_cached(tape, item_idx, rhs, cache)
3794        }
3795        NdjsonDirectItemPredicate::Binary { lhs, op, rhs } => {
3796            let Some(lhs) = eval_json_tape_item_scalar_cached(tape, item_idx, lhs, cache) else {
3797                return false;
3798            };
3799            let Some(rhs) = eval_json_tape_item_scalar_cached(tape, item_idx, rhs, cache) else {
3800                return false;
3801            };
3802            crate::util::json_cmp_binop(lhs, *op, rhs)
3803        }
3804        NdjsonDirectItemPredicate::CmpLit { lhs, op, lit } => cache
3805            .index(tape, item_idx, lhs)
3806            .map(|idx| json_tape_scalar(tape, idx))
3807            .is_some_and(|value| {
3808                crate::util::json_cmp_binop(value, *op, crate::util::JsonView::from_val(lit))
3809            }),
3810        NdjsonDirectItemPredicate::ViewScalarCall { suffix_steps, call } => cache
3811            .index(tape, item_idx, suffix_steps)
3812            .map(|idx| json_tape_scalar(tape, idx))
3813            .and_then(|value| call.try_apply_json_view(value))
3814            .is_some_and(|value| crate::util::is_truthy(&value)),
3815    }
3816}
3817fn eval_json_tape_item_scalar_cached<'a, T: JsonTape>(
3818    tape: &'a T,
3819    item_idx: usize,
3820    predicate: &'a NdjsonDirectItemPredicate,
3821    cache: &mut NdjsonPathCache,
3822) -> Option<crate::util::JsonView<'a>> {
3823    match predicate {
3824        NdjsonDirectItemPredicate::Path(steps) => cache
3825            .index(tape, item_idx, steps)
3826            .map(|idx| json_tape_scalar(tape, idx)),
3827        NdjsonDirectItemPredicate::Literal(value) => Some(crate::util::JsonView::from_val(value)),
3828        _ => None,
3829    }
3830}
3831
3832fn write_json_array<'a, W, I>(writer: &mut W, items: I) -> Result<(), JetroEngineError>
3833where
3834    W: Write,
3835    I: IntoIterator<Item = &'a Val>,
3836{
3837    writer.write_all(b"[")?;
3838    let mut first = true;
3839    for item in items {
3840        if first {
3841            first = false;
3842        } else {
3843            writer.write_all(b",")?;
3844        }
3845        write_val_json(writer, item)?;
3846    }
3847    writer.write_all(b"]")?;
3848    Ok(())
3849}
3850
3851fn write_json_int_array<W, I>(writer: &mut W, items: I) -> Result<(), JetroEngineError>
3852where
3853    W: Write,
3854    I: IntoIterator<Item = i64>,
3855{
3856    writer.write_all(b"[")?;
3857    let mut first = true;
3858    let mut buf = itoa::Buffer::new();
3859    for item in items {
3860        if first {
3861            first = false;
3862        } else {
3863            writer.write_all(b",")?;
3864        }
3865        writer.write_all(buf.format(item).as_bytes())?;
3866    }
3867    writer.write_all(b"]")?;
3868    Ok(())
3869}
3870
3871fn write_json_float_array<W, I>(writer: &mut W, items: I) -> Result<(), JetroEngineError>
3872where
3873    W: Write,
3874    I: IntoIterator<Item = f64>,
3875{
3876    writer.write_all(b"[")?;
3877    let mut first = true;
3878    let mut buf = ryu::Buffer::new();
3879    for item in items {
3880        if first {
3881            first = false;
3882        } else {
3883            writer.write_all(b",")?;
3884        }
3885        if item.is_finite() {
3886            writer.write_all(buf.format(item).as_bytes())?;
3887        } else {
3888            writer.write_all(b"0")?;
3889        }
3890    }
3891    writer.write_all(b"]")?;
3892    Ok(())
3893}
3894
3895fn write_json_str_array<'a, W, I>(writer: &mut W, items: I) -> Result<(), JetroEngineError>
3896where
3897    W: Write,
3898    I: IntoIterator<Item = &'a str>,
3899{
3900    writer.write_all(b"[")?;
3901    let mut first = true;
3902    for item in items {
3903        if first {
3904            first = false;
3905        } else {
3906            writer.write_all(b",")?;
3907        }
3908        write_json_str(writer, item)?;
3909    }
3910    writer.write_all(b"]")?;
3911    Ok(())
3912}
3913
3914fn write_json_object<'a, W, I>(writer: &mut W, entries: I) -> Result<(), JetroEngineError>
3915where
3916    W: Write,
3917    I: IntoIterator<Item = (&'a str, &'a Val)>,
3918{
3919    writer.write_all(b"{")?;
3920    let mut first = true;
3921    for (key, value) in entries {
3922        if first {
3923            first = false;
3924        } else {
3925            writer.write_all(b",")?;
3926        }
3927        write_json_str(writer, key)?;
3928        writer.write_all(b":")?;
3929        write_val_json(writer, value)?;
3930    }
3931    writer.write_all(b"}")?;
3932    Ok(())
3933}
3934
3935fn write_json_objvec<W: Write>(
3936    writer: &mut W,
3937    data: &crate::data::value::ObjVecData,
3938) -> Result<(), JetroEngineError> {
3939    writer.write_all(b"[")?;
3940    for row in 0..data.nrows() {
3941        if row > 0 {
3942            writer.write_all(b",")?;
3943        }
3944        writer.write_all(b"{")?;
3945        for slot in 0..data.stride() {
3946            if slot > 0 {
3947                writer.write_all(b",")?;
3948            }
3949            write_json_str(writer, data.keys[slot].as_ref())?;
3950            writer.write_all(b":")?;
3951            write_val_json(writer, data.cell(row, slot))?;
3952        }
3953        writer.write_all(b"}")?;
3954    }
3955    writer.write_all(b"]")?;
3956    Ok(())
3957}
3958
3959pub(super) fn write_json_str<W: Write>(
3960    writer: &mut W,
3961    value: &str,
3962) -> Result<(), JetroEngineError> {
3963    writer.write_all(b"\"")?;
3964    let bytes = value.as_bytes();
3965    if !needs_json_escape(bytes) {
3966        writer.write_all(bytes)?;
3967        writer.write_all(b"\"")?;
3968        return Ok(());
3969    }
3970
3971    let mut start = 0usize;
3972
3973    for (idx, &byte) in bytes.iter().enumerate() {
3974        let escaped = match byte {
3975            b'"' => Some(br#"\""#.as_slice()),
3976            b'\\' => Some(br#"\\"#.as_slice()),
3977            b'\n' => Some(br#"\n"#.as_slice()),
3978            b'\r' => Some(br#"\r"#.as_slice()),
3979            b'\t' => Some(br#"\t"#.as_slice()),
3980            0x08 => Some(br#"\b"#.as_slice()),
3981            0x0c => Some(br#"\f"#.as_slice()),
3982            0x00..=0x1f => None,
3983            _ => continue,
3984        };
3985
3986        if start < idx {
3987            writer.write_all(&bytes[start..idx])?;
3988        }
3989        match escaped {
3990            Some(seq) => writer.write_all(seq)?,
3991            None => write_control_escape(writer, byte)?,
3992        }
3993        start = idx + 1;
3994    }
3995
3996    if start < bytes.len() {
3997        writer.write_all(&bytes[start..])?;
3998    }
3999    writer.write_all(b"\"")?;
4000    Ok(())
4001}
4002
4003#[inline]
4004pub(super) fn write_i64<W: Write>(writer: &mut W, value: i64) -> Result<(), JetroEngineError> {
4005    let mut buf = itoa::Buffer::new();
4006    writer.write_all(buf.format(value).as_bytes())?;
4007    Ok(())
4008}
4009
4010#[inline]
4011fn write_u64<W: Write>(writer: &mut W, value: u64) -> Result<(), JetroEngineError> {
4012    let mut buf = itoa::Buffer::new();
4013    writer.write_all(buf.format(value).as_bytes())?;
4014    Ok(())
4015}
4016
4017#[inline]
4018fn write_f64<W: Write>(writer: &mut W, value: f64) -> Result<(), JetroEngineError> {
4019    if value.is_finite() {
4020        let mut buf = ryu::Buffer::new();
4021        writer.write_all(buf.format(value).as_bytes())?;
4022    } else {
4023        writer.write_all(b"0")?;
4024    }
4025    Ok(())
4026}
4027
/// Returns `true` when `bytes` contains a double quote, a backslash, or any byte below
/// `0x20`, i.e. a byte that cannot be copied verbatim into a JSON string.
#[inline]
fn needs_json_escape(bytes: &[u8]) -> bool {
    for &byte in bytes {
        if byte < 0x20 || byte == b'"' || byte == b'\\' {
            return true;
        }
    }
    false
}
4034
4035fn write_control_escape<W: Write>(writer: &mut W, byte: u8) -> Result<(), JetroEngineError> {
4036    const HEX: &[u8; 16] = b"0123456789abcdef";
4037    writer.write_all(&[
4038        b'\\',
4039        b'u',
4040        b'0',
4041        b'0',
4042        HEX[(byte >> 4) as usize],
4043        HEX[(byte & 0x0f) as usize],
4044    ])?;
4045    Ok(())
4046}
4047
4048pub(super) fn trim_line_ending(buf: &mut Vec<u8>) {
4049    while matches!(buf.last(), Some(b'\n' | b'\r')) {
4050        buf.pop();
4051    }
4052}
4053
4054pub(super) fn strip_initial_bom(line_no: u64, buf: &mut Vec<u8>) {
4055    if line_no == 1 && buf.starts_with(&[0xEF, 0xBB, 0xBF]) {
4056        buf.drain(..3);
4057    }
4058}
4059
4060pub(super) fn non_ws_range(buf: &[u8]) -> (usize, usize) {
4061    let start = buf
4062        .iter()
4063        .position(|b| !b.is_ascii_whitespace())
4064        .unwrap_or(buf.len());
4065    let end = buf
4066        .iter()
4067        .rposition(|b| !b.is_ascii_whitespace())
4068        .map(|idx| idx + 1)
4069        .unwrap_or(start);
4070    (start, end)
4071}
4072
4073#[cfg(test)]
4074mod tests {
4075    #[test]
4076    fn rows_stream_driver_reports_direct_stage_stats() {
4077        let engine = crate::JetroEngine::new();
4078        let plan = super::super::ndjson_rows::ndjson_rows_stream_plan(
4079            "$.rows().filter($.active == true).distinct_by($.id).take(2).map($.id)",
4080        )
4081        .unwrap()
4082        .unwrap();
4083        let input = std::io::Cursor::new(
4084            br#"{"id":"a","active":false}
4085{"id":"a","active":true}
4086{"id":"b","active":true}
4087not-json
4088"#
4089            .to_vec(),
4090        );
4091        let mut out = Vec::new();
4092
4093        let (emitted, stats) = super::drive_ndjson_rows_stream_reader_with_stats(
4094            &engine,
4095            input,
4096            &plan,
4097            None,
4098            super::NdjsonOptions::default(),
4099            &mut out,
4100        )
4101        .unwrap();
4102
4103        assert_eq!(emitted, 2);
4104        assert_eq!(String::from_utf8(out).unwrap(), "\"a\"\n\"b\"\n");
4105        assert_eq!(stats.source, super::RowStreamSourceKind::NdjsonRows);
4106        assert_eq!(stats.direction, super::RowStreamDirection::Forward);
4107        assert_eq!(stats.rows_scanned, 3);
4108        assert_eq!(stats.rows_filtered, 1);
4109        assert_eq!(stats.rows_emitted, 2);
4110        assert_eq!(stats.direct_filter_rows, 3);
4111        assert_eq!(stats.direct_key_rows, 2);
4112        assert_eq!(stats.direct_project_rows, 2);
4113    }
4114
4115    #[test]
4116    fn rows_stream_driver_filters_array_find_on_byte_predicate() {
4117        let engine = crate::JetroEngine::new();
4118        let plan = super::super::ndjson_rows::ndjson_rows_stream_plan(
4119            r#"$.rows().filter(@.custom_attributes.find(@.value == "z")).map($.id)"#,
4120        )
4121        .unwrap()
4122        .unwrap();
4123        let input = std::io::Cursor::new(
4124            br#"{"id":"a","custom_attributes":[{"value":"x"}]}
4125{"id":"b","custom_attributes":[{"value":"z"}]}
4126{"id":"c","custom_attributes":[{"value":null}]}
4127"#
4128            .to_vec(),
4129        );
4130        let mut out = Vec::new();
4131
4132        let (emitted, stats) = super::drive_ndjson_rows_stream_reader_with_stats(
4133            &engine,
4134            input,
4135            &plan,
4136            None,
4137            super::NdjsonOptions::default(),
4138            &mut out,
4139        )
4140        .unwrap();
4141
4142        assert_eq!(emitted, 1);
4143        assert_eq!(String::from_utf8(out).unwrap(), "\"b\"\n");
4144        assert_eq!(stats.rows_scanned, 3);
4145        assert_eq!(stats.rows_filtered, 2);
4146        assert_eq!(stats.direct_filter_rows, 3);
4147        assert_eq!(stats.fallback_filter_rows, 0);
4148    }
4149
4150    #[test]
4151    fn parse_row_keeps_simd_document_lazy() {
4152        let engine = crate::JetroEngine::new();
4153        let row = br#"{"name":"Ada","age":30}"#.to_vec();
4154
4155        let document = super::parse_row(&engine, 1, row).expect("row parses lazily");
4156
4157        assert!(!document.root_val_is_materialized());
4158        assert!(!document.tape_is_built());
4159    }
4160
4161    #[test]
4162    fn owned_row_read_preserves_reusable_buffer_capacity() {
4163        let input = std::io::Cursor::new(b"{\"n\":1}\n{\"n\":2}\n");
4164        let mut driver = super::NdjsonPerRowDriver::new(input);
4165        let mut buf = Vec::with_capacity(128);
4166
4167        let first = driver
4168            .read_next_owned(&mut buf)
4169            .expect("row read succeeds")
4170            .expect("first row exists");
4171        assert_eq!(first.1, br#"{"n":1}"#);
4172        assert_eq!(buf.capacity(), 128);
4173
4174        let second = driver
4175            .read_next_owned(&mut buf)
4176            .expect("row read succeeds")
4177            .expect("second row exists");
4178        assert_eq!(second.1, br#"{"n":2}"#);
4179        assert_eq!(buf.capacity(), 128);
4180    }
4181
4182    #[test]
4183    fn direct_tape_plan_accepts_first_suffix() {
4184        let engine = crate::JetroEngine::new();
4185        for query in [
4186            "attributes.first().value",
4187            "attributes.last().value",
4188            "attributes.nth(1).value",
4189        ] {
4190            let plan =
4191                super::direct_tape_plan(&engine, query).expect("array suffix should be direct");
4192            assert!(matches!(
4193                plan,
4194                super::NdjsonDirectTapePlan::ArrayElementPath { .. }
4195            ));
4196        }
4197    }
4198
4199    #[test]
4200    fn direct_tape_plan_accepts_rooted_bench_shapes() {
4201        let engine = crate::JetroEngine::new();
4202        for query in [
4203            "$.id",
4204            "$.a.b.c",
4205            "$.meta.id",
4206            "$.name",
4207            "$.attributes.len()",
4208            "$.store.attributes.len()",
4209            "$.attributes.map(@.key)",
4210            "$.attributes.first().value",
4211            "$.store.attributes.first().value",
4212            "$.attributes.last().value",
4213            "$.name.upper()",
4214            "$.store.name.upper()",
4215            "$.attributes.map([@.key, @.value])",
4216            r#"$.attributes.filter(@.value.contains("_3")).len()"#,
4217            "$.keys()",
4218        ] {
4219            super::direct_tape_plan(&engine, query)
4220                .unwrap_or_else(|| panic!("{query} should have a direct NDJSON tape plan"));
4221        }
4222    }
4223
4224    #[test]
4225    fn direct_writer_plan_kind_exposes_hot_path_selection() {
4226        let engine = crate::JetroEngine::new();
4227        use super::NdjsonDirectPlanKind::{
4228            ByteExpr, TapeArrayProjection, TapeObjectProjection, TapeRootPath, TapeStreamCollect,
4229            TapeStreamCount, TapeStreamExtreme, TapeStreamFirst, TapeStreamLast, TapeStreamNumeric,
4230        };
4231
4232        for (query, expected) in [
4233            ("$.name", (Some(ByteExpr), TapeRootPath)),
4234            ("$.a.b.c", (Some(ByteExpr), TapeRootPath)),
4235            (r#"{test: $.a.b.c, b: $.a.b}"#, (None, TapeObjectProjection)),
4236            (r#"[$.id, $.name]"#, (None, TapeArrayProjection)),
4237            ("$.attributes.map(@.key)", (None, TapeStreamCollect)),
4238            (
4239                "$.attributes.map({k: @.key, code: @.meta.code.upper()})",
4240                (None, TapeStreamCollect),
4241            ),
4242            (
4243                r#"$.attributes.filter(@.value.contains("_3")).map({key: @.key, value: @.value}).first()"#,
4244                (None, TapeStreamFirst),
4245            ),
4246            ("$.attributes.map(@.value).last()", (None, TapeStreamLast)),
4247            (
4248                r#"$.attributes.filter(@.value.contains("_3")).len()"#,
4249                (None, TapeStreamCount),
4250            ),
4251            (
4252                "$.attributes.map(@.weight).sum()",
4253                (None, TapeStreamNumeric),
4254            ),
4255            (
4256                r#"$.attributes.filter(@.value.contains("_3")).map(@.weight).sum()"#,
4257                (None, TapeStreamNumeric),
4258            ),
4259            (
4260                "$.attributes.sort_by(@.value).last().key",
4261                (None, TapeStreamExtreme),
4262            ),
4263        ] {
4264            let actual = super::direct_writer_plan_kind(&engine, query)
4265                .unwrap_or_else(|| panic!("{query} should have an observable direct plan"));
4266            assert_eq!(actual, expected, "{query}");
4267        }
4268    }
4269
4270    #[test]
4271    fn direct_writer_path_kind_matches_runtime_writer_family() {
4272        let engine = crate::JetroEngine::new();
4273        use super::NdjsonWriterPathKind::{ByteExpr, ByteWritableTape};
4274
4275        for (query, expected) in [
4276            ("$.name", ByteExpr),
4277            ("$.a.b.c", ByteExpr),
4278            (r#"{test: $.a.b.c, b: $.a.b}"#, ByteWritableTape),
4279            (r#"[$.id, $.name]"#, ByteWritableTape),
4280            ("$.attributes.map(@.key)", ByteWritableTape),
4281            (
4282                "$.attributes.map({k: @.key, code: @.meta.code.upper()})",
4283                ByteWritableTape,
4284            ),
4285            (
4286                r#"$.attributes.filter(@.value.contains("_3")).len()"#,
4287                ByteWritableTape,
4288            ),
4289            ("$.attributes.map(@.weight).sum()", ByteWritableTape),
4290            (
4291                r#"$.attributes.filter(@.value.contains("_3")).map(@.weight).sum()"#,
4292                ByteWritableTape,
4293            ),
4294            (
4295                r#"{id: $.id, name: $.name, count: $.attributes.len()}"#,
4296                ByteWritableTape,
4297            ),
4298            (
4299                r#"[$.id, $.name, $.attributes.first().value, $.attributes.last().value]"#,
4300                ByteWritableTape,
4301            ),
4302            (
4303                r#"{name_upper: $.name.upper(), values: $.attributes.map(@.value), last: $.attributes.last().value}"#,
4304                ByteWritableTape,
4305            ),
4306            ("$.attributes.sort_by(@.value).last().key", ByteWritableTape),
4307        ] {
4308            assert_eq!(
4309                super::direct_writer_path_kind(&engine, query),
4310                Some(expected),
4311                "{query}"
4312            );
4313        }
4314    }
4315
4316    #[test]
4317    fn public_writer_path_kind_reports_direct_family() {
4318        let engine = crate::JetroEngine::new();
4319        let kind = crate::io::ndjson_writer_path_kind(&engine, "$.name").unwrap();
4320        assert_eq!(kind, super::NdjsonWriterPathKind::ByteExpr);
4321        assert_eq!(kind.to_string(), "byte-expr");
4322    }
4323
4324    #[test]
4325    fn direct_tape_plan_lowers_stream_shapes_generically() {
4326        let engine = crate::JetroEngine::new();
4327        for query in [
4328            "$.attributes.map(@.key)",
4329            "$.attributes.map(@.key.upper())",
4330            "$.attributes.map(@.value).first()",
4331            "$.attributes.map(@.value).last()",
4332            r#"$.attributes.filter(@.value.contains("_3")).map(@.key)"#,
4333            r#"$.attributes.filter(@.value.contains("_3")).map(@.key.upper())"#,
4334            r#"$.attributes.filter(@.value.contains("_3")).map({key: @.key, value: @.value}).first()"#,
4335            r#"$.attributes.filter(@.value.contains("_3")).len()"#,
4336            "$.attributes.map(@.weight).sum()",
4337            r#"$.attributes.filter(@.value.contains("_3")).map(@.weight).sum()"#,
4338            "$.attributes.sort_by(@.value).last().key",
4339        ] {
4340            let plan =
4341                super::direct_tape_plan(&engine, query).expect("query should be direct NDJSON");
4342            assert!(
4343                matches!(plan, super::NdjsonDirectTapePlan::Stream(_)),
4344                "{query} should lower to a generic NDJSON stream plan"
4345            );
4346        }
4347    }
4348
4349    #[test]
4350    fn direct_byte_plan_accepts_fast_root_shapes() {
4351        let engine = crate::JetroEngine::new();
4352        for query in [
4353            "$.id",
4354            "$.name",
4355            "$.name.upper()",
4356            "$.name.lower()",
4357            "$.keys()",
4358            "$.meta.keys()",
4359            "$.values()",
4360            "$.entries()",
4361            "$.attributes.first().value",
4362            "$.store.attributes.first().value",
4363            "$.attributes.first().key.upper()",
4364            "$.attributes.last().value",
4365            "$.attributes.nth(1).value",
4366        ] {
4367            super::direct_byte_plan(&engine, query)
4368                .unwrap_or_else(|| panic!("{query} should have a direct NDJSON byte plan"));
4369        }
4370    }
4371
4372    #[test]
4373    fn direct_byte_predicates_cover_match_shapes() {
4374        let engine = crate::JetroEngine::new();
4375        let row = br#"{"active":true,"score":9910,"attributes":[{"key":"k1","value":"v_1"}]}"#;
4376        for predicate in [
4377            ("active", true),
4378            ("score > 9900", true),
4379            ("score < 100", false),
4380            (r#"attributes.first().value.contains("_1")"#, true),
4381        ] {
4382            let plan = super::direct_tape_predicate(&engine, predicate.0)
4383                .unwrap_or_else(|| panic!("{} should have a direct predicate", predicate.0));
4384            let matched = super::eval_ndjson_byte_predicate_row(row, &plan)
4385                .expect("byte predicate should evaluate")
4386                .unwrap_or_else(|| panic!("{} should not need tape fallback", predicate.0));
4387            assert_eq!(matched, predicate.1, "{}", predicate.0);
4388        }
4389    }
4390
4391    #[test]
4392    fn direct_byte_predicate_covers_array_find_field_comparison() {
4393        let row = br#"{"custom_attributes":[{"attribute_name":"a","value":"x"},{"attribute_name":"b","value":"z"},{"attribute_name":"c","value":null},{"attribute_name":"d","value":""}]}"#;
4394        for (predicate, expected) in [
4395            (r#"@.custom_attributes.find(@.value == "z")"#, true),
4396            (r#"@.custom_attributes.find(value == "missing")"#, false),
4397            (r#"@.custom_attributes.find(@.value == null)"#, true),
4398            (r#"@.custom_attributes.find(@.value == "")"#, true),
4399        ] {
4400            let expr = crate::parse::parser::parse(predicate).expect("parse");
4401            let plan = super::super::ndjson_direct::direct_tape_predicate_for_expr(&expr)
4402                .unwrap_or_else(|| panic!("{predicate} should have a direct predicate"));
4403            let matched = super::eval_ndjson_byte_predicate_row(row, &plan)
4404                .expect("byte predicate should evaluate")
4405                .unwrap_or_else(|| panic!("{predicate} should not need tape fallback"));
4406            assert_eq!(matched, expected, "{predicate}");
4407        }
4408    }
4409
4410    #[test]
4411    fn direct_byte_tape_plan_counts_filtered_rows() {
4412        let engine = crate::JetroEngine::new();
4413        let query = r#"attributes.filter(@.value.contains("_3")).len()"#;
4414        let plan = super::direct_tape_plan(&engine, query).expect("filter count should be direct");
4415        assert!(super::tape_plan_can_write_byte_row(&plan));
4416
4417        let row = br#"{"attributes":[{"value":"a_3"},{"value":"b"},{"value":"c_3"}]}"#;
4418        let mut out = Vec::new();
4419        let mut scratch = Vec::new();
4420        let wrote = super::write_ndjson_byte_tape_plan_row(&mut out, row, &plan, &mut scratch)
4421            .expect("byte count should write");
4422        assert!(matches!(wrote, super::BytePlanWrite::Done));
4423        assert_eq!(out, b"2");
4424    }
4425
4426    #[test]
4427    fn direct_byte_tape_plan_reduces_numeric_streams() {
4428        let engine = crate::JetroEngine::new();
4429        let row = br#"{"attributes":[{"weight":1},{"weight":2.5},{"weight":3},{"weight":"skip"}]}"#;
4430        for (query, expected) in [
4431            ("$.attributes.map(@.weight).sum()", "6.5"),
4432            ("$.attributes.map(@.weight).avg()", "2.1666666666666665"),
4433            ("$.attributes.map(@.weight).min()", "1.0"),
4434            ("$.attributes.map(@.weight).max()", "3.0"),
4435        ] {
4436            let plan = super::direct_tape_plan(&engine, query)
4437                .unwrap_or_else(|| panic!("{query} should be direct"));
4438            assert!(
4439                super::tape_plan_can_write_byte_row(&plan),
4440                "{query} should be byte-writable"
4441            );
4442            let mut out = Vec::new();
4443            let mut scratch = Vec::new();
4444            let wrote = super::write_ndjson_byte_tape_plan_row(&mut out, row, &plan, &mut scratch)
4445                .expect("byte numeric stream should write");
4446            assert!(matches!(wrote, super::BytePlanWrite::Done), "{query}");
4447            assert_eq!(std::str::from_utf8(&out).unwrap(), expected, "{query}");
4448        }
4449    }
4450
4451    #[test]
4452    fn direct_byte_tape_plan_collects_stream_maps() {
4453        let engine = crate::JetroEngine::new();
4454        let row = br#"{"attributes":[{"key":"k1","value":"v1"},{"key":"k2","value":"v2"}]}"#;
4455        for (query, expected) in [
4456            ("attributes.map(@.key)", r#"["k1","k2"]"#),
4457            (
4458                "attributes.map([@.key, @.value])",
4459                r#"[["k1","v1"],["k2","v2"]]"#,
4460            ),
4461            (
4462                "attributes.map({key: @.key, value: @.value})",
4463                r#"[{"key":"k1","value":"v1"},{"key":"k2","value":"v2"}]"#,
4464            ),
4465            ("attributes.map(@.key.upper())", r#"["K1","K2"]"#),
4466            (
4467                r#"attributes.filter(@.value.contains("2")).map(@.key)"#,
4468                r#"["k2"]"#,
4469            ),
4470            (
4471                r#"attributes.filter(@.value.contains("2")).map({key: @.key, value: @.value})"#,
4472                r#"[{"key":"k2","value":"v2"}]"#,
4473            ),
4474            (
4475                r#"attributes.filter(@.key != "k1").map([@.key, @.value])"#,
4476                r#"[["k2","v2"]]"#,
4477            ),
4478        ] {
4479            let plan = super::direct_tape_plan(&engine, query)
4480                .unwrap_or_else(|| panic!("{query} should be direct"));
4481            assert!(
4482                super::tape_plan_can_write_byte_row(&plan),
4483                "{query} should be byte-writable"
4484            );
4485            let mut out = Vec::new();
4486            let mut scratch = Vec::new();
4487            let wrote = super::write_ndjson_byte_tape_plan_row(&mut out, row, &plan, &mut scratch)
4488                .expect("byte stream should write");
4489            assert!(matches!(wrote, super::BytePlanWrite::Done), "{query}");
4490            assert_eq!(std::str::from_utf8(&out).unwrap(), expected, "{query}");
4491        }
4492    }
4493
4494    #[test]
4495    fn direct_byte_tape_plan_writes_static_projections() {
4496        let engine = crate::JetroEngine::new();
4497        let row = br#"{"id":7,"a":{"b":{"c":1}}}"#;
4498        for (query, expected) in [
4499            ("$.a.b.c", "1"),
4500            (r#"{test: $.a.b.c, b: $.a.b}"#, r#"{"test":1,"b":{"c":1}}"#),
4501            (r#"[$.a.b.c, $.id]"#, r#"[1,7]"#),
4502        ] {
4503            let plan = super::direct_tape_plan(&engine, query)
4504                .unwrap_or_else(|| panic!("{query} should be direct"));
4505            assert!(
4506                super::tape_plan_can_write_byte_row(&plan),
4507                "{query} should be byte-writable"
4508            );
4509            let mut out = Vec::new();
4510            let mut scratch = Vec::new();
4511            let wrote = super::write_ndjson_byte_tape_plan_row(&mut out, row, &plan, &mut scratch)
4512                .expect("byte projection should write");
4513            assert!(matches!(wrote, super::BytePlanWrite::Done), "{query}");
4514            assert_eq!(std::str::from_utf8(&out).unwrap(), expected, "{query}");
4515        }
4516    }
4517
4518    #[test]
4519    fn run_ndjson_uses_byte_paths_for_nested_object_items() {
4520        let engine = crate::JetroEngine::new();
4521        let rows = std::io::Cursor::new(
4522            br#"{"id":1}
4523{"id":2}
4524"#,
4525        );
4526        let mut out = Vec::new();
4527        engine
4528            .run_ndjson(rows, "$.id", &mut out)
4529            .expect("rooted byte path should run");
4530        assert_eq!(std::str::from_utf8(&out).unwrap(), "1\n2\n");
4531
4532        let rows = std::io::Cursor::new(
4533            br#"{"meta":{"id":1,"kind":"a"}}
4534{"meta":{"id":2,"kind":"b"}}
4535"#,
4536        );
4537
4538        let mut out = Vec::new();
4539        engine
4540            .run_ndjson(rows, "$.meta.id", &mut out)
4541            .expect("nested byte path should run");
4542        assert_eq!(std::str::from_utf8(&out).unwrap(), "1\n2\n");
4543
4544        let rows = std::io::Cursor::new(br#"{"meta":{"id":1,"kind":"a"}}"#);
4545        let mut out = Vec::new();
4546        engine
4547            .run_ndjson(rows, "$.meta.keys()", &mut out)
4548            .expect("nested byte object items should run");
4549        assert_eq!(std::str::from_utf8(&out).unwrap(), "[\"id\",\"kind\"]\n");
4550    }
4551
4552    #[test]
4553    fn run_ndjson_uses_byte_paths_for_nested_array_demands() {
4554        let engine = crate::JetroEngine::new();
4555        let rows = std::io::Cursor::new(
4556            br#"{"store":{"attributes":[{"value":"a"},{"value":"b"}],"after":1}}
4557{"store":{"attributes":[{"value":"c"},{"value":"d"}],"after":2}}
4558"#,
4559        );
4560
4561        let mut out = Vec::new();
4562        engine
4563            .run_ndjson(rows, "$.store.attributes.first().value", &mut out)
4564            .expect("nested byte array demand should run");
4565        assert_eq!(std::str::from_utf8(&out).unwrap(), "\"a\"\n\"c\"\n");
4566
4567        out.clear();
4568        let rows = std::io::Cursor::new(
4569            br#"{"store":{"attributes":[{"value":"a"},{"value":"b"}],"after":1}}
4570{"store":{"attributes":[{"value":"c"},{"value":"d"}],"after":2}}
4571"#,
4572        );
4573        engine
4574            .run_ndjson(rows, "$.store.attributes.last().value", &mut out)
4575            .expect("nested byte last demand should run from field prefix");
4576        assert_eq!(std::str::from_utf8(&out).unwrap(), "\"b\"\n\"d\"\n");
4577    }
4578
4579    #[test]
4580    fn run_ndjson_static_projection_survives_hint_activation() {
4581        let engine = crate::JetroEngine::new();
4582        let rows = std::io::Cursor::new(
4583            br#"{"id":1,"name":"a","active":true}
4584{"id":2,"name":"b","active":true}
4585{"id":3,"name":"c","active":true}
4586{"id":4,"name":"d","active":true}
4587{"id":5,"name":"e","active":true}
4588{"id":6,"name":"f","active":true}
4589{"id":7,"name":"g","active":true}
4590{"id":8,"name":"h","active":true}
4591{"id":9,"name":"i","active":true}
4592"#,
4593        );
4594        let mut out = Vec::new();
4595        engine
4596            .run_ndjson(rows, r#"{id: $.id, name: $.name}"#, &mut out)
4597            .expect("hinted static projection should run");
4598        assert_eq!(
4599            std::str::from_utf8(&out).unwrap(),
4600            "{\"id\":1,\"name\":\"a\"}\n\
4601{\"id\":2,\"name\":\"b\"}\n\
4602{\"id\":3,\"name\":\"c\"}\n\
4603{\"id\":4,\"name\":\"d\"}\n\
4604{\"id\":5,\"name\":\"e\"}\n\
4605{\"id\":6,\"name\":\"f\"}\n\
4606{\"id\":7,\"name\":\"g\"}\n\
4607{\"id\":8,\"name\":\"h\"}\n\
4608{\"id\":9,\"name\":\"i\"}\n"
4609        );
4610    }
4611
4612    #[test]
4613    fn run_ndjson_nested_projection_survives_hint_activation() {
4614        let engine = crate::JetroEngine::new();
4615        let rows = std::io::Cursor::new(
4616            br#"{"id":1,"profile":{"name":"a","score":10},"active":true}
4617{"id":2,"profile":{"name":"b","score":20},"active":true}
4618{"id":3,"profile":{"name":"c","score":30},"active":true}
4619{"id":4,"profile":{"name":"d","score":40},"active":true}
4620{"id":5,"profile":{"name":"e","score":50},"active":true}
4621{"id":6,"profile":{"name":"f","score":60},"active":true}
4622{"id":7,"profile":{"name":"g","score":70},"active":true}
4623{"id":8,"profile":{"name":"h","score":80},"active":true}
4624{"id":9,"profile":{"name":"i","score":90},"active":true}
4625"#,
4626        );
4627        let mut out = Vec::new();
4628        engine
4629            .run_ndjson(
4630                rows,
4631                r#"{id: $.id, name: $.profile.name, profile: $.profile}"#,
4632                &mut out,
4633            )
4634            .expect("hinted nested projection should run");
4635        assert_eq!(
4636            std::str::from_utf8(&out).unwrap(),
4637            "{\"id\":1,\"name\":\"a\",\"profile\":{\"name\":\"a\",\"score\":10}}\n\
4638{\"id\":2,\"name\":\"b\",\"profile\":{\"name\":\"b\",\"score\":20}}\n\
4639{\"id\":3,\"name\":\"c\",\"profile\":{\"name\":\"c\",\"score\":30}}\n\
4640{\"id\":4,\"name\":\"d\",\"profile\":{\"name\":\"d\",\"score\":40}}\n\
4641{\"id\":5,\"name\":\"e\",\"profile\":{\"name\":\"e\",\"score\":50}}\n\
4642{\"id\":6,\"name\":\"f\",\"profile\":{\"name\":\"f\",\"score\":60}}\n\
4643{\"id\":7,\"name\":\"g\",\"profile\":{\"name\":\"g\",\"score\":70}}\n\
4644{\"id\":8,\"name\":\"h\",\"profile\":{\"name\":\"h\",\"score\":80}}\n\
4645{\"id\":9,\"name\":\"i\",\"profile\":{\"name\":\"i\",\"score\":90}}\n"
4646        );
4647    }
4648
4649    #[test]
4650    fn run_ndjson_scalar_projection_survives_hint_activation() {
4651        let engine = crate::JetroEngine::new();
4652        let rows = std::io::Cursor::new(
4653            br#"{"id":1,"profile":{"name":"a"}}
4654{"id":2,"profile":{"name":"b"}}
4655{"id":3,"profile":{"name":"c"}}
4656{"id":4,"profile":{"name":"d"}}
4657{"id":5,"profile":{"name":"e"}}
4658{"id":6,"profile":{"name":"f"}}
4659{"id":7,"profile":{"name":"g"}}
4660{"id":8,"profile":{"name":"h"}}
4661{"id":9,"profile":{"name":"i"}}
4662"#,
4663        );
4664        let mut out = Vec::new();
4665        engine
4666            .run_ndjson(
4667                rows,
4668                r#"{id: $.id, name: $.profile.name.upper()}"#,
4669                &mut out,
4670            )
4671            .expect("hinted scalar projection should run");
4672        assert_eq!(
4673            std::str::from_utf8(&out).unwrap(),
4674            "{\"id\":1,\"name\":\"A\"}\n\
4675{\"id\":2,\"name\":\"B\"}\n\
4676{\"id\":3,\"name\":\"C\"}\n\
4677{\"id\":4,\"name\":\"D\"}\n\
4678{\"id\":5,\"name\":\"E\"}\n\
4679{\"id\":6,\"name\":\"F\"}\n\
4680{\"id\":7,\"name\":\"G\"}\n\
4681{\"id\":8,\"name\":\"H\"}\n\
4682{\"id\":9,\"name\":\"I\"}\n"
4683        );
4684    }
4685
4686    #[test]
4687    fn run_ndjson_stream_collect_survives_hint_activation() {
4688        let engine = crate::JetroEngine::new();
4689        let rows = std::io::Cursor::new(
4690            br#"{"id":1,"attributes":[{"key":"a","value":"x"},{"key":"b","value":"y"}]}
4691{"id":2,"attributes":[{"key":"c","value":"z"}]}
4692{"id":3,"attributes":[{"key":"d","value":"w"}]}
4693"#,
4694        );
4695        let mut out = Vec::new();
4696        engine
4697            .run_ndjson(rows, "$.attributes.map([@.key, @.value])", &mut out)
4698            .expect("hinted stream collect should run");
4699        assert_eq!(
4700            std::str::from_utf8(&out).unwrap(),
4701            "[[\"a\",\"x\"],[\"b\",\"y\"]]\n[[\"c\",\"z\"]]\n[[\"d\",\"w\"]]\n"
4702        );
4703    }
4704
4705    #[test]
4706    fn run_ndjson_stream_cache_rejects_reordered_item_prefixes() {
4707        let engine = crate::JetroEngine::new();
4708        let rows = std::io::Cursor::new(
4709            br#"{"attributes":[{"key":"k1","value":"a"}]}
4710{"attributes":[{"value":"k1","key":"actual"}]}
4711"#,
4712        );
4713        let mut out = Vec::new();
4714        engine
4715            .run_ndjson(rows, "$.attributes.map(@.key)", &mut out)
4716            .expect("stream cache should fall back on reordered item fields");
4717        assert_eq!(
4718            std::str::from_utf8(&out).unwrap(),
4719            "[\"k1\"]\n[\"actual\"]\n"
4720        );
4721    }
4722
4723    #[test]
4724    fn run_ndjson_stream_map_preserves_missing_field_nulls() {
4725        let engine = crate::JetroEngine::new();
4726        let rows = std::io::Cursor::new(
4727            br#"{"attributes":[{"key":"a","value":"x"},{"key":"b"}]}
4728{"attributes":[{"value":"z"}]}
4729"#,
4730        );
4731        let mut out = Vec::new();
4732        engine
4733            .run_ndjson(rows, "$.attributes.map([@.key, @.value])", &mut out)
4734            .expect("stream map should preserve nulls for missing fields");
4735        assert_eq!(
4736            std::str::from_utf8(&out).unwrap(),
4737            "[[\"a\",\"x\"],[\"b\",null]]\n[[null,\"z\"]]\n"
4738        );
4739    }
4740
4741    #[test]
4742    fn run_ndjson_stream_object_map_preserves_scalar_calls() {
4743        let engine = crate::JetroEngine::new();
4744        let rows = std::io::Cursor::new(
4745            br#"{"attributes":[{"key":"a","value":"x"},{"key":"b","value":"y"}]}
4746"#,
4747        );
4748        let mut out = Vec::new();
4749        engine
4750            .run_ndjson(
4751                rows,
4752                "$.attributes.map({k: @.key, v: @.value.upper()})",
4753                &mut out,
4754            )
4755            .expect("stream object map should preserve scalar calls");
4756        assert_eq!(
4757            std::str::from_utf8(&out).unwrap(),
4758            "[{\"k\":\"a\",\"v\":\"X\"},{\"k\":\"b\",\"v\":\"Y\"}]\n"
4759        );
4760    }
4761
4762    #[test]
4763    fn run_ndjson_stream_map_projects_nested_item_paths() {
4764        let engine = crate::JetroEngine::new();
4765        let rows = std::io::Cursor::new(
4766            br#"{"attributes":[{"key":"a","meta":{"code":"x"}},{"key":"b","meta":{"code":"y"}}]}
4767"#,
4768        );
4769        let mut out = Vec::new();
4770        engine
4771            .run_ndjson(
4772                rows,
4773                "$.attributes.map({k: @.key, code: @.meta.code.upper()})",
4774                &mut out,
4775            )
4776            .expect("stream map should project nested item paths");
4777        assert_eq!(
4778            std::str::from_utf8(&out).unwrap(),
4779            "[{\"k\":\"a\",\"code\":\"X\"},{\"k\":\"b\",\"code\":\"Y\"}]\n"
4780        );
4781    }
4782
4783    #[test]
4784    fn run_ndjson_stream_count_survives_hint_activation() {
4785        let engine = crate::JetroEngine::new();
4786        let rows = std::io::Cursor::new(
4787            br#"{"id":1,"attributes":[{"value":"x_3"},{"value":"y"}]}
4788{"id":2,"attributes":[{"value":"z_3"},{"value":"w_3"}]}
4789{"id":3,"attributes":[{"value":"n"}]}
4790"#,
4791        );
4792        let mut out = Vec::new();
4793        engine
4794            .run_ndjson(
4795                rows,
4796                r#"$.attributes.filter(@.value.contains("_3")).len()"#,
4797                &mut out,
4798            )
4799            .expect("hinted stream count should run");
4800        assert_eq!(std::str::from_utf8(&out).unwrap(), "1\n2\n0\n");
4801    }
4802
4803    #[test]
4804    fn run_ndjson_filtered_count_ignores_missing_predicate_fields() {
4805        let engine = crate::JetroEngine::new();
4806        let rows = std::io::Cursor::new(
4807            br#"{"attributes":[{"value":"x_3"},{"key":"missing"},{"value":"y"}]}
4808{"attributes":[{"key":"missing"}]}
4809"#,
4810        );
4811        let mut out = Vec::new();
4812        engine
4813            .run_ndjson(
4814                rows,
4815                r#"$.attributes.filter(@.value.contains("_3")).len()"#,
4816                &mut out,
4817            )
4818            .expect("filtered count should ignore missing predicate fields");
4819        assert_eq!(std::str::from_utf8(&out).unwrap(), "1\n0\n");
4820    }
4821
4822    #[test]
4823    fn run_ndjson_filter_last_returns_last_matching_output() {
4824        let engine = crate::JetroEngine::new();
4825        let rows = std::io::Cursor::new(
4826            br#"{"attributes":[{"key":"keep","value":"first"},{"key":"drop","value":"physical-last"}]}
4827{"attributes":[{"key":"drop","value":"first"},{"key":"keep","value":"semantic-last"},{"key":"drop","value":"physical-last"}]}
4828"#,
4829        );
4830        let mut out = Vec::new();
4831
4832        engine
4833            .run_ndjson(
4834                rows,
4835                r#"$.attributes.filter(@.key == "keep").last().value"#,
4836                &mut out,
4837            )
4838            .expect("filtered last should preserve semantic output order");
4839
4840        assert_eq!(
4841            std::str::from_utf8(&out).unwrap(),
4842            "\"first\"\n\"semantic-last\"\n"
4843        );
4844    }
4845
4846    #[test]
4847    fn run_ndjson_filter_map_first_stops_at_first_matching_output() {
4848        let engine = crate::JetroEngine::new();
4849        let rows = std::io::Cursor::new(
4850            br#"{"attributes":[{"key":"a","value":"x_3"},{"key":"b","value":"later_3"}]}
4851{"attributes":[{"key":"a","value":"skip"},{"key":"b","value":"y_3"}]}
4852{"attributes":[{"key":"a","value":"skip"}]}
4853"#,
4854        );
4855        let mut out = Vec::new();
4856
4857        engine
4858            .run_ndjson(
4859                rows,
4860                r#"$.attributes.filter(@.value.contains("_3")).map({key: @.key, value: @.value}).first()"#,
4861                &mut out,
4862            )
4863            .expect("filtered first should use direct stream first");
4864
4865        assert_eq!(
4866            std::str::from_utf8(&out).unwrap(),
4867            "{\"key\":\"a\",\"value\":\"x_3\"}\n{\"key\":\"b\",\"value\":\"y_3\"}\n"
4868        );
4869    }
4870
4871    #[test]
4872    fn run_ndjson_map_first_projects_first_item_without_filter() {
4873        let engine = crate::JetroEngine::new();
4874        let rows = std::io::Cursor::new(
4875            br#"{"attributes":[{"key":"a","value":"first"},{"key":"b","value":"later"}]}
4876{"attributes":[]}
4877{"attributes":[{"key":"c","value":"only"}]}
4878"#,
4879        );
4880        let mut out = Vec::new();
4881
4882        engine
4883            .run_ndjson(rows, "$.attributes.map(@.value).first()", &mut out)
4884            .expect("unfiltered first should use direct stream first");
4885
4886        assert_eq!(std::str::from_utf8(&out).unwrap(), "\"first\"\n\"only\"\n");
4887    }
4888
4889    #[test]
4890    fn run_ndjson_map_last_projects_last_item_without_filter() {
4891        let engine = crate::JetroEngine::new();
4892        let rows = std::io::Cursor::new(
4893            br#"{"attributes":[{"key":"a","value":"first"},{"key":"b","value":"last"}]}
4894{"attributes":[]}
4895{"attributes":[{"key":"c","value":"only"}]}
4896"#,
4897        );
4898        let mut out = Vec::new();
4899
4900        engine
4901            .run_ndjson(rows, "$.attributes.map(@.value).last()", &mut out)
4902            .expect("unfiltered last should use direct stream last");
4903
4904        assert_eq!(std::str::from_utf8(&out).unwrap(), "\"last\"\n\"only\"\n");
4905    }
4906
4907    #[test]
4908    fn run_ndjson_filter_map_last_keeps_latest_matching_output() {
4909        let engine = crate::JetroEngine::new();
4910        let rows = std::io::Cursor::new(
4911            br#"{"attributes":[{"key":"a","value":"x_3"},{"key":"b","value":"later_3"}]}
4912{"attributes":[{"key":"a","value":"skip"},{"key":"b","value":"y_3"}]}
4913{"attributes":[{"key":"a","value":"skip"}]}
4914"#,
4915        );
4916        let mut out = Vec::new();
4917
4918        engine
4919            .run_ndjson(
4920                rows,
4921                r#"$.attributes.filter(@.value.contains("_3")).map({key: @.key, value: @.value}).last()"#,
4922                &mut out,
4923            )
4924            .expect("filtered last should use direct stream last");
4925
4926        assert_eq!(
4927            std::str::from_utf8(&out).unwrap(),
4928            "{\"key\":\"b\",\"value\":\"later_3\"}\n{\"key\":\"b\",\"value\":\"y_3\"}\n"
4929        );
4930    }
4931
4932    #[test]
4933    fn run_ndjson_stream_numeric_survives_hint_activation() {
4934        let engine = crate::JetroEngine::new();
4935        let rows = std::io::Cursor::new(
4936            br#"{"id":1,"attributes":[{"weight":1},{"weight":2}]}
4937{"id":2,"attributes":[{"weight":3.5},{"weight":4}]}
4938{"id":3,"attributes":[{"weight":"skip"}]}
4939"#,
4940        );
4941        let mut out = Vec::new();
4942        engine
4943            .run_ndjson(rows, "$.attributes.map(@.weight).sum()", &mut out)
4944            .expect("hinted numeric stream should run");
4945        assert_eq!(std::str::from_utf8(&out).unwrap(), "3\n7.5\n0\n");
4946    }
4947
4948    #[test]
4949    fn run_ndjson_filtered_stream_numeric_uses_shared_fields() {
4950        let engine = crate::JetroEngine::new();
4951        let rows = std::io::Cursor::new(
4952            br#"{"attributes":[{"value":"x_3","weight":1},{"value":"skip","weight":10},{"value":"y_3","weight":2.5}]}
4953{"attributes":[{"value":"skip","weight":4}]}
4954"#,
4955        );
4956        let mut out = Vec::new();
4957        engine
4958            .run_ndjson(
4959                rows,
4960                r#"$.attributes.filter(@.value.contains("_3")).map(@.weight).sum()"#,
4961                &mut out,
4962            )
4963            .expect("filtered numeric stream should use byte path");
4964        assert_eq!(std::str::from_utf8(&out).unwrap(), "3.5\n0\n");
4965    }
4966
4967    #[test]
4968    fn run_ndjson_stream_extreme_projects_selected_item_field() {
4969        let engine = crate::JetroEngine::new();
4970        let rows = std::io::Cursor::new(
4971            br#"{"attributes":[{"key":"a","value":"m"},{"key":"b","value":"z"},{"key":"c","value":"n"}]}
4972{"attributes":[{"key":"x","value":"b"},{"key":"y","value":"a"}]}
4973"#,
4974        );
4975        let mut out = Vec::new();
4976        engine
4977            .run_ndjson(rows, "$.attributes.sort_by(@.value).last().key", &mut out)
4978            .expect("stream extreme should project selected item field");
4979        assert_eq!(std::str::from_utf8(&out).unwrap(), "\"b\"\n\"x\"\n");
4980    }
4981
4982    #[test]
4983    fn run_ndjson_stream_extreme_handles_escaped_string_keys() {
4984        let engine = crate::JetroEngine::new();
4985        let rows = std::io::Cursor::new(
4986            br#"{"attributes":[{"key":"a","value":"v\"1"},{"key":"b","value":"v_9"}]}
4987"#,
4988        );
4989        let mut out = Vec::new();
4990        engine
4991            .run_ndjson(rows, "$.attributes.sort_by(@.value).last().key", &mut out)
4992            .expect("escaped extrema keys should fall back safely");
4993        assert_eq!(std::str::from_utf8(&out).unwrap(), "\"b\"\n");
4994    }
4995
4996    #[test]
4997    fn run_ndjson_stream_extreme_handles_numeric_keys() {
4998        let engine = crate::JetroEngine::new();
4999        let rows = std::io::Cursor::new(
5000            br#"{"attributes":[{"key":"a","score":1},{"key":"b","score":10},{"key":"c","score":2}]}
5001{"attributes":[{"key":"x","score":-2},{"key":"y","score":-1.5}]}
5002"#,
5003        );
5004        let mut out = Vec::new();
5005        engine
5006            .run_ndjson(rows, "$.attributes.sort_by(@.score).last().key", &mut out)
5007            .expect("numeric extrema keys should use direct stream extrema");
5008        assert_eq!(std::str::from_utf8(&out).unwrap(), "\"b\"\n\"y\"\n");
5009    }
5010
5011    #[test]
5012    fn run_ndjson_nested_direct_projection_writes_without_fallback() {
5013        let engine = crate::JetroEngine::new();
5014        let rows = std::io::Cursor::new(
5015            br#"{"id":1,"name":"ada","attributes":[{"key":"a","value":"x"},{"key":"b","value":"y"}]}
5016{"id":2,"name":"bob","attributes":[{"key":"c","value":"z"}]}
5017"#,
5018        );
5019        let mut out = Vec::new();
5020        engine
5021            .run_ndjson(
5022                rows,
5023                r#"{id: $.id, name: $.name, count: $.attributes.len(), first: $.attributes.first().value, values: $.attributes.map(@.value)}"#,
5024                &mut out,
5025            )
5026            .expect("nested direct projection should run");
5027        assert_eq!(
5028            std::str::from_utf8(&out).unwrap(),
5029            "{\"id\":1,\"name\":\"ada\",\"count\":2,\"first\":\"x\",\"values\":[\"x\",\"y\"]}\n\
5030{\"id\":2,\"name\":\"bob\",\"count\":1,\"first\":\"z\",\"values\":[\"z\"]}\n"
5031        );
5032    }
5033}