1use super::ndjson_byte::{
2 eval_ndjson_byte_predicate_row, tape_plan_can_write_byte_row, write_ndjson_byte_plan_row,
3 write_ndjson_byte_tape_plan_row, write_ndjson_hinted_tape_plan_row, BytePlanWrite,
4};
5#[cfg(test)]
6pub(super) use super::ndjson_direct::{
7 direct_byte_plan, direct_writer_plan_kind, NdjsonDirectPlanKind,
8};
9pub(super) use super::ndjson_direct::{
10 direct_tape_plan, direct_tape_predicate, direct_writer_plans, NdjsonDirectBytePlan,
11 NdjsonDirectElement, NdjsonDirectItemPredicate, NdjsonDirectPredicate,
12 NdjsonDirectProjectionValue, NdjsonDirectStreamMap, NdjsonDirectStreamPlan,
13 NdjsonDirectStreamSink, NdjsonDirectTapePlan,
14};
15use super::ndjson_frame::{frame_payload, FramePayload, NdjsonRowFrame};
16use super::ndjson_hint::{
17 NdjsonHintAccessPlan, NdjsonHintConfig, NdjsonHintDecision, NdjsonHintState,
18};
19pub(super) use super::ndjson_row::{collect_row_val, parse_row, row_eval_error, row_parse_error};
20use super::ndjson_rows::NdjsonRowsFilePlan;
21use super::ndjson_route::{
22 ndjson_route_plan, NdjsonExecutionReport, NdjsonExecutionStats, NdjsonRouteExplain,
23 NdjsonRoutePlan, NdjsonSourceCaps, NdjsonSourceMode,
24};
25use super::ndjson_stream_cache::NdjsonConstantStreamCache;
26pub(super) use super::ndjson_write::{
27 ndjson_writer_with_options, write_json_bytes_line_with_options, write_val_line,
28 write_val_line_with_options,
29};
30use super::stream_exec::CompiledRowStream;
31use super::stream_fanout::{
32 drive_ndjson_rows_fanout_file, drive_ndjson_rows_fanout_file_with_stats,
33};
34#[cfg(test)]
35use super::stream_plan::RowStreamSourceKind;
36use super::stream_plan::{RowStreamDirection, RowStreamPlan};
37use super::stream_subquery::{RowStreamSubqueryPlan, STREAM_BINDING};
38use super::stream_types::{RowStreamRowResult, RowStreamStats};
39use super::{NdjsonSource, RowError};
40pub use super::ndjson_driver::NdjsonPerRowDriver;
41use crate::compile::compiler::Compiler;
42use crate::data::context::Env;
43use crate::data::value::Val;
44use crate::plan::physical::PlanningContext;
45use crate::util::is_truthy;
46use crate::{EvalError, Jetro, JetroEngine, JetroEngineError, VM};
47use memchr::memchr;
48use serde_json::Value;
49use std::fs::File;
50use std::io::{BufRead, Write};
51use std::path::Path;
52use std::sync::MutexGuard;
53
54pub(super) const DEFAULT_MAX_LINE_LEN: usize = 64 * 1024 * 1024;
55const DEFAULT_LINE_BUFFER_CAPACITY: usize = 8192;
56pub(super) const DEFAULT_READER_BUFFER_CAPACITY: usize = 1024 * 1024;
57pub(super) const DEFAULT_REVERSE_CHUNK_SIZE: usize = 64 * 1024;
58
/// Classification returned by [`ndjson_writer_path_kind`] for a query.
///
/// The `Display` form is a lowercase, hyphenated label
/// (`byte-expr`, `byte-writable-tape`, `tape`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NdjsonWriterPathKind {
    /// Reported when a byte plan is available for the query.
    ByteExpr,
    /// Reported when only a tape plan exists and it can write byte rows.
    ByteWritableTape,
    /// Reported when only a tape plan exists and it cannot write byte rows.
    Tape,
}

impl std::fmt::Display for NdjsonWriterPathKind {
    /// Writes the hyphenated label for the variant.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match *self {
            NdjsonWriterPathKind::ByteExpr => "byte-expr",
            NdjsonWriterPathKind::ByteWritableTape => "byte-writable-tape",
            NdjsonWriterPathKind::Tape => "tape",
        };
        f.write_str(label)
    }
}
75
/// Test-only entry point that forwards straight to [`ndjson_writer_path_kind`].
#[cfg(test)]
pub(super) fn direct_writer_path_kind(
    engine: &JetroEngine,
    query: &str,
) -> Option<NdjsonWriterPathKind> {
    // Kept as a thin alias so tests in the parent module have a stable name to call.
    ndjson_writer_path_kind(engine, query)
}
83
84pub fn ndjson_writer_path_kind(
85 engine: &JetroEngine,
86 query: &str,
87) -> Option<NdjsonWriterPathKind> {
88 let (byte, tape) = direct_writer_plans(engine, query)?;
89 if byte.is_some() {
90 return Some(NdjsonWriterPathKind::ByteExpr);
91 }
92 if tape_plan_can_write_byte_row(&tape) {
93 return Some(NdjsonWriterPathKind::ByteWritableTape);
94 }
95 Some(NdjsonWriterPathKind::Tape)
96}
97
98#[derive(Clone, Copy, Debug, PartialEq, Eq)]
100pub struct NdjsonOptions {
101 pub max_line_len: usize,
102 pub initial_buffer_capacity: usize,
103 pub reader_buffer_capacity: usize,
104 pub reverse_chunk_size: usize,
105 pub parallel_min_bytes: u64,
106 pub parallelism: NdjsonParallelism,
107 pub row_frame: NdjsonRowFrame,
108 pub null_output: NdjsonNullOutput,
109}
110
/// Value of `NdjsonOptions::parallelism`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NdjsonParallelism {
    /// The default setting.
    Auto,
    /// The non-default setting; presumably disables parallel execution — confirm in the planner.
    Off,
}
116
/// Value of `NdjsonOptions::null_output`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NdjsonNullOutput {
    /// The default setting; presumably drops null results — confirm in the writer.
    Skip,
    /// The non-default setting; presumably writes null results — confirm in the writer.
    Emit,
}
122
123impl Default for NdjsonOptions {
124 fn default() -> Self {
125 Self {
126 max_line_len: DEFAULT_MAX_LINE_LEN,
127 initial_buffer_capacity: DEFAULT_LINE_BUFFER_CAPACITY,
128 reader_buffer_capacity: DEFAULT_READER_BUFFER_CAPACITY,
129 reverse_chunk_size: DEFAULT_REVERSE_CHUNK_SIZE,
130 parallel_min_bytes: 64 * 1024 * 1024,
131 parallelism: NdjsonParallelism::Auto,
132 row_frame: NdjsonRowFrame::JsonLine,
133 null_output: NdjsonNullOutput::Skip,
134 }
135 }
136}
137
138impl NdjsonOptions {
139 pub fn with_max_line_len(mut self, max_line_len: usize) -> Self {
140 self.max_line_len = max_line_len;
141 self
142 }
143
144 pub fn with_initial_buffer_capacity(mut self, capacity: usize) -> Self {
145 self.initial_buffer_capacity = capacity;
146 self
147 }
148
149 pub fn with_reader_buffer_capacity(mut self, capacity: usize) -> Self {
150 self.reader_buffer_capacity = capacity;
151 self
152 }
153
154 pub fn with_reverse_chunk_size(mut self, capacity: usize) -> Self {
155 self.reverse_chunk_size = capacity;
156 self
157 }
158
159 pub fn with_parallel_min_bytes(mut self, bytes: u64) -> Self {
160 self.parallel_min_bytes = bytes;
161 self
162 }
163
164 pub fn with_parallelism(mut self, parallelism: NdjsonParallelism) -> Self {
165 self.parallelism = parallelism;
166 self
167 }
168
169 pub fn with_row_frame(mut self, row_frame: NdjsonRowFrame) -> Self {
170 self.row_frame = row_frame;
171 self
172 }
173
174 pub fn with_null_output(mut self, null_output: NdjsonNullOutput) -> Self {
175 self.null_output = null_output;
176 self
177 }
178}
179
/// Signal returned by the callbacks of the `*_until` entry points.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NdjsonControl {
    /// Returned unconditionally by the callback adapters of the non-`until` wrappers.
    Continue,
    /// Presumably asks the driver to stop early — confirm against the driver implementation.
    Stop,
}
185
186pub fn for_each_ndjson<R, F>(
187 engine: &JetroEngine,
188 reader: R,
189 query: &str,
190 f: F,
191) -> Result<usize, JetroEngineError>
192where
193 R: BufRead,
194 F: FnMut(Value),
195{
196 for_each_ndjson_with_options(engine, reader, query, NdjsonOptions::default(), f)
197}
198
199pub fn for_each_ndjson_with_options<R, F>(
200 engine: &JetroEngine,
201 reader: R,
202 query: &str,
203 options: NdjsonOptions,
204 mut f: F,
205) -> Result<usize, JetroEngineError>
206where
207 R: BufRead,
208 F: FnMut(Value),
209{
210 drive_ndjson(engine, reader, query, options, |value| {
211 f(value);
212 Ok(NdjsonControl::Continue)
213 })
214}
215
216pub fn for_each_ndjson_until<R, F>(
217 engine: &JetroEngine,
218 reader: R,
219 query: &str,
220 f: F,
221) -> Result<usize, JetroEngineError>
222where
223 R: BufRead,
224 F: FnMut(Value) -> Result<NdjsonControl, JetroEngineError>,
225{
226 for_each_ndjson_until_with_options(engine, reader, query, NdjsonOptions::default(), f)
227}
228
229pub fn for_each_ndjson_until_with_options<R, F>(
230 engine: &JetroEngine,
231 reader: R,
232 query: &str,
233 options: NdjsonOptions,
234 f: F,
235) -> Result<usize, JetroEngineError>
236where
237 R: BufRead,
238 F: FnMut(Value) -> Result<NdjsonControl, JetroEngineError>,
239{
240 drive_ndjson(engine, reader, query, options, f)
241}
242
243pub fn for_each_ndjson_source<F>(
244 engine: &JetroEngine,
245 source: NdjsonSource,
246 query: &str,
247 f: F,
248) -> Result<usize, JetroEngineError>
249where
250 F: FnMut(Value),
251{
252 for_each_ndjson_source_with_options(engine, source, query, NdjsonOptions::default(), f)
253}
254
255pub fn for_each_ndjson_source_with_options<F>(
256 engine: &JetroEngine,
257 source: NdjsonSource,
258 query: &str,
259 options: NdjsonOptions,
260 f: F,
261) -> Result<usize, JetroEngineError>
262where
263 F: FnMut(Value),
264{
265 match source {
266 NdjsonSource::File(path) => {
267 let file = File::open(path)?;
268 for_each_ndjson_with_options(
269 engine,
270 std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
271 query,
272 options,
273 f,
274 )
275 }
276 NdjsonSource::Reader(reader) => {
277 for_each_ndjson_with_options(engine, reader, query, options, f)
278 }
279 }
280}
281
282pub fn for_each_ndjson_source_until<F>(
283 engine: &JetroEngine,
284 source: NdjsonSource,
285 query: &str,
286 f: F,
287) -> Result<usize, JetroEngineError>
288where
289 F: FnMut(Value) -> Result<NdjsonControl, JetroEngineError>,
290{
291 for_each_ndjson_source_until_with_options(engine, source, query, NdjsonOptions::default(), f)
292}
293
294pub fn for_each_ndjson_source_until_with_options<F>(
295 engine: &JetroEngine,
296 source: NdjsonSource,
297 query: &str,
298 options: NdjsonOptions,
299 f: F,
300) -> Result<usize, JetroEngineError>
301where
302 F: FnMut(Value) -> Result<NdjsonControl, JetroEngineError>,
303{
304 match source {
305 NdjsonSource::File(path) => {
306 let file = File::open(path)?;
307 for_each_ndjson_until_with_options(
308 engine,
309 std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
310 query,
311 options,
312 f,
313 )
314 }
315 NdjsonSource::Reader(reader) => {
316 for_each_ndjson_until_with_options(engine, reader, query, options, f)
317 }
318 }
319}
320
321pub fn collect_ndjson<R>(
322 engine: &JetroEngine,
323 reader: R,
324 query: &str,
325) -> Result<Vec<Value>, JetroEngineError>
326where
327 R: BufRead,
328{
329 collect_ndjson_with_options(engine, reader, query, NdjsonOptions::default())
330}
331
332pub fn collect_ndjson_with_options<R>(
333 engine: &JetroEngine,
334 reader: R,
335 query: &str,
336 options: NdjsonOptions,
337) -> Result<Vec<Value>, JetroEngineError>
338where
339 R: BufRead,
340{
341 let mut values = Vec::new();
342 for_each_ndjson_with_options(engine, reader, query, options, |value| values.push(value))?;
343 Ok(values)
344}
345
346pub fn collect_ndjson_file<P>(
347 engine: &JetroEngine,
348 path: P,
349 query: &str,
350) -> Result<Vec<Value>, JetroEngineError>
351where
352 P: AsRef<Path>,
353{
354 let file = File::open(path)?;
355 let options = NdjsonOptions::default();
356 collect_ndjson_with_options(
357 engine,
358 std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
359 query,
360 options,
361 )
362}
363
364pub fn collect_ndjson_file_with_options<P>(
365 engine: &JetroEngine,
366 path: P,
367 query: &str,
368 options: NdjsonOptions,
369) -> Result<Vec<Value>, JetroEngineError>
370where
371 P: AsRef<Path>,
372{
373 let file = File::open(path)?;
374 collect_ndjson_with_options(
375 engine,
376 std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
377 query,
378 options,
379 )
380}
381
382pub fn collect_ndjson_source(
383 engine: &JetroEngine,
384 source: NdjsonSource,
385 query: &str,
386) -> Result<Vec<Value>, JetroEngineError> {
387 collect_ndjson_source_with_options(engine, source, query, NdjsonOptions::default())
388}
389
390pub fn collect_ndjson_source_with_options(
391 engine: &JetroEngine,
392 source: NdjsonSource,
393 query: &str,
394 options: NdjsonOptions,
395) -> Result<Vec<Value>, JetroEngineError> {
396 match source {
397 NdjsonSource::File(path) => collect_ndjson_file_with_options(engine, path, query, options),
398 NdjsonSource::Reader(reader) => collect_ndjson_with_options(engine, reader, query, options),
399 }
400}
401
402pub fn collect_ndjson_matches<R>(
403 engine: &JetroEngine,
404 reader: R,
405 predicate: &str,
406 limit: usize,
407) -> Result<Vec<Value>, JetroEngineError>
408where
409 R: BufRead,
410{
411 collect_ndjson_matches_with_options(engine, reader, predicate, limit, NdjsonOptions::default())
412}
413
414pub fn collect_ndjson_matches_with_options<R>(
415 engine: &JetroEngine,
416 reader: R,
417 predicate: &str,
418 limit: usize,
419 options: NdjsonOptions,
420) -> Result<Vec<Value>, JetroEngineError>
421where
422 R: BufRead,
423{
424 let mut values = Vec::with_capacity(limit);
425 drive_ndjson_matches(engine, reader, predicate, limit, options, |value| {
426 values.push(Value::from(value));
427 Ok(NdjsonControl::Continue)
428 })?;
429 Ok(values)
430}
431
432pub fn collect_ndjson_matches_file<P>(
433 engine: &JetroEngine,
434 path: P,
435 predicate: &str,
436 limit: usize,
437) -> Result<Vec<Value>, JetroEngineError>
438where
439 P: AsRef<Path>,
440{
441 let file = File::open(path)?;
442 let options = NdjsonOptions::default();
443 collect_ndjson_matches_with_options(
444 engine,
445 std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
446 predicate,
447 limit,
448 options,
449 )
450}
451
452pub fn collect_ndjson_matches_file_with_options<P>(
453 engine: &JetroEngine,
454 path: P,
455 predicate: &str,
456 limit: usize,
457 options: NdjsonOptions,
458) -> Result<Vec<Value>, JetroEngineError>
459where
460 P: AsRef<Path>,
461{
462 let file = File::open(path)?;
463 collect_ndjson_matches_with_options(
464 engine,
465 std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
466 predicate,
467 limit,
468 options,
469 )
470}
471
472pub fn collect_ndjson_matches_source(
473 engine: &JetroEngine,
474 source: NdjsonSource,
475 predicate: &str,
476 limit: usize,
477) -> Result<Vec<Value>, JetroEngineError> {
478 collect_ndjson_matches_source_with_options(
479 engine,
480 source,
481 predicate,
482 limit,
483 NdjsonOptions::default(),
484 )
485}
486
487pub fn collect_ndjson_matches_source_with_options(
488 engine: &JetroEngine,
489 source: NdjsonSource,
490 predicate: &str,
491 limit: usize,
492 options: NdjsonOptions,
493) -> Result<Vec<Value>, JetroEngineError> {
494 match source {
495 NdjsonSource::File(path) => {
496 collect_ndjson_matches_file_with_options(engine, path, predicate, limit, options)
497 }
498 NdjsonSource::Reader(reader) => {
499 collect_ndjson_matches_with_options(engine, reader, predicate, limit, options)
500 }
501 }
502}
503
504pub fn run_ndjson<R, W>(
505 engine: &JetroEngine,
506 reader: R,
507 query: &str,
508 writer: W,
509) -> Result<usize, JetroEngineError>
510where
511 R: BufRead,
512 W: Write,
513{
514 run_ndjson_with_options(engine, reader, query, writer, NdjsonOptions::default())
515}
516
517pub fn run_ndjson_file<P, W>(
518 engine: &JetroEngine,
519 path: P,
520 query: &str,
521 writer: W,
522) -> Result<usize, JetroEngineError>
523where
524 P: AsRef<Path>,
525 W: Write,
526{
527 let options = NdjsonOptions::default();
528 run_ndjson_file_with_options(engine, path, query, writer, options)
529}
530
531pub fn run_ndjson_file_with_options<P, W>(
532 engine: &JetroEngine,
533 path: P,
534 query: &str,
535 writer: W,
536 options: NdjsonOptions,
537) -> Result<usize, JetroEngineError>
538where
539 P: AsRef<Path>,
540 W: Write,
541{
542 let path = path.as_ref();
543 match ndjson_route_plan(engine, NdjsonSourceMode::File, query, options)? {
544 NdjsonRoutePlan::Rows { plan, .. } => {
545 return drive_ndjson_rows_file_plan(engine, path, &plan, None, options, writer);
546 }
547 NdjsonRoutePlan::Unsupported { explain } => {
548 return Err(unsupported_ndjson_route_error(&explain));
549 }
550 NdjsonRoutePlan::RowLocal { .. } => {}
551 }
552
553 let file = File::open(path)?;
554 run_ndjson_with_options(
555 engine,
556 std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
557 query,
558 writer,
559 options,
560 )
561}
562
563pub fn run_ndjson_file_with_report<P, W>(
564 engine: &JetroEngine,
565 path: P,
566 query: &str,
567 writer: W,
568) -> Result<NdjsonExecutionReport, JetroEngineError>
569where
570 P: AsRef<Path>,
571 W: Write,
572{
573 run_ndjson_file_with_report_and_options(engine, path, query, writer, NdjsonOptions::default())
574}
575
576pub fn run_ndjson_file_with_report_and_options<P, W>(
577 engine: &JetroEngine,
578 path: P,
579 query: &str,
580 writer: W,
581 options: NdjsonOptions,
582) -> Result<NdjsonExecutionReport, JetroEngineError>
583where
584 P: AsRef<Path>,
585 W: Write,
586{
587 let path = path.as_ref();
588 match ndjson_route_plan(engine, NdjsonSourceMode::File, query, options)? {
589 NdjsonRoutePlan::Rows {
590 explain,
591 plan: NdjsonRowsFilePlan::Stream(plan),
592 } => {
593 let (_, stats) =
594 drive_ndjson_rows_stream_file_with_stats(engine, path, &plan, None, options, writer)?;
595 Ok(row_stream_report(explain, stats))
596 }
597 NdjsonRoutePlan::Rows {
598 explain,
599 plan: NdjsonRowsFilePlan::Fanout(plan),
600 } => {
601 let (_, stats) =
602 drive_ndjson_rows_fanout_file_with_stats(engine, path, &plan, options, writer)?;
603 Ok(row_stream_report(explain, stats))
604 }
605 NdjsonRoutePlan::Rows {
606 explain,
607 plan: NdjsonRowsFilePlan::Subquery(plan),
608 } => {
609 let (_, stats) =
610 drive_ndjson_rows_subquery_file_with_stats(engine, path, &plan, options, writer)?;
611 Ok(row_stream_report(explain, stats))
612 }
613 NdjsonRoutePlan::Unsupported { explain } => Err(unsupported_ndjson_route_error(&explain)),
614 NdjsonRoutePlan::RowLocal { explain } => {
615 let file = File::open(path)?;
616 let (_, stats) = drive_ndjson_writer_with_stats(
617 engine,
618 std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
619 query,
620 None,
621 options,
622 writer,
623 )?;
624 Ok(NdjsonExecutionReport::new(explain, stats))
625 }
626 }
627}
628
629pub fn run_ndjson_with_options<R, W>(
630 engine: &JetroEngine,
631 reader: R,
632 query: &str,
633 writer: W,
634 options: NdjsonOptions,
635) -> Result<usize, JetroEngineError>
636where
637 R: BufRead,
638 W: Write,
639{
640 match ndjson_route_plan(engine, NdjsonSourceMode::Reader, query, options)? {
641 NdjsonRoutePlan::Rows {
642 plan: NdjsonRowsFilePlan::Stream(plan),
643 ..
644 } => drive_ndjson_rows_stream_reader(engine, reader, &plan, None, options, writer),
645 NdjsonRoutePlan::Rows { explain, .. } | NdjsonRoutePlan::Unsupported { explain } => {
646 Err(unsupported_ndjson_route_error(&explain))
647 }
648 NdjsonRoutePlan::RowLocal { .. } => {
649 drive_ndjson_writer(engine, reader, query, None, options, writer)
650 }
651 }
652}
653
654pub fn run_ndjson_with_report<R, W>(
655 engine: &JetroEngine,
656 reader: R,
657 query: &str,
658 writer: W,
659) -> Result<NdjsonExecutionReport, JetroEngineError>
660where
661 R: BufRead,
662 W: Write,
663{
664 run_ndjson_with_report_and_options(engine, reader, query, writer, NdjsonOptions::default())
665}
666
667pub fn run_ndjson_with_report_and_options<R, W>(
668 engine: &JetroEngine,
669 reader: R,
670 query: &str,
671 writer: W,
672 options: NdjsonOptions,
673) -> Result<NdjsonExecutionReport, JetroEngineError>
674where
675 R: BufRead,
676 W: Write,
677{
678 match ndjson_route_plan(engine, NdjsonSourceMode::Reader, query, options)? {
679 NdjsonRoutePlan::Rows {
680 explain,
681 plan: NdjsonRowsFilePlan::Stream(plan),
682 } => {
683 let (_, stats) =
684 drive_ndjson_rows_stream_reader_with_stats(engine, reader, &plan, None, options, writer)?;
685 Ok(row_stream_report(explain, stats))
686 }
687 NdjsonRoutePlan::Rows { explain, .. } | NdjsonRoutePlan::Unsupported { explain } => {
688 Err(unsupported_ndjson_route_error(&explain))
689 }
690 NdjsonRoutePlan::RowLocal { explain } => {
691 let (_, stats) =
692 drive_ndjson_writer_with_stats(engine, reader, query, None, options, writer)?;
693 Ok(NdjsonExecutionReport::new(explain, stats))
694 }
695 }
696}
697
698pub fn run_ndjson_limit<R, W>(
699 engine: &JetroEngine,
700 reader: R,
701 query: &str,
702 limit: usize,
703 writer: W,
704) -> Result<usize, JetroEngineError>
705where
706 R: BufRead,
707 W: Write,
708{
709 run_ndjson_limit_with_options(
710 engine,
711 reader,
712 query,
713 limit,
714 writer,
715 NdjsonOptions::default(),
716 )
717}
718
719pub fn run_ndjson_limit_with_options<R, W>(
720 engine: &JetroEngine,
721 reader: R,
722 query: &str,
723 limit: usize,
724 writer: W,
725 options: NdjsonOptions,
726) -> Result<usize, JetroEngineError>
727where
728 R: BufRead,
729 W: Write,
730{
731 if limit == 0 {
732 return Ok(0);
733 }
734
735 match ndjson_route_plan(engine, NdjsonSourceMode::Reader, query, options)? {
736 NdjsonRoutePlan::Rows {
737 plan: NdjsonRowsFilePlan::Stream(plan),
738 ..
739 } => drive_ndjson_rows_stream_reader(engine, reader, &plan, Some(limit), options, writer),
740 NdjsonRoutePlan::Rows { explain, .. } | NdjsonRoutePlan::Unsupported { explain } => {
741 Err(unsupported_ndjson_route_error(&explain))
742 }
743 NdjsonRoutePlan::RowLocal { .. } => {
744 drive_ndjson_writer(engine, reader, query, Some(limit), options, writer)
745 }
746 }
747}
748
749pub fn run_ndjson_limit_with_report<R, W>(
750 engine: &JetroEngine,
751 reader: R,
752 query: &str,
753 limit: usize,
754 writer: W,
755) -> Result<NdjsonExecutionReport, JetroEngineError>
756where
757 R: BufRead,
758 W: Write,
759{
760 run_ndjson_limit_with_report_and_options(
761 engine,
762 reader,
763 query,
764 limit,
765 writer,
766 NdjsonOptions::default(),
767 )
768}
769
770pub fn run_ndjson_limit_with_report_and_options<R, W>(
771 engine: &JetroEngine,
772 reader: R,
773 query: &str,
774 limit: usize,
775 writer: W,
776 options: NdjsonOptions,
777) -> Result<NdjsonExecutionReport, JetroEngineError>
778where
779 R: BufRead,
780 W: Write,
781{
782 if limit == 0 {
783 let route = ndjson_route_plan(engine, NdjsonSourceMode::Reader, query, options)?
784 .explain()
785 .clone();
786 return Ok(NdjsonExecutionReport::emitted_only(route, 0));
787 }
788
789 match ndjson_route_plan(engine, NdjsonSourceMode::Reader, query, options)? {
790 NdjsonRoutePlan::Rows {
791 explain,
792 plan: NdjsonRowsFilePlan::Stream(plan),
793 } => {
794 let (_, stats) = drive_ndjson_rows_stream_reader_with_stats(
795 engine,
796 reader,
797 &plan,
798 Some(limit),
799 options,
800 writer,
801 )?;
802 Ok(row_stream_report(explain, stats))
803 }
804 NdjsonRoutePlan::Rows { explain, .. } | NdjsonRoutePlan::Unsupported { explain } => {
805 Err(unsupported_ndjson_route_error(&explain))
806 }
807 NdjsonRoutePlan::RowLocal { explain } => {
808 let (_, stats) = drive_ndjson_writer_with_stats(
809 engine,
810 reader,
811 query,
812 Some(limit),
813 options,
814 writer,
815 )?;
816 Ok(NdjsonExecutionReport::new(explain, stats))
817 }
818 }
819}
820
821pub fn run_ndjson_file_limit<P, W>(
822 engine: &JetroEngine,
823 path: P,
824 query: &str,
825 limit: usize,
826 writer: W,
827) -> Result<usize, JetroEngineError>
828where
829 P: AsRef<Path>,
830 W: Write,
831{
832 let options = NdjsonOptions::default();
833 run_ndjson_file_limit_with_options(engine, path, query, limit, writer, options)
834}
835
836pub fn run_ndjson_file_limit_with_options<P, W>(
837 engine: &JetroEngine,
838 path: P,
839 query: &str,
840 limit: usize,
841 writer: W,
842 options: NdjsonOptions,
843) -> Result<usize, JetroEngineError>
844where
845 P: AsRef<Path>,
846 W: Write,
847{
848 if limit == 0 {
849 return Ok(0);
850 }
851 let path = path.as_ref();
852 match ndjson_route_plan(engine, NdjsonSourceMode::File, query, options)? {
853 NdjsonRoutePlan::Rows { plan, .. } => {
854 return drive_ndjson_rows_file_plan(engine, path, &plan, Some(limit), options, writer);
855 }
856 NdjsonRoutePlan::Unsupported { explain } => {
857 return Err(unsupported_ndjson_route_error(&explain));
858 }
859 NdjsonRoutePlan::RowLocal { .. } => {}
860 }
861
862 let file = File::open(path)?;
863 run_ndjson_limit_with_options(
864 engine,
865 std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
866 query,
867 limit,
868 writer,
869 options,
870 )
871}
872
873pub fn run_ndjson_file_limit_with_report<P, W>(
874 engine: &JetroEngine,
875 path: P,
876 query: &str,
877 limit: usize,
878 writer: W,
879) -> Result<NdjsonExecutionReport, JetroEngineError>
880where
881 P: AsRef<Path>,
882 W: Write,
883{
884 run_ndjson_file_limit_with_report_and_options(
885 engine,
886 path,
887 query,
888 limit,
889 writer,
890 NdjsonOptions::default(),
891 )
892}
893
894pub fn run_ndjson_file_limit_with_report_and_options<P, W>(
895 engine: &JetroEngine,
896 path: P,
897 query: &str,
898 limit: usize,
899 writer: W,
900 options: NdjsonOptions,
901) -> Result<NdjsonExecutionReport, JetroEngineError>
902where
903 P: AsRef<Path>,
904 W: Write,
905{
906 if limit == 0 {
907 let route = ndjson_route_plan(engine, NdjsonSourceMode::File, query, options)?
908 .explain()
909 .clone();
910 return Ok(NdjsonExecutionReport::emitted_only(route, 0));
911 }
912
913 let path = path.as_ref();
914 match ndjson_route_plan(engine, NdjsonSourceMode::File, query, options)? {
915 NdjsonRoutePlan::Rows {
916 explain,
917 plan: NdjsonRowsFilePlan::Stream(plan),
918 } => {
919 let (_, stats) = drive_ndjson_rows_stream_file_with_stats(
920 engine,
921 path,
922 &plan,
923 Some(limit),
924 options,
925 writer,
926 )?;
927 Ok(row_stream_report(explain, stats))
928 }
929 NdjsonRoutePlan::Rows {
930 explain,
931 plan: NdjsonRowsFilePlan::Fanout(plan),
932 } => {
933 let (_, stats) =
934 drive_ndjson_rows_fanout_file_with_stats(engine, path, &plan, options, writer)?;
935 Ok(row_stream_report(explain, stats))
936 }
937 NdjsonRoutePlan::Rows {
938 explain,
939 plan: NdjsonRowsFilePlan::Subquery(plan),
940 } => {
941 let (_, stats) =
942 drive_ndjson_rows_subquery_file_with_stats(engine, path, &plan, options, writer)?;
943 Ok(row_stream_report(explain, stats))
944 }
945 NdjsonRoutePlan::Unsupported { explain } => Err(unsupported_ndjson_route_error(&explain)),
946 NdjsonRoutePlan::RowLocal { explain } => {
947 let file = File::open(path)?;
948 let (_, stats) = drive_ndjson_writer_with_stats(
949 engine,
950 std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
951 query,
952 Some(limit),
953 options,
954 writer,
955 )?;
956 Ok(NdjsonExecutionReport::new(explain, stats))
957 }
958 }
959}
960
961fn unsupported_ndjson_route_error(explain: &NdjsonRouteExplain) -> JetroEngineError {
962 let message = explain
963 .unsupported_message()
964 .unwrap_or_else(|| "unsupported NDJSON route".to_string());
965 JetroEngineError::Eval(EvalError(message))
966}
967
968fn row_stream_report(explain: NdjsonRouteExplain, stats: RowStreamStats) -> NdjsonExecutionReport {
969 NdjsonExecutionReport::new(explain, NdjsonExecutionStats::from(&stats))
970}
971
972pub fn run_ndjson_source<W>(
973 engine: &JetroEngine,
974 source: NdjsonSource,
975 query: &str,
976 writer: W,
977) -> Result<usize, JetroEngineError>
978where
979 W: Write,
980{
981 run_ndjson_source_with_options(engine, source, query, writer, NdjsonOptions::default())
982}
983
984pub fn run_ndjson_source_with_options<W>(
985 engine: &JetroEngine,
986 source: NdjsonSource,
987 query: &str,
988 writer: W,
989 options: NdjsonOptions,
990) -> Result<usize, JetroEngineError>
991where
992 W: Write,
993{
994 match source {
995 NdjsonSource::File(path) => {
996 run_ndjson_file_with_options(engine, path, query, writer, options)
997 }
998 NdjsonSource::Reader(reader) => {
999 run_ndjson_with_options(engine, reader, query, writer, options)
1000 }
1001 }
1002}
1003
1004pub fn run_ndjson_source_with_report<W>(
1005 engine: &JetroEngine,
1006 source: NdjsonSource,
1007 query: &str,
1008 writer: W,
1009) -> Result<NdjsonExecutionReport, JetroEngineError>
1010where
1011 W: Write,
1012{
1013 run_ndjson_source_with_report_and_options(engine, source, query, writer, NdjsonOptions::default())
1014}
1015
1016pub fn run_ndjson_source_with_report_and_options<W>(
1017 engine: &JetroEngine,
1018 source: NdjsonSource,
1019 query: &str,
1020 writer: W,
1021 options: NdjsonOptions,
1022) -> Result<NdjsonExecutionReport, JetroEngineError>
1023where
1024 W: Write,
1025{
1026 match source {
1027 NdjsonSource::File(path) => {
1028 run_ndjson_file_with_report_and_options(engine, path, query, writer, options)
1029 }
1030 NdjsonSource::Reader(reader) => {
1031 run_ndjson_with_report_and_options(engine, reader, query, writer, options)
1032 }
1033 }
1034}
1035
1036pub fn run_ndjson_source_limit<W>(
1037 engine: &JetroEngine,
1038 source: NdjsonSource,
1039 query: &str,
1040 limit: usize,
1041 writer: W,
1042) -> Result<usize, JetroEngineError>
1043where
1044 W: Write,
1045{
1046 run_ndjson_source_limit_with_options(
1047 engine,
1048 source,
1049 query,
1050 limit,
1051 writer,
1052 NdjsonOptions::default(),
1053 )
1054}
1055
1056pub fn run_ndjson_source_limit_with_options<W>(
1057 engine: &JetroEngine,
1058 source: NdjsonSource,
1059 query: &str,
1060 limit: usize,
1061 writer: W,
1062 options: NdjsonOptions,
1063) -> Result<usize, JetroEngineError>
1064where
1065 W: Write,
1066{
1067 match source {
1068 NdjsonSource::File(path) => {
1069 run_ndjson_file_limit_with_options(engine, path, query, limit, writer, options)
1070 }
1071 NdjsonSource::Reader(reader) => {
1072 run_ndjson_limit_with_options(engine, reader, query, limit, writer, options)
1073 }
1074 }
1075}
1076
1077pub fn run_ndjson_source_limit_with_report<W>(
1078 engine: &JetroEngine,
1079 source: NdjsonSource,
1080 query: &str,
1081 limit: usize,
1082 writer: W,
1083) -> Result<NdjsonExecutionReport, JetroEngineError>
1084where
1085 W: Write,
1086{
1087 run_ndjson_source_limit_with_report_and_options(
1088 engine,
1089 source,
1090 query,
1091 limit,
1092 writer,
1093 NdjsonOptions::default(),
1094 )
1095}
1096
1097pub fn run_ndjson_source_limit_with_report_and_options<W>(
1098 engine: &JetroEngine,
1099 source: NdjsonSource,
1100 query: &str,
1101 limit: usize,
1102 writer: W,
1103 options: NdjsonOptions,
1104) -> Result<NdjsonExecutionReport, JetroEngineError>
1105where
1106 W: Write,
1107{
1108 match source {
1109 NdjsonSource::File(path) => {
1110 run_ndjson_file_limit_with_report_and_options(engine, path, query, limit, writer, options)
1111 }
1112 NdjsonSource::Reader(reader) => {
1113 run_ndjson_limit_with_report_and_options(engine, reader, query, limit, writer, options)
1114 }
1115 }
1116}
1117
1118pub fn run_ndjson_matches<R, W>(
1119 engine: &JetroEngine,
1120 reader: R,
1121 predicate: &str,
1122 limit: usize,
1123 writer: W,
1124) -> Result<usize, JetroEngineError>
1125where
1126 R: BufRead,
1127 W: Write,
1128{
1129 run_ndjson_matches_with_options(
1130 engine,
1131 reader,
1132 predicate,
1133 limit,
1134 writer,
1135 NdjsonOptions::default(),
1136 )
1137}
1138
1139pub fn run_ndjson_matches_with_options<R, W>(
1140 engine: &JetroEngine,
1141 reader: R,
1142 predicate: &str,
1143 limit: usize,
1144 writer: W,
1145 options: NdjsonOptions,
1146) -> Result<usize, JetroEngineError>
1147where
1148 R: BufRead,
1149 W: Write,
1150{
1151 drive_ndjson_matches_writer(engine, reader, predicate, limit, options, writer)
1152}
1153
1154pub fn run_ndjson_matches_with_report<R, W>(
1155 engine: &JetroEngine,
1156 reader: R,
1157 predicate: &str,
1158 limit: usize,
1159 writer: W,
1160) -> Result<NdjsonExecutionReport, JetroEngineError>
1161where
1162 R: BufRead,
1163 W: Write,
1164{
1165 run_ndjson_matches_with_report_and_options(
1166 engine,
1167 reader,
1168 predicate,
1169 limit,
1170 writer,
1171 NdjsonOptions::default(),
1172 )
1173}
1174
1175pub fn run_ndjson_matches_with_report_and_options<R, W>(
1176 engine: &JetroEngine,
1177 reader: R,
1178 predicate: &str,
1179 limit: usize,
1180 writer: W,
1181 options: NdjsonOptions,
1182) -> Result<NdjsonExecutionReport, JetroEngineError>
1183where
1184 R: BufRead,
1185 W: Write,
1186{
1187 let (_, stats) =
1188 drive_ndjson_matches_writer_with_stats(engine, reader, predicate, limit, options, writer)?;
1189 Ok(NdjsonExecutionReport::new(
1190 NdjsonRouteExplain::matches(NdjsonSourceCaps::reader(options)),
1191 stats,
1192 ))
1193}
1194
1195pub fn run_ndjson_matches_file<P, W>(
1196 engine: &JetroEngine,
1197 path: P,
1198 predicate: &str,
1199 limit: usize,
1200 writer: W,
1201) -> Result<usize, JetroEngineError>
1202where
1203 P: AsRef<Path>,
1204 W: Write,
1205{
1206 let file = File::open(path)?;
1207 let options = NdjsonOptions::default();
1208 run_ndjson_matches_with_options(
1209 engine,
1210 std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
1211 predicate,
1212 limit,
1213 writer,
1214 options,
1215 )
1216}
1217
1218pub fn run_ndjson_matches_file_with_options<P, W>(
1219 engine: &JetroEngine,
1220 path: P,
1221 predicate: &str,
1222 limit: usize,
1223 writer: W,
1224 options: NdjsonOptions,
1225) -> Result<usize, JetroEngineError>
1226where
1227 P: AsRef<Path>,
1228 W: Write,
1229{
1230 let file = File::open(path)?;
1231 run_ndjson_matches_with_options(
1232 engine,
1233 std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
1234 predicate,
1235 limit,
1236 writer,
1237 options,
1238 )
1239}
1240
1241pub fn run_ndjson_matches_file_with_report<P, W>(
1242 engine: &JetroEngine,
1243 path: P,
1244 predicate: &str,
1245 limit: usize,
1246 writer: W,
1247) -> Result<NdjsonExecutionReport, JetroEngineError>
1248where
1249 P: AsRef<Path>,
1250 W: Write,
1251{
1252 run_ndjson_matches_file_with_report_and_options(
1253 engine,
1254 path,
1255 predicate,
1256 limit,
1257 writer,
1258 NdjsonOptions::default(),
1259 )
1260}
1261
1262pub fn run_ndjson_matches_file_with_report_and_options<P, W>(
1263 engine: &JetroEngine,
1264 path: P,
1265 predicate: &str,
1266 limit: usize,
1267 writer: W,
1268 options: NdjsonOptions,
1269) -> Result<NdjsonExecutionReport, JetroEngineError>
1270where
1271 P: AsRef<Path>,
1272 W: Write,
1273{
1274 let file = File::open(path)?;
1275 let (_, stats) = drive_ndjson_matches_writer_with_stats(
1276 engine,
1277 std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
1278 predicate,
1279 limit,
1280 options,
1281 writer,
1282 )?;
1283 Ok(NdjsonExecutionReport::new(
1284 NdjsonRouteExplain::matches(NdjsonSourceCaps::file(options)),
1285 stats,
1286 ))
1287}
1288
1289pub fn run_ndjson_matches_source<W>(
1290 engine: &JetroEngine,
1291 source: NdjsonSource,
1292 predicate: &str,
1293 limit: usize,
1294 writer: W,
1295) -> Result<usize, JetroEngineError>
1296where
1297 W: Write,
1298{
1299 run_ndjson_matches_source_with_options(
1300 engine,
1301 source,
1302 predicate,
1303 limit,
1304 writer,
1305 NdjsonOptions::default(),
1306 )
1307}
1308
1309pub fn run_ndjson_matches_source_with_report<W>(
1310 engine: &JetroEngine,
1311 source: NdjsonSource,
1312 predicate: &str,
1313 limit: usize,
1314 writer: W,
1315) -> Result<NdjsonExecutionReport, JetroEngineError>
1316where
1317 W: Write,
1318{
1319 run_ndjson_matches_source_with_report_and_options(
1320 engine,
1321 source,
1322 predicate,
1323 limit,
1324 writer,
1325 NdjsonOptions::default(),
1326 )
1327}
1328
1329pub fn run_ndjson_matches_source_with_report_and_options<W>(
1330 engine: &JetroEngine,
1331 source: NdjsonSource,
1332 predicate: &str,
1333 limit: usize,
1334 writer: W,
1335 options: NdjsonOptions,
1336) -> Result<NdjsonExecutionReport, JetroEngineError>
1337where
1338 W: Write,
1339{
1340 match source {
1341 NdjsonSource::File(path) => run_ndjson_matches_file_with_report_and_options(
1342 engine, path, predicate, limit, writer, options,
1343 ),
1344 NdjsonSource::Reader(reader) => run_ndjson_matches_with_report_and_options(
1345 engine, reader, predicate, limit, writer, options,
1346 ),
1347 }
1348}
1349
1350pub fn run_ndjson_matches_source_with_options<W>(
1351 engine: &JetroEngine,
1352 source: NdjsonSource,
1353 predicate: &str,
1354 limit: usize,
1355 writer: W,
1356 options: NdjsonOptions,
1357) -> Result<usize, JetroEngineError>
1358where
1359 W: Write,
1360{
1361 match source {
1362 NdjsonSource::File(path) => {
1363 run_ndjson_matches_file_with_options(engine, path, predicate, limit, writer, options)
1364 }
1365 NdjsonSource::Reader(reader) => {
1366 run_ndjson_matches_with_options(engine, reader, predicate, limit, writer, options)
1367 }
1368 }
1369}
1370
1371fn drive_ndjson<R, F>(
1372 engine: &JetroEngine,
1373 reader: R,
1374 query: &str,
1375 options: NdjsonOptions,
1376 mut emit: F,
1377) -> Result<usize, JetroEngineError>
1378where
1379 R: BufRead,
1380 F: FnMut(Value) -> Result<NdjsonControl, JetroEngineError>,
1381{
1382 let mut driver = NdjsonPerRowDriver::new(reader).with_options(options);
1383 let plan = engine.cached_plan(query, PlanningContext::bytes());
1384 let mut buf = Vec::with_capacity(options.initial_buffer_capacity);
1385 let mut count = 0;
1386
1387 while let Some((line_no, row)) = driver.read_next_owned(&mut buf)? {
1388 let document = parse_row(engine, line_no, row)?;
1389 let out = collect_row_val(engine, &document, &plan, line_no)?;
1390 count += 1;
1391 if matches!(emit(Value::from(out))?, NdjsonControl::Stop) {
1392 break;
1393 }
1394 }
1395
1396 Ok(count)
1397}
1398
1399fn drive_ndjson_rows_file_plan<W>(
1400 engine: &JetroEngine,
1401 path: &Path,
1402 plan: &NdjsonRowsFilePlan,
1403 limit: Option<usize>,
1404 options: NdjsonOptions,
1405 writer: W,
1406) -> Result<usize, JetroEngineError>
1407where
1408 W: Write,
1409{
1410 match plan {
1411 NdjsonRowsFilePlan::Stream(plan) => {
1412 drive_ndjson_rows_stream_file(engine, path, plan, limit, options, writer)
1413 }
1414 NdjsonRowsFilePlan::Fanout(plan) => {
1415 drive_ndjson_rows_fanout_file(engine, path, plan, options, writer)
1416 }
1417 NdjsonRowsFilePlan::Subquery(plan) => {
1418 drive_ndjson_rows_subquery_file(engine, path, plan, options, writer)
1419 }
1420 }
1421}
1422
1423fn drive_ndjson_rows_subquery_file<P, W>(
1424 engine: &JetroEngine,
1425 path: P,
1426 plan: &RowStreamSubqueryPlan,
1427 options: NdjsonOptions,
1428 writer: W,
1429) -> Result<usize, JetroEngineError>
1430where
1431 P: AsRef<Path>,
1432 W: Write,
1433{
1434 let (stream_value, _) =
1435 collect_ndjson_rows_stream_file_with_stats(engine, path, &plan.stream, options)?;
1436 let wrapper = Compiler::compile(&plan.wrapper, "<ndjson-rows-wrapper>");
1437 let env = Env::new(Val::Null).with_var(STREAM_BINDING, stream_value);
1438 let value = engine
1439 .lock_vm()
1440 .exec_in_env(&wrapper, &env)
1441 .map_err(JetroEngineError::Eval)?;
1442 let mut writer = ndjson_writer_with_options(writer, options);
1443 let emitted = write_val_line_with_options(&mut writer, &value, options)? as usize;
1444 writer.flush()?;
1445 Ok(emitted)
1446}
1447
1448fn drive_ndjson_rows_subquery_file_with_stats<P, W>(
1449 engine: &JetroEngine,
1450 path: P,
1451 plan: &RowStreamSubqueryPlan,
1452 options: NdjsonOptions,
1453 writer: W,
1454) -> Result<(usize, RowStreamStats), JetroEngineError>
1455where
1456 P: AsRef<Path>,
1457 W: Write,
1458{
1459 let (stream_value, mut stats) =
1460 collect_ndjson_rows_stream_file_with_stats(engine, path, &plan.stream, options)?;
1461 let wrapper = Compiler::compile(&plan.wrapper, "<ndjson-rows-wrapper>");
1462 let env = Env::new(Val::Null).with_var(STREAM_BINDING, stream_value);
1463 let value = engine
1464 .lock_vm()
1465 .exec_in_env(&wrapper, &env)
1466 .map_err(JetroEngineError::Eval)?;
1467 let mut writer = ndjson_writer_with_options(writer, options);
1468 let emitted = write_val_line_with_options(&mut writer, &value, options)? as usize;
1469 stats.rows_emitted = emitted;
1470 writer.flush()?;
1471 Ok((emitted, stats))
1472}
1473
1474fn collect_ndjson_rows_stream_file_with_stats<P>(
1475 engine: &JetroEngine,
1476 path: P,
1477 plan: &RowStreamPlan,
1478 options: NdjsonOptions,
1479) -> Result<(Val, RowStreamStats), JetroEngineError>
1480where
1481 P: AsRef<Path>,
1482{
1483 if let Some(result) =
1484 super::ndjson_parallel::collect_rows_stream_file_with_stats(engine, path.as_ref(), plan, options)?
1485 {
1486 return Ok((result.value, result.stats));
1487 }
1488
1489 let mut executor = CompiledRowStream::new(plan);
1490 let mut out = Vec::new();
1491
1492 if plan.direction == RowStreamDirection::Forward {
1493 let file = File::open(path)?;
1494 let mut driver = NdjsonPerRowDriver::new(std::io::BufReader::with_capacity(
1495 options.reader_buffer_capacity,
1496 file,
1497 ))
1498 .with_options(options);
1499 let mut buf = Vec::with_capacity(options.initial_buffer_capacity);
1500 while !executor.is_exhausted() {
1501 let Some((line_no, row)) = driver.read_next_owned(&mut buf)? else {
1502 break;
1503 };
1504 if collect_row_stream_result(
1505 engine,
1506 line_no,
1507 executor.apply_owned_row(engine, line_no, row)?,
1508 &mut out,
1509 )? || executor.is_exhausted()
1510 {
1511 break;
1512 }
1513 }
1514 } else {
1515 let mut driver = super::ndjson_rev::NdjsonReverseFileDriver::with_options(path, options)?;
1516 while !executor.is_exhausted() {
1517 let Some((line_no, row)) = driver.next_line_with_reverse_no()? else {
1518 break;
1519 };
1520 if collect_row_stream_result(
1521 engine,
1522 line_no,
1523 executor.apply_owned_row(engine, line_no, row)?,
1524 &mut out,
1525 )? || executor.is_exhausted()
1526 {
1527 break;
1528 }
1529 }
1530 }
1531
1532 if let Some(value) = executor.finish() {
1533 Ok((value, executor.stats().clone()))
1534 } else if plan.demand.retained_limit == Some(1) {
1535 Ok((
1536 out.into_iter().next().unwrap_or(Val::Null),
1537 executor.stats().clone(),
1538 ))
1539 } else {
1540 Ok((Val::Arr(std::sync::Arc::new(out)), executor.stats().clone()))
1541 }
1542}
1543
1544pub(super) fn collect_row_stream_result(
1545 engine: &JetroEngine,
1546 line_no: u64,
1547 result: RowStreamRowResult,
1548 out: &mut Vec<Val>,
1549) -> Result<bool, JetroEngineError> {
1550 match result {
1551 RowStreamRowResult::Emit(value) => out.push(value),
1552 RowStreamRowResult::EmitBytes(bytes) => {
1553 let document = parse_row(engine, line_no, bytes)?;
1554 let value = document
1555 .root_val_with(engine.keys())
1556 .map_err(|err| row_eval_error(line_no, err))?;
1557 out.push(value);
1558 }
1559 RowStreamRowResult::Skip => {}
1560 RowStreamRowResult::Stop => return Ok(true),
1561 }
1562 Ok(false)
1563}
1564
1565fn drive_ndjson_rows_stream_reader<R, W>(
1566 engine: &JetroEngine,
1567 reader: R,
1568 plan: &RowStreamPlan,
1569 external_limit: Option<usize>,
1570 options: NdjsonOptions,
1571 writer: W,
1572) -> Result<usize, JetroEngineError>
1573where
1574 R: BufRead,
1575 W: Write,
1576{
1577 let (emitted, _) = drive_ndjson_rows_stream_reader_with_stats(
1578 engine,
1579 reader,
1580 plan,
1581 external_limit,
1582 options,
1583 writer,
1584 )?;
1585 Ok(emitted)
1586}
1587
1588fn drive_ndjson_rows_stream_reader_with_stats<R, W>(
1589 engine: &JetroEngine,
1590 reader: R,
1591 plan: &RowStreamPlan,
1592 external_limit: Option<usize>,
1593 options: NdjsonOptions,
1594 writer: W,
1595) -> Result<(usize, RowStreamStats), JetroEngineError>
1596where
1597 R: BufRead,
1598 W: Write,
1599{
1600 if plan.direction == RowStreamDirection::Reverse {
1601 return Err(JetroEngineError::Eval(EvalError(
1602 "$.rows().reverse() requires a file-backed NDJSON source".into(),
1603 )));
1604 }
1605
1606 let mut driver = NdjsonPerRowDriver::new(reader).with_options(options);
1607 let mut executor = CompiledRowStream::new(plan);
1608 let mut writer = ndjson_writer_with_options(writer, options);
1609 let mut buf = Vec::with_capacity(options.initial_buffer_capacity);
1610 let mut emitted = 0usize;
1611
1612 while !executor.is_exhausted() {
1613 let Some((line_no, row)) = driver.read_next_owned(&mut buf)? else {
1614 break;
1615 };
1616 if emit_row_stream_result(
1617 executor.apply_owned_row(engine, line_no, row)?,
1618 &mut writer,
1619 &mut emitted,
1620 external_limit,
1621 options,
1622 )? {
1623 break;
1624 }
1625 if executor.is_exhausted() {
1626 break;
1627 }
1628 }
1629
1630 emit_row_stream_finish(
1631 &executor,
1632 &mut writer,
1633 &mut emitted,
1634 external_limit,
1635 options,
1636 )?;
1637 writer.flush()?;
1638 Ok((emitted, executor.stats().clone()))
1639}
1640
1641fn drive_ndjson_rows_stream_file<P, W>(
1642 engine: &JetroEngine,
1643 path: P,
1644 plan: &RowStreamPlan,
1645 external_limit: Option<usize>,
1646 options: NdjsonOptions,
1647 writer: W,
1648) -> Result<usize, JetroEngineError>
1649where
1650 P: AsRef<Path>,
1651 W: Write,
1652{
1653 if let Some(value) =
1654 super::ndjson_parallel::collect_rows_stream_file(engine, path.as_ref(), plan, options)?
1655 {
1656 return write_collected_rows_stream(value, external_limit, options, writer);
1657 }
1658
1659 let (emitted, _) = drive_ndjson_rows_stream_file_with_stats(
1660 engine,
1661 path,
1662 plan,
1663 external_limit,
1664 options,
1665 writer,
1666 )?;
1667 Ok(emitted)
1668}
1669
1670fn write_collected_rows_stream<W: Write>(
1671 value: Val,
1672 external_limit: Option<usize>,
1673 options: NdjsonOptions,
1674 writer: W,
1675) -> Result<usize, JetroEngineError> {
1676 let mut writer = ndjson_writer_with_options(writer, options);
1677 let mut emitted = 0usize;
1678 match value {
1679 Val::Arr(values) => {
1680 for value in values.iter() {
1681 if external_limit.is_some_and(|limit| emitted >= limit) {
1682 break;
1683 }
1684 if write_val_line_with_options(&mut writer, value, options)? {
1685 emitted += 1;
1686 }
1687 }
1688 }
1689 value => {
1690 if write_val_line_with_options(&mut writer, &value, options)? {
1691 emitted += 1;
1692 }
1693 }
1694 }
1695 writer.flush()?;
1696 Ok(emitted)
1697}
1698
1699fn drive_ndjson_rows_stream_file_with_stats<P, W>(
1700 engine: &JetroEngine,
1701 path: P,
1702 plan: &RowStreamPlan,
1703 external_limit: Option<usize>,
1704 options: NdjsonOptions,
1705 writer: W,
1706) -> Result<(usize, RowStreamStats), JetroEngineError>
1707where
1708 P: AsRef<Path>,
1709 W: Write,
1710{
1711 if let Some(result) = super::ndjson_parallel::collect_rows_stream_file_with_stats(
1712 engine,
1713 path.as_ref(),
1714 plan,
1715 options,
1716 )? {
1717 let mut stats = result.stats;
1718 let emitted = write_collected_rows_stream(result.value, external_limit, options, writer)?;
1719 stats.rows_emitted = emitted;
1720 return Ok((emitted, stats));
1721 }
1722
1723 if plan.direction == RowStreamDirection::Forward {
1724 let file = File::open(path)?;
1725 return drive_ndjson_rows_stream_reader_with_stats(
1726 engine,
1727 std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
1728 plan,
1729 external_limit,
1730 options,
1731 writer,
1732 );
1733 }
1734
1735 let mut driver = super::ndjson_rev::NdjsonReverseFileDriver::with_options(path, options)?;
1736 let mut executor = CompiledRowStream::new(plan);
1737 let mut writer = ndjson_writer_with_options(writer, options);
1738 let mut emitted = 0usize;
1739
1740 while !executor.is_exhausted() {
1741 let Some((line_no, row)) = driver.next_line_with_reverse_no()? else {
1742 break;
1743 };
1744 if emit_row_stream_result(
1745 executor.apply_owned_row(engine, line_no, row)?,
1746 &mut writer,
1747 &mut emitted,
1748 external_limit,
1749 options,
1750 )? {
1751 break;
1752 }
1753 if executor.is_exhausted() {
1754 break;
1755 }
1756 }
1757
1758 emit_row_stream_finish(
1759 &executor,
1760 &mut writer,
1761 &mut emitted,
1762 external_limit,
1763 options,
1764 )?;
1765 writer.flush()?;
1766 Ok((emitted, executor.stats().clone()))
1767}
1768
1769fn emit_row_stream_finish<W: Write>(
1770 executor: &CompiledRowStream,
1771 writer: &mut W,
1772 emitted: &mut usize,
1773 external_limit: Option<usize>,
1774 options: NdjsonOptions,
1775) -> Result<(), JetroEngineError> {
1776 if external_limit.is_some_and(|limit| *emitted >= limit) {
1777 return Ok(());
1778 }
1779 if let Some(value) = executor.finish() {
1780 if write_val_line_with_options(writer, &value, options)? {
1781 *emitted += 1;
1782 }
1783 }
1784 Ok(())
1785}
1786
1787fn emit_row_stream_result<W: Write>(
1788 result: RowStreamRowResult,
1789 writer: &mut W,
1790 emitted: &mut usize,
1791 external_limit: Option<usize>,
1792 options: NdjsonOptions,
1793) -> Result<bool, JetroEngineError> {
1794 let wrote = match result {
1795 RowStreamRowResult::Emit(value) => write_val_line_with_options(writer, &value, options)?,
1796 RowStreamRowResult::EmitBytes(bytes) => {
1797 write_json_bytes_line_with_options(writer, &bytes, options)?
1798 }
1799 RowStreamRowResult::Skip => return Ok(false),
1800 RowStreamRowResult::Stop => return Ok(true),
1801 };
1802 if wrote {
1803 *emitted += 1;
1804 }
1805 Ok(external_limit.is_some_and(|limit| *emitted >= limit))
1806}
1807
1808fn drive_ndjson_writer<R, W>(
1809 engine: &JetroEngine,
1810 reader: R,
1811 query: &str,
1812 limit: Option<usize>,
1813 options: NdjsonOptions,
1814 writer: W,
1815) -> Result<usize, JetroEngineError>
1816where
1817 R: BufRead,
1818 W: Write,
1819{
1820 Ok(drive_ndjson_writer_with_stats(engine, reader, query, limit, options, writer)?.0)
1821}
1822
1823fn drive_ndjson_writer_with_stats<R, W>(
1824 engine: &JetroEngine,
1825 reader: R,
1826 query: &str,
1827 limit: Option<usize>,
1828 options: NdjsonOptions,
1829 writer: W,
1830) -> Result<(usize, NdjsonExecutionStats), JetroEngineError>
1831where
1832 R: BufRead,
1833 W: Write,
1834{
1835 if let Some((byte_plan, tape_plan)) = direct_writer_plans(engine, query) {
1836 if let Some(byte_plan) = byte_plan {
1837 return drive_ndjson_byte_writer(
1838 engine, reader, &byte_plan, &tape_plan, limit, options, writer,
1839 );
1840 }
1841 if tape_plan_can_write_byte_row(&tape_plan) {
1842 return drive_ndjson_tape_byte_writer(
1843 engine, reader, &tape_plan, limit, options, writer,
1844 );
1845 }
1846 return drive_ndjson_tape_writer(engine, reader, &tape_plan, limit, options, writer);
1847 }
1848
1849 let mut driver = NdjsonPerRowDriver::new(reader).with_options(options);
1850 let mut executor = NdjsonRowExecutor::new(engine, query);
1851 let mut writer = ndjson_writer_with_options(writer, options);
1852 let mut buf = Vec::with_capacity(options.initial_buffer_capacity);
1853 let mut count = 0usize;
1854 let mut stats = NdjsonExecutionStats::default();
1855
1856 while let Some((line_no, row)) = driver.read_next_owned(&mut buf)? {
1857 stats.rows_scanned += 1;
1858 let value = executor.eval_owned_row(line_no, row)?;
1859 stats.fallback_project_rows += 1;
1860 if write_val_line_with_options(&mut writer, &value, options)? {
1861 count += 1;
1862 stats.rows_emitted += 1;
1863 }
1864 if limit.is_some_and(|limit| count >= limit) {
1865 break;
1866 }
1867 }
1868
1869 writer.flush()?;
1870 Ok((count, stats))
1871}
1872fn drive_ndjson_byte_writer<R, W>(
1873 engine: &JetroEngine,
1874 reader: R,
1875 byte_plan: &NdjsonDirectBytePlan,
1876 tape_plan: &NdjsonDirectTapePlan,
1877 limit: Option<usize>,
1878 options: NdjsonOptions,
1879 writer: W,
1880) -> Result<(usize, NdjsonExecutionStats), JetroEngineError>
1881where
1882 R: BufRead,
1883 W: Write,
1884{
1885 let mut driver = NdjsonPerRowDriver::new(reader).with_options(options);
1886 let mut writer = ndjson_writer_with_options(writer, options);
1887 let mut line = Vec::with_capacity(options.initial_buffer_capacity);
1888 let mut out = Vec::with_capacity(options.initial_buffer_capacity);
1889 let mut scratch =
1890 crate::data::tape::TapeScratch::with_capacity(options.initial_buffer_capacity);
1891 let mut tape_runner = NdjsonTapeWriterRunner::new(engine, tape_plan);
1892 let mut count = 0usize;
1893 let mut stats = NdjsonExecutionStats::default();
1894
1895 visit_ndjson_borrowed_rows(&mut driver, &mut line, |line_no, row| {
1896 stats.rows_scanned += 1;
1897 out.clear();
1898 match write_ndjson_byte_plan_row(&mut out, row, byte_plan)? {
1899 BytePlanWrite::Done => {
1900 stats.direct_project_rows += 1;
1901 }
1902 BytePlanWrite::Fallback => {
1903 stats.fallback_project_rows += 1;
1904 scratch.parse_slice(row).map_err(|message| {
1905 row_parse_error(
1906 line_no,
1907 JetroEngineError::Eval(crate::EvalError(format!(
1908 "Invalid JSON: {message}"
1909 ))),
1910 )
1911 })?;
1912 tape_runner.write_row(&scratch, &mut out)?;
1913 }
1914 }
1915 if write_json_bytes_line_with_options(&mut writer, &out, options)? {
1916 count += 1;
1917 stats.rows_emitted += 1;
1918 }
1919 Ok(!limit.is_some_and(|limit| count >= limit))
1920 })?;
1921
1922 writer.flush()?;
1923 Ok((count, stats))
1924}
/// Projects each row with a tape plan, preferring byte-level shortcuts and
/// falling back to a full tape parse only when they decline.
///
/// Per row, the strategies are tried in this order:
/// 1. `NdjsonConstantStreamCache::write_row` — if it reports `Done`, the row
///    is written immediately.
/// 2. Layout hints — only when a hint state exists for this plan shape and it
///    decides `UseHints` for the row.
/// 3. `write_ndjson_byte_tape_plan_row` — used when hints were not consulted.
/// 4. Tape fallback — when step 2 or 3 reports `Fallback`, the row is parsed
///    into the tape scratch and rendered by `NdjsonTapeWriterRunner`.
///
/// Returns the emitted-line count and execution statistics; hint counters are
/// copied into the statistics after the last row.
///
/// NOTE(review): the limit is tested after a row has been written, so
/// `Some(0)` still emits the first row — confirm callers never pass zero.
///
/// # Errors
/// Returns a row parse error (`Invalid JSON: …`) when the fallback parse
/// fails, and propagates read, projection and write errors.
fn drive_ndjson_tape_byte_writer<R, W>(
    engine: &JetroEngine,
    reader: R,
    tape_plan: &NdjsonDirectTapePlan,
    limit: Option<usize>,
    options: NdjsonOptions,
    writer: W,
) -> Result<(usize, NdjsonExecutionStats), JetroEngineError>
where
    R: BufRead,
    W: Write,
{
    let mut driver = NdjsonPerRowDriver::new(reader).with_options(options);
    let mut writer = ndjson_writer_with_options(writer, options);
    // `line` is the spill buffer for rows that do not fit in the reader's
    // buffer; `out` holds the rendered output for the current row.
    let mut line = Vec::with_capacity(options.initial_buffer_capacity);
    let mut out = Vec::with_capacity(options.initial_buffer_capacity);
    let mut scratch =
        crate::data::tape::TapeScratch::with_capacity(options.initial_buffer_capacity);
    let mut byte_scratch = Vec::with_capacity(options.initial_buffer_capacity);
    let mut tape_runner = NdjsonTapeWriterRunner::new(engine, tape_plan);
    let mut constant_stream_cache = NdjsonConstantStreamCache::default();
    // Hints are enabled only for object/array projections and for stream plans
    // whose sink is Collect, First, Last, Count, Numeric or Extreme.
    let mut hint_state = matches!(
        tape_plan,
        NdjsonDirectTapePlan::Object(_)
            | NdjsonDirectTapePlan::Array(_)
            | NdjsonDirectTapePlan::Stream(NdjsonDirectStreamPlan {
                sink: NdjsonDirectStreamSink::Collect(_),
                ..
            })
            | NdjsonDirectTapePlan::Stream(NdjsonDirectStreamPlan {
                sink: NdjsonDirectStreamSink::First(_),
                ..
            })
            | NdjsonDirectTapePlan::Stream(NdjsonDirectStreamPlan {
                sink: NdjsonDirectStreamSink::Last(_),
                ..
            })
            | NdjsonDirectTapePlan::Stream(NdjsonDirectStreamPlan {
                sink: NdjsonDirectStreamSink::Count,
                ..
            })
            | NdjsonDirectTapePlan::Stream(NdjsonDirectStreamPlan {
                sink: NdjsonDirectStreamSink::Numeric { .. },
                ..
            })
            | NdjsonDirectTapePlan::Stream(NdjsonDirectStreamPlan {
                sink: NdjsonDirectStreamSink::Extreme { .. },
                ..
            })
    )
    .then(|| {
        NdjsonHintState::new(
            NdjsonHintConfig::default(),
            NdjsonHintAccessPlan::from_direct_plans(None, tape_plan),
        )
    });
    let mut count = 0usize;
    let mut stats = NdjsonExecutionStats::default();

    // The closure returns `Ok(true)` to keep visiting and `Ok(false)` to stop.
    visit_ndjson_borrowed_rows(&mut driver, &mut line, |line_no, row| {
        stats.rows_scanned += 1;
        out.clear();
        // Strategy 1: constant-stream cache. Only a `Done` answer short-circuits;
        // `None` or a non-`Done` answer falls through to the strategies below.
        if let Some(write) = constant_stream_cache.write_row(&mut out, row, tape_plan)? {
            if matches!(write, BytePlanWrite::Done) {
                stats.direct_project_rows += 1;
                if write_json_bytes_line_with_options(&mut writer, &out, options)? {
                    count += 1;
                    stats.rows_emitted += 1;
                }
                return Ok(!limit.is_some_and(|limit| count >= limit));
            }
        }
        // Strategy 2: layout hints. The hinted writer renders into
        // `byte_scratch`, which is copied into `out` only on `Done`, so a
        // declined attempt leaves `out` untouched. A missing layout match is
        // treated as `Fallback`.
        let hinted = if let Some(state) = hint_state.as_mut() {
            if state.observe_row(row) == NdjsonHintDecision::UseHints {
                byte_scratch.clear();
                let write = state
                    .with_root_layout_match(row, |root, matched| {
                        write_ndjson_hinted_tape_plan_row(
                            &mut byte_scratch,
                            tape_plan,
                            root,
                            matched,
                        )
                    })
                    .transpose()?
                    .unwrap_or(BytePlanWrite::Fallback);
                if matches!(write, BytePlanWrite::Done) {
                    out.extend_from_slice(&byte_scratch);
                }
                Some(write)
            } else {
                None
            }
        } else {
            None
        };
        // Strategy 3: the unhinted byte writer, used only when hints were not
        // consulted for this row.
        let write = match hinted {
            Some(write) => Ok(write),
            None => write_ndjson_byte_tape_plan_row(&mut out, row, tape_plan, &mut byte_scratch),
        };
        match write? {
            BytePlanWrite::Done => {
                stats.direct_project_rows += 1;
            }
            // Strategy 4: parse the row into the tape scratch and render it
            // with the tape runner.
            // NOTE(review): `out` is not cleared here; this assumes the byte
            // writer leaves `out` empty when it reports `Fallback` — confirm.
            BytePlanWrite::Fallback => {
                stats.fallback_project_rows += 1;
                scratch.parse_slice(row).map_err(|message| {
                    row_parse_error(
                        line_no,
                        JetroEngineError::Eval(crate::EvalError(format!(
                            "Invalid JSON: {message}"
                        ))),
                    )
                })?;
                tape_runner.write_row(&scratch, &mut out)?;
            }
        }
        if write_json_bytes_line_with_options(&mut writer, &out, options)? {
            count += 1;
            stats.rows_emitted += 1;
        }
        Ok(!limit.is_some_and(|limit| count >= limit))
    })?;

    // Surface the hint counters in the execution statistics.
    if let Some(state) = hint_state.as_ref() {
        let hint_stats = state.stats();
        stats.hint_learned_rows = hint_stats.learned_rows;
        stats.hint_rejected_rows = hint_stats.rejected_rows;
        stats.hint_rows = hint_stats.hinted_rows;
        stats.hint_layout_misses = hint_stats.layout_misses;
        stats.hint_disabled = hint_stats.disabled;
    }

    writer.flush()?;
    Ok((count, stats))
}
/// Feeds every non-blank row of `driver` to `visit` as a borrowed byte slice.
///
/// When the reader's buffer already holds a complete line, the row is sliced
/// straight out of that buffer and handed to `visit` without copying. When no
/// newline is present in the buffered data, the line is read into `spill`
/// through `driver.read_physical_line` and visited from there.
///
/// On both paths the row is stripped of its line ending, of a UTF-8 BOM on
/// line 1, and of surrounding whitespace; blank rows are skipped; rows longer
/// than `driver.max_line_len` are rejected; and `frame_payload` decides whether
/// the row carries data or is skipped.
///
/// `visit` receives the line number and the payload bytes and returns
/// `Ok(true)` to continue or `Ok(false)` to stop.
///
/// # Errors
/// Returns `RowError::LineTooLarge` for oversized rows and propagates I/O,
/// framing and visitor errors.
fn visit_ndjson_borrowed_rows<R, F>(
    driver: &mut NdjsonPerRowDriver<R>,
    spill: &mut Vec<u8>,
    mut visit: F,
) -> Result<(), JetroEngineError>
where
    R: BufRead,
    F: FnMut(u64, &[u8]) -> Result<bool, JetroEngineError>,
{
    loop {
        spill.clear();
        let available = driver.reader.fill_buf()?;
        // An empty buffer after `fill_buf` means end of input.
        if available.is_empty() {
            return Ok(());
        }
        if let Some(pos) = memchr(b'\n', available) {
            // Fast path: the whole line is inside the reader's buffer.
            driver.line_no += 1;
            let line_no = driver.line_no;
            let mut row = &available[..pos];
            // Drop the `\r` of a CRLF line ending.
            if row.last() == Some(&b'\r') {
                row = &row[..row.len() - 1];
            }
            // Drop a UTF-8 byte-order mark at the start of the first line.
            if line_no == 1 && row.starts_with(&[0xef, 0xbb, 0xbf]) {
                row = &row[3..];
            }
            let (start, end) = non_ws_range(row);
            let keep_going = if start == end {
                // Blank line: nothing to visit.
                true
            } else {
                let trimmed = &row[start..end];
                if trimmed.len() > driver.max_line_len {
                    return Err(RowError::LineTooLarge {
                        line_no,
                        len: trimmed.len(),
                        max: driver.max_line_len,
                    }
                    .into());
                }
                match frame_payload(driver.row_frame, line_no, trimmed)? {
                    FramePayload::Data(range) => visit(line_no, &trimmed[range])?,
                    FramePayload::Skip => true,
                }
            };
            // Consume only after the visit: `row` borrows the reader's buffer
            // until this point. The `+ 1` also consumes the newline.
            driver.reader.consume(pos + 1);
            if !keep_going {
                return Ok(());
            }
        } else {
            // Slow path: no newline in the buffered data, so the line is read
            // into `spill`.
            // NOTE(review): presumably `read_physical_line` keeps reading until
            // a newline or end of input — confirm against the driver.
            let read = driver.read_physical_line(spill)?;
            if read == 0 {
                return Ok(());
            }
            driver.line_no += 1;
            strip_initial_bom(driver.line_no, spill);
            trim_line_ending(spill);
            let (start, end) = non_ws_range(spill);
            if start == end {
                continue;
            }
            let len = end - start;
            if len > driver.max_line_len {
                return Err(RowError::LineTooLarge {
                    line_no: driver.line_no,
                    len,
                    max: driver.max_line_len,
                }
                .into());
            }
            match frame_payload(driver.row_frame, driver.line_no, &spill[start..end])? {
                FramePayload::Data(range) => {
                    // `range` is relative to the trimmed slice, so shift it by
                    // `start` to index into `spill`.
                    if !visit(
                        driver.line_no,
                        &spill[start + range.start..start + range.end],
                    )? {
                        return Ok(());
                    }
                }
                FramePayload::Skip => {}
            }
        }
    }
}
2143fn drive_ndjson_tape_writer<R, W>(
2144 engine: &JetroEngine,
2145 reader: R,
2146 plan: &NdjsonDirectTapePlan,
2147 limit: Option<usize>,
2148 options: NdjsonOptions,
2149 writer: W,
2150) -> Result<(usize, NdjsonExecutionStats), JetroEngineError>
2151where
2152 R: BufRead,
2153 W: Write,
2154{
2155 let mut driver = NdjsonPerRowDriver::new(reader).with_options(options);
2156 let mut writer = ndjson_writer_with_options(writer, options);
2157 let mut line = Vec::with_capacity(options.initial_buffer_capacity);
2158 let mut scratch =
2159 crate::data::tape::TapeScratch::with_capacity(options.initial_buffer_capacity);
2160 let mut out = Vec::with_capacity(options.initial_buffer_capacity);
2161 let mut count = 0usize;
2162 let mut runner = NdjsonTapeWriterRunner::new(engine, plan);
2163 let mut stats = NdjsonExecutionStats::default();
2164
2165 while let Some((line_no, row)) = driver.read_next_nonempty(&mut line)? {
2166 stats.rows_scanned += 1;
2167 out.clear();
2168 scratch.parse_slice(row).map_err(|message| {
2169 row_parse_error(
2170 line_no,
2171 JetroEngineError::Eval(crate::EvalError(format!("Invalid JSON: {message}"))),
2172 )
2173 })?;
2174 runner.write_row(&scratch, &mut out)?;
2175 stats.fallback_project_rows += 1;
2176 if write_json_bytes_line_with_options(&mut writer, &out, options)? {
2177 count += 1;
2178 stats.rows_emitted += 1;
2179 }
2180 if limit.is_some_and(|limit| count >= limit) {
2181 break;
2182 }
2183 }
2184
2185 writer.flush()?;
2186 Ok((count, stats))
2187}
/// Renders one row at a time from a parsed tape according to a
/// `NdjsonDirectTapePlan`, keeping per-plan lookup caches between rows.
///
/// `'a` is the lifetime of the engine borrow behind the optional VM guard;
/// `'p` is the lifetime of the borrowed plan.
pub(super) struct NdjsonTapeWriterRunner<'a, 'p> {
    /// The plan that decides how each row is rendered.
    plan: &'p NdjsonDirectTapePlan,
    /// Guard over the engine VM; populated only when the plan reports that it
    /// needs a VM, and then held for as long as the runner lives.
    vm: Option<MutexGuard<'a, VM>>,
    /// Evaluation environment; populated under the same condition as `vm`.
    env: Option<crate::data::context::Env>,
    /// Path cache for lookups that start at the tape root.
    root_path: NdjsonPathCache,
    /// Path cache for the source path of array-element and stream plans.
    source_path: NdjsonPathCache,
    /// Path cache for the suffix path applied after selecting an element.
    suffix_path: NdjsonPathCache,
    /// Path cache handed to the stream writer alongside the source and suffix
    /// caches.
    predicate_path: NdjsonPathCache,
    /// Path caches shared by the stream, object and array projection writers.
    object_paths: Vec<NdjsonPathCache>,
}
impl<'a, 'p> NdjsonTapeWriterRunner<'a, 'p> {
    /// Creates a runner for `plan`.
    ///
    /// The engine VM is locked and an environment rooted at `Val::Null` is
    /// created only when `plan.needs_vm()` is true; the lock guard is stored in
    /// the runner, so the VM stays locked until the runner is dropped. All
    /// path caches start empty.
    pub(super) fn new(engine: &'a JetroEngine, plan: &'p NdjsonDirectTapePlan) -> Self {
        let needs_vm = plan.needs_vm();
        Self {
            plan,
            vm: needs_vm.then(|| engine.lock_vm()),
            env: needs_vm.then(|| crate::data::context::Env::new(Val::Null)),
            root_path: NdjsonPathCache::default(),
            source_path: NdjsonPathCache::default(),
            suffix_path: NdjsonPathCache::default(),
            predicate_path: NdjsonPathCache::default(),
            object_paths: Vec::new(),
        }
    }

    /// Renders the row held in `scratch` to `writer` according to the plan.
    ///
    /// Paths that do not resolve are rendered as the JSON literal `null`. No
    /// line terminator is written here.
    ///
    /// # Errors
    /// Propagates write errors and evaluation errors; returns an evaluation
    /// error if a `ViewPipeline` plan is run without VM state.
    pub(super) fn write_row<W: Write>(
        &mut self,
        scratch: &crate::data::tape::TapeScratch,
        writer: &mut W,
    ) -> Result<(), JetroEngineError> {
        match self.plan {
            // Copy the node at a root-relative path verbatim.
            NdjsonDirectTapePlan::RootPath(steps) => {
                if let Some(idx) = self.root_path.index(scratch, 0, steps) {
                    write_json_tape_at(writer, scratch, idx)?;
                } else {
                    writer.write_all(b"null")?;
                }
            }
            // Apply a scalar call to the node at a root-relative path. If the
            // call does not apply, the node itself is written instead.
            NdjsonDirectTapePlan::ViewScalarCall {
                steps,
                call,
                optional,
            } => {
                let idx = self.root_path.index(scratch, 0, steps);
                let value = idx
                    .map(|idx| json_tape_scalar(scratch, idx))
                    .unwrap_or(crate::util::JsonView::Null);
                // An optional call on a null (or missing) receiver yields null
                // without invoking the call.
                if *optional && matches!(value, crate::util::JsonView::Null) {
                    writer.write_all(b"null")?;
                } else if let Some(value) = call.try_apply_json_view(value) {
                    write_val_json(writer, &value)?;
                } else if let Some(idx) = idx {
                    write_json_tape_at(writer, scratch, idx)?;
                } else {
                    writer.write_all(b"null")?;
                }
            }
            // Resolve source path -> array element -> suffix path, then apply
            // a scalar call with the same fallbacks as above.
            NdjsonDirectTapePlan::ArrayElementViewScalarCall {
                source_steps,
                element,
                suffix_steps,
                call,
            } => {
                let idx = self
                    .source_path
                    .index(scratch, 0, source_steps)
                    .and_then(|idx| json_tape_array_element(scratch, idx, *element))
                    .and_then(|idx| self.suffix_path.index(scratch, idx, suffix_steps));
                if let Some(value) = idx
                    .map(|idx| json_tape_scalar(scratch, idx))
                    .and_then(|value| call.try_apply_json_view(value))
                {
                    write_val_json(writer, &value)?;
                } else if let Some(idx) = idx {
                    write_json_tape_at(writer, scratch, idx)?;
                } else {
                    writer.write_all(b"null")?;
                }
            }
            // Delegate object-items rendering; a missing path is passed on as
            // `None`.
            NdjsonDirectTapePlan::ObjectItems { steps, method } => {
                let idx = self.root_path.index(scratch, 0, steps);
                write_json_tape_object_items(writer, scratch, idx, *method)?;
            }
            // Resolve source path -> array element -> suffix path and copy the
            // node verbatim.
            NdjsonDirectTapePlan::ArrayElementPath {
                source_steps,
                element,
                suffix_steps,
            } => {
                let idx = self
                    .source_path
                    .index(scratch, 0, source_steps)
                    .and_then(|idx| json_tape_array_element(scratch, idx, *element))
                    .and_then(|idx| self.suffix_path.index(scratch, idx, suffix_steps));
                if let Some(idx) = idx {
                    write_json_tape_at(writer, scratch, idx)?;
                } else {
                    writer.write_all(b"null")?;
                }
            }
            // Stream plans are rendered by a dedicated writer that receives
            // all four caches.
            NdjsonDirectTapePlan::Stream(plan) => {
                write_json_tape_stream(
                    writer,
                    scratch,
                    plan,
                    &mut self.source_path,
                    &mut self.suffix_path,
                    &mut self.predicate_path,
                    &mut self.object_paths,
                )?;
            }
            NdjsonDirectTapePlan::Object(fields) => {
                write_json_tape_object_projection(writer, scratch, fields, &mut self.object_paths)?;
            }
            NdjsonDirectTapePlan::Array(items) => {
                write_json_tape_array_projection(writer, scratch, items, &mut self.object_paths)?;
            }
            // Run a view pipeline through the VM. This is the only variant
            // that uses `vm`/`env`, and it resolves the source path without a
            // path cache.
            NdjsonDirectTapePlan::ViewPipeline { source_steps, body } => {
                let (Some(vm), Some(env)) = (self.vm.as_deref_mut(), self.env.as_ref()) else {
                    return Err(JetroEngineError::Eval(crate::EvalError(
                        "NDJSON view pipeline requires VM state".to_string(),
                    )));
                };
                let source = json_tape_path_index(scratch, source_steps)
                    .map(|idx| crate::data::view::TapeScratchView::Node { tape: scratch, idx })
                    .unwrap_or(crate::data::view::TapeScratchView::Missing);
                // A `None` from the pipeline runner is rendered as null.
                let Some(result) =
                    crate::exec::view::run_with_env_and_vm(source, body, None, &env, vm)
                else {
                    writer.write_all(b"null")?;
                    return Ok(());
                };
                write_val_json(writer, &result.map_err(JetroEngineError::Eval)?)?;
            }
        }
        Ok(())
    }
}
/// Per-path lookup cache used when resolving path steps against a JSON tape.
#[derive(Default)]
pub(super) struct NdjsonPathCache {
    /// One optional cached field location per path depth. The vector is grown
    /// on demand during lookup; the first slot is the one consulted by the
    /// cached lookup for a path's leading field step.
    fields: Vec<Option<NdjsonFieldCache>>,
}
/// Remembered location of an object field on the tape.
///
/// NOTE(review): both values look like tape-index offsets relative to the
/// start of the enclosing object — confirm against
/// `json_tape_object_cached_field`.
#[derive(Clone, Copy)]
struct NdjsonFieldCache {
    /// Offset of the field's key. The cached lookup only uses entries whose
    /// `key_delta` is greater than 1.
    key_delta: usize,
    /// Offset of the field's value.
    value_delta: usize,
}
2337struct NdjsonPathCaches<'a> {
2338 source: &'a mut NdjsonPathCache,
2339 suffix: &'a mut NdjsonPathCache,
2340 predicate: &'a mut NdjsonPathCache,
2341}
2342impl NdjsonPathCache {
2343 fn index<T: JsonTape>(
2344 &mut self,
2345 tape: &T,
2346 start: usize,
2347 steps: &[crate::ir::physical::PhysicalPathStep],
2348 ) -> Option<usize> {
2349 if let Some(idx) = self.index_cached(tape, start, steps) {
2350 return Some(idx);
2351 }
2352 self.index_uncached(tape, start, steps)
2353 }
2354
2355 fn index_cached<T: JsonTape>(
2356 &self,
2357 tape: &T,
2358 start: usize,
2359 steps: &[crate::ir::physical::PhysicalPathStep],
2360 ) -> Option<usize> {
2361 use crate::ir::physical::PhysicalPathStep;
2362
2363 let [PhysicalPathStep::Field(key), rest @ ..] = steps else {
2364 return None;
2365 };
2366 if rest
2367 .iter()
2368 .any(|step| matches!(step, PhysicalPathStep::Field(_)))
2369 {
2370 return None;
2371 }
2372 let Some(field) = self
2373 .fields
2374 .first()
2375 .copied()
2376 .flatten()
2377 .filter(|field| field.key_delta > 1)
2378 else {
2379 return None;
2380 };
2381 let idx = json_tape_object_cached_field(tape, start, field, key.as_ref())?;
2382 let mut cur = idx;
2383 for step in rest {
2384 cur = json_tape_step_index(tape, cur, step)?;
2385 }
2386 Some(cur)
2387 }
2388
2389 fn index_uncached<T: JsonTape>(
2390 &mut self,
2391 tape: &T,
2392 start: usize,
2393 steps: &[crate::ir::physical::PhysicalPathStep],
2394 ) -> Option<usize> {
2395 self.index_from_depth(tape, start, steps, 0)
2396 }
2397
2398 fn index_from_depth<T: JsonTape>(
2399 &mut self,
2400 tape: &T,
2401 start: usize,
2402 steps: &[crate::ir::physical::PhysicalPathStep],
2403 depth: usize,
2404 ) -> Option<usize> {
2405 use crate::ir::physical::PhysicalPathStep;
2406
2407 match steps {
2408 [] => Some(start),
2409 [PhysicalPathStep::Field(key), rest @ ..] => {
2410 if self.fields.len() <= depth {
2411 self.fields.resize(depth + 1, None);
2412 }
2413
2414 if let Some(field) = self.fields[depth].filter(|field| field.key_delta > 1) {
2415 if let Some(idx) =
2416 json_tape_object_cached_field(tape, start, field, key.as_ref())
2417 {
2418 return self.index_from_depth(tape, idx, rest, depth + 1);
2419 }
2420 }
2421
2422 let (idx, field) =
2423 json_tape_object_field_index_and_cache(tape, start, key.as_ref())?;
2424 self.fields[depth] = Some(field);
2425 self.index_from_depth(tape, idx, rest, depth + 1)
2426 }
2427 [step, rest @ ..] => {
2428 let idx = json_tape_step_index(tape, start, step)?;
2429 self.index_from_depth(tape, idx, rest, depth + 1)
2430 }
2431 }
2432 }
2433}
2434fn drive_ndjson_tape_matches_writer_with_stats<R, W>(
2435 engine: &JetroEngine,
2436 reader: R,
2437 predicate: &NdjsonDirectPredicate,
2438 limit: usize,
2439 options: NdjsonOptions,
2440 writer: W,
2441) -> Result<(usize, NdjsonExecutionStats), JetroEngineError>
2442where
2443 R: BufRead,
2444 W: Write,
2445{
2446 let mut driver = NdjsonPerRowDriver::new(reader).with_options(options);
2447 let mut writer = ndjson_writer_with_options(writer, options);
2448 let mut line = Vec::with_capacity(options.initial_buffer_capacity);
2449 let mut scratch =
2450 crate::data::tape::TapeScratch::with_capacity(options.initial_buffer_capacity);
2451 let mut emitted = 0usize;
2452 let needs_vm = predicate_needs_vm(predicate);
2453 let mut vm = needs_vm.then(|| engine.lock_vm());
2454 let env = needs_vm.then(|| crate::data::context::Env::new(Val::Null));
2455 let mut predicate_path = NdjsonPathCache::default();
2456 let mut stats = NdjsonExecutionStats::default();
2457
2458 while let Some((line_no, row)) = driver.read_next_owned(&mut line)? {
2459 stats.rows_scanned += 1;
2460 if let Some(matched) = eval_ndjson_byte_predicate_row(&row, predicate)? {
2461 stats.direct_filter_rows += 1;
2462 if !matched {
2463 stats.rows_filtered += 1;
2464 continue;
2465 }
2466 writer.write_all(&row)?;
2467 writer.write_all(b"\n")?;
2468 emitted += 1;
2469 stats.rows_emitted += 1;
2470 if emitted >= limit {
2471 break;
2472 }
2473 continue;
2474 }
2475
2476 scratch.parse_slice(&row).map_err(|message| {
2477 row_parse_error(
2478 line_no,
2479 JetroEngineError::Eval(crate::EvalError(format!("Invalid JSON: {message}"))),
2480 )
2481 })?;
2482 if !eval_tape_predicate(
2483 &scratch,
2484 predicate,
2485 env.as_ref(),
2486 &mut vm,
2487 &mut predicate_path,
2488 )
2489 .map_err(JetroEngineError::Eval)?
2490 {
2491 stats.fallback_filter_rows += 1;
2492 stats.rows_filtered += 1;
2493 continue;
2494 }
2495 stats.fallback_filter_rows += 1;
2496 writer.write_all(&row)?;
2497 writer.write_all(b"\n")?;
2498 emitted += 1;
2499 stats.rows_emitted += 1;
2500 if emitted >= limit {
2501 break;
2502 }
2503 }
2504
2505 writer.flush()?;
2506 Ok((emitted, stats))
2507}
2508
2509fn drive_ndjson_matches<R, F>(
2510 engine: &JetroEngine,
2511 reader: R,
2512 predicate: &str,
2513 limit: usize,
2514 options: NdjsonOptions,
2515 mut emit: F,
2516) -> Result<usize, JetroEngineError>
2517where
2518 R: BufRead,
2519 F: FnMut(Val) -> Result<NdjsonControl, JetroEngineError>,
2520{
2521 if limit == 0 {
2522 return Ok(0);
2523 }
2524
2525 let mut driver = NdjsonPerRowDriver::new(reader).with_options(options);
2526 let direct_predicate = direct_tape_predicate(engine, predicate);
2527 let mut executor = NdjsonRowExecutor::new(engine, predicate);
2528 let mut buf = Vec::with_capacity(options.initial_buffer_capacity);
2529 let mut emitted = 0usize;
2530
2531 while let Some((line_no, row)) = driver.read_next_owned(&mut buf)? {
2532 if let Some(predicate) = direct_predicate.as_ref() {
2533 if let Some(false) = eval_ndjson_byte_predicate_row(&row, predicate)? {
2534 continue;
2535 }
2536 }
2537
2538 let document = executor.parse_owned_row(line_no, row)?;
2539 let matched = executor.eval_document(line_no, &document)?;
2540 if !is_truthy(&matched) {
2541 continue;
2542 }
2543
2544 let root = document
2545 .root_val_with(engine.keys())
2546 .map_err(|err| row_eval_error(line_no, err))?;
2547 emitted += 1;
2548 if matches!(emit(root)?, NdjsonControl::Stop) || emitted >= limit {
2549 break;
2550 }
2551 }
2552
2553 Ok(emitted)
2554}
2555
2556fn drive_ndjson_matches_writer<R, W>(
2557 engine: &JetroEngine,
2558 reader: R,
2559 predicate: &str,
2560 limit: usize,
2561 options: NdjsonOptions,
2562 writer: W,
2563) -> Result<usize, JetroEngineError>
2564where
2565 R: BufRead,
2566 W: Write,
2567{
2568 Ok(drive_ndjson_matches_writer_with_stats(
2569 engine, reader, predicate, limit, options, writer,
2570 )?
2571 .0)
2572}
2573
2574fn drive_ndjson_matches_writer_with_stats<R, W>(
2575 engine: &JetroEngine,
2576 reader: R,
2577 predicate: &str,
2578 limit: usize,
2579 options: NdjsonOptions,
2580 writer: W,
2581) -> Result<(usize, NdjsonExecutionStats), JetroEngineError>
2582where
2583 R: BufRead,
2584 W: Write,
2585{
2586 if limit == 0 {
2587 return Ok((0, NdjsonExecutionStats::default()));
2588 }
2589 if let Some(predicate) = direct_tape_predicate(engine, predicate) {
2590 return drive_ndjson_tape_matches_writer_with_stats(
2591 engine, reader, &predicate, limit, options, writer,
2592 );
2593 }
2594
2595 let mut driver = NdjsonPerRowDriver::new(reader).with_options(options);
2596 let mut executor = NdjsonRowExecutor::new(engine, predicate);
2597 let mut writer = ndjson_writer_with_options(writer, options);
2598 let mut buf = Vec::with_capacity(options.initial_buffer_capacity);
2599 let mut emitted = 0usize;
2600 let mut stats = NdjsonExecutionStats::default();
2601
2602 while let Some((line_no, row)) = driver.read_next_owned(&mut buf)? {
2603 stats.rows_scanned += 1;
2604 let document = executor.parse_owned_row(line_no, row)?;
2605 let matched = executor.eval_document(line_no, &document)?;
2606 stats.fallback_filter_rows += 1;
2607 if !is_truthy(&matched) {
2608 stats.rows_filtered += 1;
2609 continue;
2610 }
2611
2612 write_document_line(&mut writer, &document, line_no, executor.engine())?;
2613 emitted += 1;
2614 stats.rows_emitted += 1;
2615 if emitted >= limit {
2616 break;
2617 }
2618 }
2619
2620 writer.flush()?;
2621 Ok((emitted, stats))
2622}
2623
2624pub(super) struct NdjsonRowExecutor<'a> {
2625 engine: &'a JetroEngine,
2626 plan: crate::ir::physical::QueryPlan,
2627 vm: MutexGuard<'a, VM>,
2628}
2629
2630impl<'a> NdjsonRowExecutor<'a> {
2631 pub(super) fn new(engine: &'a JetroEngine, query: &str) -> Self {
2632 Self {
2633 engine,
2634 plan: engine.cached_plan(query, PlanningContext::bytes()),
2635 vm: engine.lock_vm(),
2636 }
2637 }
2638
2639 pub(super) fn eval_owned_row(
2640 &mut self,
2641 line_no: u64,
2642 row: Vec<u8>,
2643 ) -> Result<Val, JetroEngineError> {
2644 let document = self.parse_owned_row(line_no, row)?;
2645 self.eval_document(line_no, &document)
2646 }
2647
2648 pub(super) fn parse_owned_row(
2649 &self,
2650 line_no: u64,
2651 row: Vec<u8>,
2652 ) -> Result<Jetro, JetroEngineError> {
2653 parse_row(self.engine, line_no, row)
2654 }
2655
2656 pub(super) fn eval_document(
2657 &mut self,
2658 line_no: u64,
2659 document: &Jetro,
2660 ) -> Result<Val, JetroEngineError> {
2661 crate::exec::router::collect_plan_val_with_vm(document, &self.plan, &mut self.vm)
2662 .map_err(|err| row_eval_error(line_no, err))
2663 }
2664
2665 pub(super) fn engine(&self) -> &'a JetroEngine {
2666 self.engine
2667 }
2668}
2669trait JsonTape {
2670 fn nodes(&self) -> &[crate::data::tape::TapeNode];
2671 fn str_at(&self, idx: usize) -> &str;
2672 fn span(&self, idx: usize) -> usize;
2673}
2674impl JsonTape for crate::data::tape::TapeData {
2675 #[inline]
2676 fn nodes(&self) -> &[crate::data::tape::TapeNode] {
2677 &self.nodes
2678 }
2679
2680 #[inline]
2681 fn str_at(&self, idx: usize) -> &str {
2682 self.str_at(idx)
2683 }
2684
2685 #[inline]
2686 fn span(&self, idx: usize) -> usize {
2687 self.span(idx)
2688 }
2689}
2690impl JsonTape for crate::data::tape::TapeScratch {
2691 #[inline]
2692 fn nodes(&self) -> &[crate::data::tape::TapeNode] {
2693 &self.nodes
2694 }
2695
2696 #[inline]
2697 fn str_at(&self, idx: usize) -> &str {
2698 self.str_at(idx)
2699 }
2700
2701 #[inline]
2702 fn span(&self, idx: usize) -> usize {
2703 self.span(idx)
2704 }
2705}
2706fn json_tape_path_index<T: JsonTape>(
2707 tape: &T,
2708 steps: &[crate::ir::physical::PhysicalPathStep],
2709) -> Option<usize> {
2710 json_tape_path_index_from(tape, 0, steps)
2711}
2712fn json_tape_path_index_from<T: JsonTape>(
2713 tape: &T,
2714 start: usize,
2715 steps: &[crate::ir::physical::PhysicalPathStep],
2716) -> Option<usize> {
2717 if tape.nodes().is_empty() {
2718 return None;
2719 }
2720
2721 return match steps {
2722 [] => Some(start),
2723 [step] => json_tape_step_index(tape, start, step),
2724 [first, second] => json_tape_step_index(tape, start, first)
2725 .and_then(|idx| json_tape_step_index(tape, idx, second)),
2726 _ => json_tape_path_index_slow(tape, start, steps),
2727 };
2728}
2729fn json_tape_path_index_slow<T: JsonTape>(
2730 tape: &T,
2731 start: usize,
2732 steps: &[crate::ir::physical::PhysicalPathStep],
2733) -> Option<usize> {
2734 let mut idx = start;
2735 for step in steps {
2736 idx = json_tape_step_index(tape, idx, step)?;
2737 }
2738 Some(idx)
2739}
2740fn json_tape_step_index<T: JsonTape>(
2741 tape: &T,
2742 start: usize,
2743 step: &crate::ir::physical::PhysicalPathStep,
2744) -> Option<usize> {
2745 use crate::data::tape::TapeNode;
2746 use crate::ir::physical::PhysicalPathStep;
2747
2748 match step {
2749 PhysicalPathStep::Field(key) => {
2750 let TapeNode::Object { len, .. } = tape.nodes()[start] else {
2751 return None;
2752 };
2753 let mut cur = start + 1;
2754 for _ in 0..len {
2755 if tape.str_at(cur) == key.as_ref() {
2756 return Some(cur + 1);
2757 }
2758 cur += 1;
2759 cur += tape.span(cur);
2760 }
2761 None
2762 }
2763 PhysicalPathStep::Index(wanted) => {
2764 let TapeNode::Array { len, .. } = tape.nodes()[start] else {
2765 return None;
2766 };
2767 let wanted = if *wanted < 0 {
2768 len.checked_sub(wanted.unsigned_abs() as usize)?
2769 } else {
2770 *wanted as usize
2771 };
2772 if wanted >= len {
2773 return None;
2774 }
2775 let mut cur = start + 1;
2776 for _ in 0..wanted {
2777 cur += tape.span(cur);
2778 }
2779 Some(cur)
2780 }
2781 }
2782}
2783fn json_tape_object_cached_field<T: JsonTape>(
2784 tape: &T,
2785 obj_idx: usize,
2786 cache: NdjsonFieldCache,
2787 key: &str,
2788) -> Option<usize> {
2789 let crate::data::tape::TapeNode::Object { .. } = tape.nodes().get(obj_idx).copied()? else {
2790 return None;
2791 };
2792 let key_idx = obj_idx.checked_add(cache.key_delta)?;
2793 let value_idx = obj_idx.checked_add(cache.value_delta)?;
2794 if value_idx >= tape.nodes().len() {
2795 return None;
2796 }
2797 if !matches!(
2798 tape.nodes().get(key_idx),
2799 Some(crate::data::tape::TapeNode::String(_))
2800 ) {
2801 return None;
2802 }
2803 (tape.str_at(key_idx) == key).then_some(value_idx)
2804}
2805fn json_tape_object_field_index_and_cache<T: JsonTape>(
2806 tape: &T,
2807 obj_idx: usize,
2808 key: &str,
2809) -> Option<(usize, NdjsonFieldCache)> {
2810 let crate::data::tape::TapeNode::Object { len, .. } = tape.nodes()[obj_idx] else {
2811 return None;
2812 };
2813 let mut cur = obj_idx + 1;
2814 for _ in 0..len {
2815 if tape.str_at(cur) == key {
2816 return Some((
2817 cur + 1,
2818 NdjsonFieldCache {
2819 key_delta: cur - obj_idx,
2820 value_delta: cur + 1 - obj_idx,
2821 },
2822 ));
2823 }
2824 cur += 1;
2825 cur += tape.span(cur);
2826 }
2827 None
2828}
2829fn json_tape_array_element<T: JsonTape>(
2830 tape: &T,
2831 idx: usize,
2832 element: NdjsonDirectElement,
2833) -> Option<usize> {
2834 let crate::data::tape::TapeNode::Array { len, .. } = tape.nodes().get(idx).copied()? else {
2835 return None;
2836 };
2837 let wanted = match element {
2838 NdjsonDirectElement::First => 0,
2839 NdjsonDirectElement::Last => len.checked_sub(1)?,
2840 NdjsonDirectElement::Nth(n) => n,
2841 };
2842 if wanted >= len {
2843 return None;
2844 }
2845 let mut cur = idx + 1;
2846 for _ in 0..wanted {
2847 cur += tape.span(cur);
2848 }
2849 Some(cur)
2850}
2851pub(super) fn eval_tape_predicate(
2852 tape: &crate::data::tape::TapeScratch,
2853 predicate: &NdjsonDirectPredicate,
2854 env: Option<&crate::data::context::Env>,
2855 vm: &mut Option<std::sync::MutexGuard<'_, crate::vm::exec::VM>>,
2856 cache: &mut NdjsonPathCache,
2857) -> Result<bool, crate::EvalError> {
2858 use crate::parse::ast::BinOp;
2859
2860 Ok(match predicate {
2861 NdjsonDirectPredicate::Path(steps) => cache
2862 .index(tape, 0, steps)
2863 .map(|idx| json_view_truthy(json_tape_scalar(tape, idx)))
2864 .unwrap_or(false),
2865 NdjsonDirectPredicate::Literal(value) => crate::util::is_truthy(value),
2866 NdjsonDirectPredicate::Not(inner) => !eval_tape_predicate(tape, inner, env, vm, cache)?,
2867 NdjsonDirectPredicate::Binary { lhs, op, rhs } if *op == BinOp::And => {
2868 eval_tape_predicate(tape, lhs, env, vm, cache)?
2869 && eval_tape_predicate(tape, rhs, env, vm, cache)?
2870 }
2871 NdjsonDirectPredicate::Binary { lhs, op, rhs } if *op == BinOp::Or => {
2872 eval_tape_predicate(tape, lhs, env, vm, cache)?
2873 || eval_tape_predicate(tape, rhs, env, vm, cache)?
2874 }
2875 NdjsonDirectPredicate::Binary { lhs, op, rhs } => {
2876 let Some(lhs) = eval_tape_scalar(tape, lhs, cache) else {
2877 return Ok(false);
2878 };
2879 let Some(rhs) = eval_tape_scalar(tape, rhs, cache) else {
2880 return Ok(false);
2881 };
2882 crate::util::json_cmp_binop(lhs, *op, rhs)
2883 }
2884 NdjsonDirectPredicate::ViewScalarCall { steps, call } => cache
2885 .index(tape, 0, steps)
2886 .map(|idx| json_tape_scalar(tape, idx))
2887 .and_then(|value| call.try_apply_json_view(value))
2888 .is_some_and(|value| crate::util::is_truthy(&value)),
2889 NdjsonDirectPredicate::ArrayElementViewScalarCall {
2890 source_steps,
2891 element,
2892 suffix_steps,
2893 call,
2894 } => json_tape_path_index(tape, source_steps)
2895 .and_then(|idx| json_tape_array_element(tape, idx, *element))
2896 .and_then(|idx| json_tape_path_index_from(tape, idx, suffix_steps))
2897 .map(|idx| json_tape_scalar(tape, idx))
2898 .and_then(|value| call.try_apply_json_view(value))
2899 .is_some_and(|value| crate::util::is_truthy(&value)),
2900 NdjsonDirectPredicate::ArrayAny { .. } => {
2901 return Err(crate::EvalError(
2902 "array-any predicate requires VM state".to_string(),
2903 ));
2904 }
2905 NdjsonDirectPredicate::ViewPipeline { source_steps, body } => {
2906 let (Some(vm), Some(env)) = (vm.as_deref_mut(), env) else {
2907 return Err(crate::EvalError(
2908 "view pipeline predicate requires VM state".to_string(),
2909 ));
2910 };
2911 let source = json_tape_path_index(tape, source_steps)
2912 .map(|idx| crate::data::view::TapeScratchView::Node { tape, idx })
2913 .unwrap_or(crate::data::view::TapeScratchView::Missing);
2914 crate::exec::view::run_with_env_and_vm(source, body, None, env, vm)
2915 .transpose()?
2916 .is_some_and(|value| crate::util::is_truthy(&value))
2917 }
2918 })
2919}
2920pub(super) fn predicate_needs_vm(predicate: &NdjsonDirectPredicate) -> bool {
2921 match predicate {
2922 NdjsonDirectPredicate::Not(inner) => predicate_needs_vm(inner),
2923 NdjsonDirectPredicate::Binary { lhs, rhs, .. } => {
2924 predicate_needs_vm(lhs) || predicate_needs_vm(rhs)
2925 }
2926 NdjsonDirectPredicate::ArrayAny { .. } | NdjsonDirectPredicate::ViewPipeline { .. } => true,
2927 NdjsonDirectPredicate::Path(_)
2928 | NdjsonDirectPredicate::Literal(_)
2929 | NdjsonDirectPredicate::ViewScalarCall { .. }
2930 | NdjsonDirectPredicate::ArrayElementViewScalarCall { .. } => false,
2931 }
2932}
2933fn eval_tape_scalar<'a>(
2934 tape: &'a crate::data::tape::TapeScratch,
2935 predicate: &'a NdjsonDirectPredicate,
2936 cache: &mut NdjsonPathCache,
2937) -> Option<crate::util::JsonView<'a>> {
2938 match predicate {
2939 NdjsonDirectPredicate::Path(steps) => cache
2940 .index(tape, 0, steps)
2941 .map(|idx| json_tape_scalar(tape, idx)),
2942 NdjsonDirectPredicate::Literal(value) => Some(crate::util::JsonView::from_val(value)),
2943 _ => None,
2944 }
2945}
2946fn json_view_truthy(value: crate::util::JsonView<'_>) -> bool {
2947 match value {
2948 crate::util::JsonView::Null => false,
2949 crate::util::JsonView::Bool(value) => value,
2950 crate::util::JsonView::Int(value) => value != 0,
2951 crate::util::JsonView::UInt(value) => value != 0,
2952 crate::util::JsonView::Float(value) => value != 0.0,
2953 crate::util::JsonView::Str(value) => !value.is_empty(),
2954 crate::util::JsonView::ArrayLen(len) | crate::util::JsonView::ObjectLen(len) => len > 0,
2955 }
2956}
2957fn json_tape_scalar<T: JsonTape>(tape: &T, idx: usize) -> crate::util::JsonView<'_> {
2958 use crate::data::tape::TapeNode;
2959 use simd_json::StaticNode as SN;
2960
2961 let Some(node) = tape.nodes().get(idx).copied() else {
2962 return crate::util::JsonView::Null;
2963 };
2964 match node {
2965 TapeNode::Static(SN::Null) => crate::util::JsonView::Null,
2966 TapeNode::Static(SN::Bool(value)) => crate::util::JsonView::Bool(value),
2967 TapeNode::Static(SN::I64(value)) => crate::util::JsonView::Int(value),
2968 TapeNode::Static(SN::U64(value)) => crate::util::JsonView::UInt(value),
2969 TapeNode::Static(SN::F64(value)) => crate::util::JsonView::Float(value),
2970 TapeNode::String(_) => crate::util::JsonView::Str(tape.str_at(idx)),
2971 TapeNode::Array { len, .. } => crate::util::JsonView::ArrayLen(len),
2972 TapeNode::Object { len, .. } => crate::util::JsonView::ObjectLen(len),
2973 }
2974}
2975
2976pub(super) fn write_document_line<W: Write>(
2977 writer: &mut W,
2978 document: &Jetro,
2979 line_no: u64,
2980 engine: &JetroEngine,
2981) -> Result<(), JetroEngineError> {
2982 if let Some(bytes) = document.raw_bytes() {
2983 writer.write_all(bytes)?;
2984 writer.write_all(b"\n")?;
2985 return Ok(());
2986 }
2987
2988 let root = document
2989 .root_val_with(engine.keys())
2990 .map_err(|err| row_eval_error(line_no, err))?;
2991 write_val_line(writer, &root)
2992}
2993
2994pub(super) fn write_val_json<W: Write>(
2995 writer: &mut W,
2996 value: &Val,
2997) -> Result<(), JetroEngineError> {
2998 match value {
2999 Val::Null => writer.write_all(b"null")?,
3000 Val::Bool(true) => writer.write_all(b"true")?,
3001 Val::Bool(false) => writer.write_all(b"false")?,
3002 Val::Int(n) => write_i64(writer, *n)?,
3003 Val::Float(n) => write_f64(writer, *n)?,
3004 Val::Str(s) => write_json_str(writer, s.as_ref())?,
3005 Val::StrSlice(s) => write_json_str(writer, s.as_str())?,
3006 Val::Arr(items) => write_json_array(writer, items.iter())?,
3007 Val::IntVec(items) => write_json_int_array(writer, items.iter().copied())?,
3008 Val::FloatVec(items) => write_json_float_array(writer, items.iter().copied())?,
3009 Val::StrVec(items) => write_json_str_array(writer, items.iter().map(|s| s.as_ref()))?,
3010 Val::StrSliceVec(items) => write_json_str_array(writer, items.iter().map(|s| s.as_str()))?,
3011 Val::Obj(entries) => write_json_object(
3012 writer,
3013 entries.iter().map(|(key, value)| (key.as_ref(), value)),
3014 )?,
3015 Val::ObjSmall(entries) => write_json_object(
3016 writer,
3017 entries.iter().map(|(key, value)| (key.as_ref(), value)),
3018 )?,
3019 Val::ObjVec(data) => write_json_objvec(writer, data)?,
3020 }
3021 Ok(())
3022}
3023fn write_json_tape_at<W: Write, T: JsonTape>(
3024 writer: &mut W,
3025 tape: &T,
3026 idx: usize,
3027) -> Result<usize, JetroEngineError> {
3028 use crate::data::tape::TapeNode;
3029 use simd_json::StaticNode as SN;
3030
3031 let Some(node) = tape.nodes().get(idx).copied() else {
3032 writer.write_all(b"null")?;
3033 return Ok(idx);
3034 };
3035
3036 match node {
3037 TapeNode::Static(SN::Null) => {
3038 writer.write_all(b"null")?;
3039 Ok(idx + 1)
3040 }
3041 TapeNode::Static(SN::Bool(true)) => {
3042 writer.write_all(b"true")?;
3043 Ok(idx + 1)
3044 }
3045 TapeNode::Static(SN::Bool(false)) => {
3046 writer.write_all(b"false")?;
3047 Ok(idx + 1)
3048 }
3049 TapeNode::Static(SN::I64(value)) => {
3050 write_i64(writer, value)?;
3051 Ok(idx + 1)
3052 }
3053 TapeNode::Static(SN::U64(value)) => {
3054 write_u64(writer, value)?;
3055 Ok(idx + 1)
3056 }
3057 TapeNode::Static(SN::F64(value)) => {
3058 write_f64(writer, value)?;
3059 Ok(idx + 1)
3060 }
3061 TapeNode::String(_) => {
3062 write_json_str(writer, tape.str_at(idx))?;
3063 Ok(idx + 1)
3064 }
3065 TapeNode::Array { len, .. } => {
3066 writer.write_all(b"[")?;
3067 let mut cur = idx + 1;
3068 for item_idx in 0..len {
3069 if item_idx > 0 {
3070 writer.write_all(b",")?;
3071 }
3072 cur = write_json_tape_at(writer, tape, cur)?;
3073 }
3074 writer.write_all(b"]")?;
3075 Ok(cur)
3076 }
3077 TapeNode::Object { len, .. } => {
3078 writer.write_all(b"{")?;
3079 let mut cur = idx + 1;
3080 for field_idx in 0..len {
3081 if field_idx > 0 {
3082 writer.write_all(b",")?;
3083 }
3084 write_json_str(writer, tape.str_at(cur))?;
3085 writer.write_all(b":")?;
3086 cur = write_json_tape_at(writer, tape, cur + 1)?;
3087 }
3088 writer.write_all(b"}")?;
3089 Ok(cur)
3090 }
3091 }
3092}
3093fn visit_json_tape_source_items<T, E, F>(tape: &T, source_idx: usize, mut visit: F) -> Result<(), E>
3094where
3095 T: JsonTape,
3096 F: FnMut(usize) -> Result<(), E>,
3097{
3098 use crate::data::tape::TapeNode;
3099
3100 match tape.nodes().get(source_idx).copied() {
3101 Some(TapeNode::Array { len, .. }) => {
3102 let mut cur = source_idx + 1;
3103 for _ in 0..len {
3104 visit(cur)?;
3105 cur += tape.span(cur);
3106 }
3107 }
3108 Some(_) => visit(source_idx)?,
3109 None => {}
3110 }
3111 Ok(())
3112}
3113fn find_json_tape_source_item<T, F>(tape: &T, source_idx: usize, mut matches: F) -> Option<usize>
3114where
3115 T: JsonTape,
3116 F: FnMut(usize) -> bool,
3117{
3118 use crate::data::tape::TapeNode;
3119
3120 match tape.nodes().get(source_idx).copied()? {
3121 TapeNode::Array { len, .. } => {
3122 let mut cur = source_idx + 1;
3123 for _ in 0..len {
3124 if matches(cur) {
3125 return Some(cur);
3126 }
3127 cur += tape.span(cur);
3128 }
3129 None
3130 }
3131 _ => matches(source_idx).then_some(source_idx),
3132 }
3133}
3134fn write_json_tape_stream<W: Write, T: JsonTape>(
3135 writer: &mut W,
3136 tape: &T,
3137 plan: &NdjsonDirectStreamPlan,
3138 source_cache: &mut NdjsonPathCache,
3139 suffix_cache: &mut NdjsonPathCache,
3140 predicate_cache: &mut NdjsonPathCache,
3141 projection_caches: &mut Vec<NdjsonPathCache>,
3142) -> Result<(), JetroEngineError> {
3143 let Some(source_idx) = source_cache.index(tape, 0, &plan.source_steps) else {
3144 write_json_tape_empty_stream_result(writer, &plan.sink)?;
3145 return Ok(());
3146 };
3147
3148 match &plan.sink {
3149 NdjsonDirectStreamSink::Collect(map) => {
3150 writer.write_all(b"[")?;
3151 let mut wrote_row = false;
3152 visit_json_tape_source_items(tape, source_idx, |item_idx| {
3153 if !plan.predicate.as_ref().is_none_or(|predicate| {
3154 eval_json_tape_item_predicate_cached(tape, item_idx, predicate, predicate_cache)
3155 }) {
3156 return Ok::<(), JetroEngineError>(());
3157 }
3158 if wrote_row {
3159 writer.write_all(b",")?;
3160 }
3161 write_json_tape_stream_map(
3162 writer,
3163 tape,
3164 item_idx,
3165 map,
3166 suffix_cache,
3167 projection_caches,
3168 )?;
3169 wrote_row = true;
3170 Ok(())
3171 })?;
3172 writer.write_all(b"]")?;
3173 }
3174 NdjsonDirectStreamSink::Count => {
3175 let mut count = 0usize;
3176 let _: Result<(), ()> = visit_json_tape_source_items(tape, source_idx, |item_idx| {
3177 if plan.predicate.as_ref().is_none_or(|predicate| {
3178 eval_json_tape_item_predicate_cached(tape, item_idx, predicate, predicate_cache)
3179 }) {
3180 count += 1;
3181 }
3182 Ok(())
3183 });
3184 write_i64(writer, count as i64)?;
3185 }
3186 NdjsonDirectStreamSink::First(map) => {
3187 let selected = find_json_tape_source_item(tape, source_idx, |item_idx| {
3188 plan.predicate.as_ref().is_none_or(|predicate| {
3189 eval_json_tape_item_predicate_cached(tape, item_idx, predicate, predicate_cache)
3190 })
3191 });
3192 if let Some(item_idx) = selected {
3193 write_json_tape_stream_map(
3194 writer,
3195 tape,
3196 item_idx,
3197 map,
3198 suffix_cache,
3199 projection_caches,
3200 )?;
3201 } else {
3202 writer.write_all(b"null")?;
3203 }
3204 }
3205 NdjsonDirectStreamSink::Last(map) => {
3206 let mut selected = None;
3207 visit_json_tape_source_items(tape, source_idx, |item_idx| {
3208 if plan.predicate.as_ref().is_none_or(|predicate| {
3209 eval_json_tape_item_predicate_cached(tape, item_idx, predicate, predicate_cache)
3210 }) {
3211 selected = Some(item_idx);
3212 }
3213 Ok::<(), JetroEngineError>(())
3214 })?;
3215 if let Some(item_idx) = selected {
3216 write_json_tape_stream_map(
3217 writer,
3218 tape,
3219 item_idx,
3220 map,
3221 suffix_cache,
3222 projection_caches,
3223 )?;
3224 } else {
3225 writer.write_all(b"null")?;
3226 }
3227 }
3228 NdjsonDirectStreamSink::Numeric { suffix_steps, op } => {
3229 let caches = NdjsonPathCaches {
3230 source: source_cache,
3231 suffix: suffix_cache,
3232 predicate: predicate_cache,
3233 };
3234 let value = reduce_json_tape_numeric_path(
3235 tape,
3236 &plan.source_steps,
3237 plan.predicate.as_ref(),
3238 suffix_steps,
3239 *op,
3240 caches,
3241 );
3242 write_val_json(writer, &value)?;
3243 }
3244 NdjsonDirectStreamSink::Extreme {
3245 key_steps,
3246 want_max,
3247 value,
3248 } => {
3249 let mut best_idx = None;
3250 visit_json_tape_source_items(tape, source_idx, |item_idx| {
3251 let Some(key_idx) = suffix_cache.index(tape, item_idx, key_steps) else {
3252 return Ok::<(), JetroEngineError>(());
3253 };
3254 let key = json_tape_scalar(tape, key_idx);
3255 let replace = best_idx
3256 .and_then(|idx| suffix_cache.index(tape, idx, key_steps))
3257 .map(|idx| {
3258 let order = crate::util::json_cmp_vals(key, json_tape_scalar(tape, idx));
3259 (*want_max && order.is_gt()) || (!*want_max && order.is_lt())
3260 })
3261 .unwrap_or(true);
3262 if replace {
3263 best_idx = Some(item_idx);
3264 }
3265 Ok(())
3266 })?;
3267 if let Some(item_idx) = best_idx {
3268 let path_idx = match value {
3269 NdjsonDirectProjectionValue::Path(steps)
3270 | NdjsonDirectProjectionValue::ViewScalarCall { steps, .. } => {
3271 suffix_cache.index(tape, item_idx, steps)
3272 }
3273 NdjsonDirectProjectionValue::Nested(_)
3274 | NdjsonDirectProjectionValue::Literal(_) => None,
3275 };
3276 write_json_tape_direct_value(writer, tape, value, path_idx)?;
3277 } else {
3278 writer.write_all(b"null")?;
3279 }
3280 }
3281 }
3282
3283 Ok(())
3284}
3285fn write_json_tape_empty_stream_result<W: Write>(
3286 writer: &mut W,
3287 sink: &NdjsonDirectStreamSink,
3288) -> Result<(), JetroEngineError> {
3289 match sink {
3290 NdjsonDirectStreamSink::Collect(_) => writer.write_all(b"[]")?,
3291 NdjsonDirectStreamSink::First(_) | NdjsonDirectStreamSink::Last(_) => {
3292 writer.write_all(b"null")?
3293 }
3294 NdjsonDirectStreamSink::Count => writer.write_all(b"0")?,
3295 NdjsonDirectStreamSink::Numeric { op, .. } => {
3296 let value = crate::exec::pipeline::num_finalise(
3297 *op,
3298 0,
3299 0.0,
3300 false,
3301 f64::INFINITY,
3302 f64::NEG_INFINITY,
3303 0,
3304 );
3305 write_val_json(writer, &value)?;
3306 }
3307 NdjsonDirectStreamSink::Extreme { .. } => writer.write_all(b"null")?,
3308 }
3309 Ok(())
3310}
3311fn write_json_tape_stream_map<W: Write, T: JsonTape>(
3312 writer: &mut W,
3313 tape: &T,
3314 item_idx: usize,
3315 map: &NdjsonDirectStreamMap,
3316 suffix_cache: &mut NdjsonPathCache,
3317 projection_caches: &mut Vec<NdjsonPathCache>,
3318) -> Result<(), JetroEngineError> {
3319 match map {
3320 NdjsonDirectStreamMap::Value(value) => {
3321 let path_idx = match value {
3322 NdjsonDirectProjectionValue::Path(steps)
3323 | NdjsonDirectProjectionValue::ViewScalarCall { steps, .. } => {
3324 suffix_cache.index(tape, item_idx, steps)
3325 }
3326 NdjsonDirectProjectionValue::Nested(_) => None,
3327 NdjsonDirectProjectionValue::Literal(_) => None,
3328 };
3329 write_json_tape_direct_value(writer, tape, value, path_idx)?;
3330 }
3331 NdjsonDirectStreamMap::Array(items) => {
3332 write_json_tape_array_projection_from(
3333 writer,
3334 tape,
3335 item_idx,
3336 items,
3337 projection_caches,
3338 )?;
3339 }
3340 NdjsonDirectStreamMap::Object(fields) => {
3341 write_json_tape_object_projection_from(
3342 writer,
3343 tape,
3344 item_idx,
3345 fields,
3346 projection_caches,
3347 )?;
3348 }
3349 }
3350 Ok(())
3351}
3352fn write_json_tape_object_projection<W: Write, T: JsonTape>(
3353 writer: &mut W,
3354 tape: &T,
3355 fields: &[super::ndjson_direct::NdjsonDirectObjectField],
3356 path_caches: &mut Vec<NdjsonPathCache>,
3357) -> Result<(), JetroEngineError> {
3358 write_json_tape_object_projection_from(writer, tape, 0, fields, path_caches)
3359}
3360fn write_json_tape_object_projection_from<W: Write, T: JsonTape>(
3361 writer: &mut W,
3362 tape: &T,
3363 start: usize,
3364 fields: &[super::ndjson_direct::NdjsonDirectObjectField],
3365 path_caches: &mut Vec<NdjsonPathCache>,
3366) -> Result<(), JetroEngineError> {
3367 if path_caches.len() < fields.len() {
3368 path_caches.resize_with(fields.len(), NdjsonPathCache::default);
3369 }
3370 writer.write_all(b"{")?;
3371 let mut wrote = false;
3372 for (field_idx, field) in fields.iter().enumerate() {
3373 let path_cache = &mut path_caches[field_idx];
3374 let mut path_idx = None;
3375 match &field.value {
3376 NdjsonDirectProjectionValue::Path(steps) => {
3377 let idx = path_cache.index(tape, start, steps);
3378 path_idx = idx;
3379 if field.optional
3380 && idx
3381 .map(|idx| {
3382 matches!(json_tape_scalar(tape, idx), crate::util::JsonView::Null)
3383 })
3384 .unwrap_or(true)
3385 {
3386 continue;
3387 }
3388 }
3389 NdjsonDirectProjectionValue::ViewScalarCall {
3390 steps,
3391 call,
3392 optional,
3393 } => {
3394 let idx = path_cache.index(tape, start, steps);
3395 path_idx = idx;
3396 if (*optional || field.optional)
3397 && idx
3398 .map(|idx| {
3399 matches!(json_tape_scalar(tape, idx), crate::util::JsonView::Null)
3400 })
3401 .unwrap_or(true)
3402 {
3403 continue;
3404 }
3405 if field.optional
3406 && idx
3407 .map(|idx| json_tape_scalar(tape, idx))
3408 .and_then(|value| call.try_apply_json_view(value))
3409 .is_some_and(|value| matches!(value, Val::Null))
3410 {
3411 continue;
3412 }
3413 }
3414 NdjsonDirectProjectionValue::Literal(Val::Null) if field.optional => {
3415 continue;
3416 }
3417 NdjsonDirectProjectionValue::Nested(_) => {}
3418 NdjsonDirectProjectionValue::Literal(_) => {}
3419 }
3420 if wrote {
3421 writer.write_all(b",")?;
3422 }
3423 write_json_str(writer, field.key.as_ref())?;
3424 writer.write_all(b":")?;
3425 write_json_tape_direct_value(writer, tape, &field.value, path_idx)?;
3426 wrote = true;
3427 }
3428 writer.write_all(b"}")?;
3429 Ok(())
3430}
3431fn write_json_tape_array_projection<W: Write, T: JsonTape>(
3432 writer: &mut W,
3433 tape: &T,
3434 items: &[NdjsonDirectProjectionValue],
3435 path_caches: &mut Vec<NdjsonPathCache>,
3436) -> Result<(), JetroEngineError> {
3437 write_json_tape_array_projection_from(writer, tape, 0, items, path_caches)
3438}
3439fn write_json_tape_array_projection_from<W: Write, T: JsonTape>(
3440 writer: &mut W,
3441 tape: &T,
3442 start: usize,
3443 items: &[NdjsonDirectProjectionValue],
3444 path_caches: &mut Vec<NdjsonPathCache>,
3445) -> Result<(), JetroEngineError> {
3446 if path_caches.len() < items.len() {
3447 path_caches.resize_with(items.len(), NdjsonPathCache::default);
3448 }
3449 writer.write_all(b"[")?;
3450 for (idx, item) in items.iter().enumerate() {
3451 if idx > 0 {
3452 writer.write_all(b",")?;
3453 }
3454 let path_idx = match item {
3455 NdjsonDirectProjectionValue::Path(steps)
3456 | NdjsonDirectProjectionValue::ViewScalarCall { steps, .. } => {
3457 path_caches[idx].index(tape, start, steps)
3458 }
3459 NdjsonDirectProjectionValue::Nested(_) => None,
3460 NdjsonDirectProjectionValue::Literal(_) => None,
3461 };
3462 write_json_tape_direct_value(writer, tape, item, path_idx)?;
3463 }
3464 writer.write_all(b"]")?;
3465 Ok(())
3466}
3467fn write_json_tape_direct_value<W: Write, T: JsonTape>(
3468 writer: &mut W,
3469 tape: &T,
3470 value: &NdjsonDirectProjectionValue,
3471 path_idx: Option<usize>,
3472) -> Result<(), JetroEngineError> {
3473 match value {
3474 NdjsonDirectProjectionValue::Path(_) => {
3475 if let Some(idx) = path_idx {
3476 write_json_tape_at(writer, tape, idx)?;
3477 } else {
3478 writer.write_all(b"null")?;
3479 }
3480 }
3481 NdjsonDirectProjectionValue::ViewScalarCall { call, .. } => {
3482 if let Some(idx) = path_idx {
3483 let value = json_tape_scalar(tape, idx);
3484 if let Some(value) = call.try_apply_json_view(value) {
3485 write_val_json(writer, &value)?;
3486 } else {
3487 write_json_tape_at(writer, tape, idx)?;
3488 }
3489 } else {
3490 writer.write_all(b"null")?;
3491 }
3492 }
3493 NdjsonDirectProjectionValue::Literal(value) => write_val_json(writer, value)?,
3494 NdjsonDirectProjectionValue::Nested(plan) => {
3495 write_json_tape_nested_plan(writer, tape, plan)?;
3496 }
3497 }
3498 Ok(())
3499}
3500fn write_json_tape_nested_plan<W: Write, T: JsonTape>(
3501 writer: &mut W,
3502 tape: &T,
3503 plan: &NdjsonDirectTapePlan,
3504) -> Result<(), JetroEngineError> {
3505 let mut root_cache = NdjsonPathCache::default();
3506 let mut source_cache = NdjsonPathCache::default();
3507 let mut suffix_cache = NdjsonPathCache::default();
3508 let mut predicate_cache = NdjsonPathCache::default();
3509 let mut projection_caches = Vec::new();
3510 match plan {
3511 NdjsonDirectTapePlan::RootPath(steps) => {
3512 if let Some(idx) = root_cache.index(tape, 0, steps) {
3513 write_json_tape_at(writer, tape, idx)?;
3514 } else {
3515 writer.write_all(b"null")?;
3516 }
3517 }
3518 NdjsonDirectTapePlan::ViewScalarCall {
3519 steps,
3520 call,
3521 optional,
3522 } => {
3523 let idx = root_cache.index(tape, 0, steps);
3524 let value = idx
3525 .map(|idx| json_tape_scalar(tape, idx))
3526 .unwrap_or(crate::util::JsonView::Null);
3527 if *optional && matches!(value, crate::util::JsonView::Null) {
3528 writer.write_all(b"null")?;
3529 } else if let Some(value) = call.try_apply_json_view(value) {
3530 write_val_json(writer, &value)?;
3531 } else if let Some(idx) = idx {
3532 write_json_tape_at(writer, tape, idx)?;
3533 } else {
3534 writer.write_all(b"null")?;
3535 }
3536 }
3537 NdjsonDirectTapePlan::ArrayElementPath {
3538 source_steps,
3539 element,
3540 suffix_steps,
3541 } => {
3542 write_json_tape_array_element_path(
3543 writer,
3544 tape,
3545 source_steps,
3546 *element,
3547 suffix_steps,
3548 &mut source_cache,
3549 &mut suffix_cache,
3550 )?;
3551 }
3552 NdjsonDirectTapePlan::ArrayElementViewScalarCall {
3553 source_steps,
3554 element,
3555 suffix_steps,
3556 call,
3557 } => {
3558 write_json_tape_array_element_scalar(
3559 writer,
3560 tape,
3561 source_steps,
3562 *element,
3563 suffix_steps,
3564 call,
3565 &mut source_cache,
3566 &mut suffix_cache,
3567 )?;
3568 }
3569 NdjsonDirectTapePlan::Stream(stream) => {
3570 write_json_tape_stream(
3571 writer,
3572 tape,
3573 stream,
3574 &mut source_cache,
3575 &mut suffix_cache,
3576 &mut predicate_cache,
3577 &mut projection_caches,
3578 )?;
3579 }
3580 NdjsonDirectTapePlan::Object(fields) => {
3581 write_json_tape_object_projection(writer, tape, fields, &mut projection_caches)?;
3582 }
3583 NdjsonDirectTapePlan::Array(items) => {
3584 write_json_tape_array_projection(writer, tape, items, &mut projection_caches)?;
3585 }
3586 NdjsonDirectTapePlan::ObjectItems { steps, method } => {
3587 let idx = root_cache.index(tape, 0, steps);
3588 write_json_tape_object_items(writer, tape, idx, *method)?;
3589 }
3590 NdjsonDirectTapePlan::ViewPipeline { .. } => {
3591 writer.write_all(b"null")?;
3592 }
3593 }
3594 Ok(())
3595}
3596fn write_json_tape_array_element_path<W: Write, T: JsonTape>(
3597 writer: &mut W,
3598 tape: &T,
3599 source_steps: &[crate::ir::physical::PhysicalPathStep],
3600 element: super::ndjson_direct::NdjsonDirectElement,
3601 suffix_steps: &[crate::ir::physical::PhysicalPathStep],
3602 source_cache: &mut NdjsonPathCache,
3603 suffix_cache: &mut NdjsonPathCache,
3604) -> Result<(), JetroEngineError> {
3605 let idx = source_cache
3606 .index(tape, 0, source_steps)
3607 .and_then(|idx| json_tape_array_element(tape, idx, element))
3608 .and_then(|idx| suffix_cache.index(tape, idx, suffix_steps));
3609 if let Some(idx) = idx {
3610 write_json_tape_at(writer, tape, idx)?;
3611 } else {
3612 writer.write_all(b"null")?;
3613 }
3614 Ok(())
3615}
3616fn write_json_tape_array_element_scalar<W: Write, T: JsonTape>(
3617 writer: &mut W,
3618 tape: &T,
3619 source_steps: &[crate::ir::physical::PhysicalPathStep],
3620 element: super::ndjson_direct::NdjsonDirectElement,
3621 suffix_steps: &[crate::ir::physical::PhysicalPathStep],
3622 call: &crate::builtins::BuiltinCall,
3623 source_cache: &mut NdjsonPathCache,
3624 suffix_cache: &mut NdjsonPathCache,
3625) -> Result<(), JetroEngineError> {
3626 let idx = source_cache
3627 .index(tape, 0, source_steps)
3628 .and_then(|idx| json_tape_array_element(tape, idx, element))
3629 .and_then(|idx| suffix_cache.index(tape, idx, suffix_steps));
3630 if let Some(value) = idx
3631 .map(|idx| json_tape_scalar(tape, idx))
3632 .and_then(|value| call.try_apply_json_view(value))
3633 {
3634 write_val_json(writer, &value)?;
3635 } else if let Some(idx) = idx {
3636 write_json_tape_at(writer, tape, idx)?;
3637 } else {
3638 writer.write_all(b"null")?;
3639 }
3640 Ok(())
3641}
3642fn write_json_tape_object_items<W: Write, T: JsonTape>(
3643 writer: &mut W,
3644 tape: &T,
3645 obj_idx: Option<usize>,
3646 method: crate::builtins::BuiltinMethod,
3647) -> Result<(), JetroEngineError> {
3648 let Some(obj_idx) = obj_idx else {
3649 writer.write_all(b"[]")?;
3650 return Ok(());
3651 };
3652 let Some(crate::data::tape::TapeNode::Object { len, .. }) = tape.nodes().get(obj_idx).copied()
3653 else {
3654 writer.write_all(b"[]")?;
3655 return Ok(());
3656 };
3657
3658 writer.write_all(b"[")?;
3659 let mut cur = obj_idx + 1;
3660 for field_idx in 0..len {
3661 if field_idx > 0 {
3662 writer.write_all(b",")?;
3663 }
3664 match method {
3665 crate::builtins::BuiltinMethod::Keys => {
3666 write_json_str(writer, tape.str_at(cur))?;
3667 cur += 1;
3668 cur += tape.span(cur);
3669 }
3670 crate::builtins::BuiltinMethod::Values => {
3671 cur = write_json_tape_at(writer, tape, cur + 1)?;
3672 }
3673 crate::builtins::BuiltinMethod::Entries => {
3674 writer.write_all(b"[")?;
3675 write_json_str(writer, tape.str_at(cur))?;
3676 writer.write_all(b",")?;
3677 cur = write_json_tape_at(writer, tape, cur + 1)?;
3678 writer.write_all(b"]")?;
3679 }
3680 _ => unreachable!("non-object-items builtin"),
3681 }
3682 }
3683 writer.write_all(b"]")?;
3684 Ok(())
3685}
3686fn reduce_json_tape_numeric_path<T: JsonTape>(
3687 tape: &T,
3688 source_steps: &[crate::ir::physical::PhysicalPathStep],
3689 predicate: Option<&NdjsonDirectItemPredicate>,
3690 suffix_steps: &[crate::ir::physical::PhysicalPathStep],
3691 op: crate::exec::pipeline::NumOp,
3692 caches: NdjsonPathCaches<'_>,
3693) -> Val {
3694 let mut acc_i = 0i64;
3695 let mut acc_f = 0.0f64;
3696 let mut floated = false;
3697 let mut min_f = f64::INFINITY;
3698 let mut max_f = f64::NEG_INFINITY;
3699 let mut n_obs = 0usize;
3700
3701 let Some(source_idx) = caches.source.index(tape, 0, source_steps) else {
3702 return crate::exec::pipeline::num_finalise(op, acc_i, acc_f, floated, min_f, max_f, n_obs);
3703 };
3704
3705 let suffix_cache = caches.suffix;
3706 let predicate_cache = caches.predicate;
3707 let _: Result<(), ()> = visit_json_tape_source_items(tape, source_idx, |item_idx| {
3708 if !predicate.is_none_or(|predicate| {
3709 eval_json_tape_item_predicate_cached(tape, item_idx, predicate, predicate_cache)
3710 }) {
3711 return Ok(());
3712 }
3713 if let Some(idx) = suffix_cache.index(tape, item_idx, suffix_steps) {
3714 fold_json_tape_numeric(
3715 json_tape_scalar(tape, idx),
3716 op,
3717 &mut acc_i,
3718 &mut acc_f,
3719 &mut floated,
3720 &mut min_f,
3721 &mut max_f,
3722 &mut n_obs,
3723 );
3724 }
3725 Ok(())
3726 });
3727
3728 crate::exec::pipeline::num_finalise(op, acc_i, acc_f, floated, min_f, max_f, n_obs)
3729}
3730#[allow(clippy::too_many_arguments)]
3731fn fold_json_tape_numeric(
3732 value: crate::util::JsonView<'_>,
3733 op: crate::exec::pipeline::NumOp,
3734 acc_i: &mut i64,
3735 acc_f: &mut f64,
3736 floated: &mut bool,
3737 min_f: &mut f64,
3738 max_f: &mut f64,
3739 n_obs: &mut usize,
3740) {
3741 match value {
3742 crate::util::JsonView::Int(value) => crate::exec::pipeline::num_fold_i64(
3743 acc_i, acc_f, floated, min_f, max_f, n_obs, op, value,
3744 ),
3745 crate::util::JsonView::UInt(value) if value <= i64::MAX as u64 => {
3746 crate::exec::pipeline::num_fold_i64(
3747 acc_i,
3748 acc_f,
3749 floated,
3750 min_f,
3751 max_f,
3752 n_obs,
3753 op,
3754 value as i64,
3755 )
3756 }
3757 crate::util::JsonView::UInt(value) => crate::exec::pipeline::num_fold_f64(
3758 acc_i,
3759 acc_f,
3760 floated,
3761 min_f,
3762 max_f,
3763 n_obs,
3764 op,
3765 value as f64,
3766 ),
3767 crate::util::JsonView::Float(value) => crate::exec::pipeline::num_fold_f64(
3768 acc_i, acc_f, floated, min_f, max_f, n_obs, op, value,
3769 ),
3770 _ => {}
3771 }
3772}
3773fn eval_json_tape_item_predicate_cached<T: JsonTape>(
3774 tape: &T,
3775 item_idx: usize,
3776 predicate: &NdjsonDirectItemPredicate,
3777 cache: &mut NdjsonPathCache,
3778) -> bool {
3779 use crate::parse::ast::BinOp;
3780
3781 match predicate {
3782 NdjsonDirectItemPredicate::Path(steps) => cache
3783 .index(tape, item_idx, steps)
3784 .map(|idx| json_view_truthy(json_tape_scalar(tape, idx)))
3785 .unwrap_or(false),
3786 NdjsonDirectItemPredicate::Literal(value) => crate::util::is_truthy(value),
3787 NdjsonDirectItemPredicate::Binary { lhs, op, rhs } if *op == BinOp::And => {
3788 eval_json_tape_item_predicate_cached(tape, item_idx, lhs, cache)
3789 && eval_json_tape_item_predicate_cached(tape, item_idx, rhs, cache)
3790 }
3791 NdjsonDirectItemPredicate::Binary { lhs, op, rhs } if *op == BinOp::Or => {
3792 eval_json_tape_item_predicate_cached(tape, item_idx, lhs, cache)
3793 || eval_json_tape_item_predicate_cached(tape, item_idx, rhs, cache)
3794 }
3795 NdjsonDirectItemPredicate::Binary { lhs, op, rhs } => {
3796 let Some(lhs) = eval_json_tape_item_scalar_cached(tape, item_idx, lhs, cache) else {
3797 return false;
3798 };
3799 let Some(rhs) = eval_json_tape_item_scalar_cached(tape, item_idx, rhs, cache) else {
3800 return false;
3801 };
3802 crate::util::json_cmp_binop(lhs, *op, rhs)
3803 }
3804 NdjsonDirectItemPredicate::CmpLit { lhs, op, lit } => cache
3805 .index(tape, item_idx, lhs)
3806 .map(|idx| json_tape_scalar(tape, idx))
3807 .is_some_and(|value| {
3808 crate::util::json_cmp_binop(value, *op, crate::util::JsonView::from_val(lit))
3809 }),
3810 NdjsonDirectItemPredicate::ViewScalarCall { suffix_steps, call } => cache
3811 .index(tape, item_idx, suffix_steps)
3812 .map(|idx| json_tape_scalar(tape, idx))
3813 .and_then(|value| call.try_apply_json_view(value))
3814 .is_some_and(|value| crate::util::is_truthy(&value)),
3815 }
3816}
3817fn eval_json_tape_item_scalar_cached<'a, T: JsonTape>(
3818 tape: &'a T,
3819 item_idx: usize,
3820 predicate: &'a NdjsonDirectItemPredicate,
3821 cache: &mut NdjsonPathCache,
3822) -> Option<crate::util::JsonView<'a>> {
3823 match predicate {
3824 NdjsonDirectItemPredicate::Path(steps) => cache
3825 .index(tape, item_idx, steps)
3826 .map(|idx| json_tape_scalar(tape, idx)),
3827 NdjsonDirectItemPredicate::Literal(value) => Some(crate::util::JsonView::from_val(value)),
3828 _ => None,
3829 }
3830}
3831
3832fn write_json_array<'a, W, I>(writer: &mut W, items: I) -> Result<(), JetroEngineError>
3833where
3834 W: Write,
3835 I: IntoIterator<Item = &'a Val>,
3836{
3837 writer.write_all(b"[")?;
3838 let mut first = true;
3839 for item in items {
3840 if first {
3841 first = false;
3842 } else {
3843 writer.write_all(b",")?;
3844 }
3845 write_val_json(writer, item)?;
3846 }
3847 writer.write_all(b"]")?;
3848 Ok(())
3849}
3850
3851fn write_json_int_array<W, I>(writer: &mut W, items: I) -> Result<(), JetroEngineError>
3852where
3853 W: Write,
3854 I: IntoIterator<Item = i64>,
3855{
3856 writer.write_all(b"[")?;
3857 let mut first = true;
3858 let mut buf = itoa::Buffer::new();
3859 for item in items {
3860 if first {
3861 first = false;
3862 } else {
3863 writer.write_all(b",")?;
3864 }
3865 writer.write_all(buf.format(item).as_bytes())?;
3866 }
3867 writer.write_all(b"]")?;
3868 Ok(())
3869}
3870
3871fn write_json_float_array<W, I>(writer: &mut W, items: I) -> Result<(), JetroEngineError>
3872where
3873 W: Write,
3874 I: IntoIterator<Item = f64>,
3875{
3876 writer.write_all(b"[")?;
3877 let mut first = true;
3878 let mut buf = ryu::Buffer::new();
3879 for item in items {
3880 if first {
3881 first = false;
3882 } else {
3883 writer.write_all(b",")?;
3884 }
3885 if item.is_finite() {
3886 writer.write_all(buf.format(item).as_bytes())?;
3887 } else {
3888 writer.write_all(b"0")?;
3889 }
3890 }
3891 writer.write_all(b"]")?;
3892 Ok(())
3893}
3894
3895fn write_json_str_array<'a, W, I>(writer: &mut W, items: I) -> Result<(), JetroEngineError>
3896where
3897 W: Write,
3898 I: IntoIterator<Item = &'a str>,
3899{
3900 writer.write_all(b"[")?;
3901 let mut first = true;
3902 for item in items {
3903 if first {
3904 first = false;
3905 } else {
3906 writer.write_all(b",")?;
3907 }
3908 write_json_str(writer, item)?;
3909 }
3910 writer.write_all(b"]")?;
3911 Ok(())
3912}
3913
3914fn write_json_object<'a, W, I>(writer: &mut W, entries: I) -> Result<(), JetroEngineError>
3915where
3916 W: Write,
3917 I: IntoIterator<Item = (&'a str, &'a Val)>,
3918{
3919 writer.write_all(b"{")?;
3920 let mut first = true;
3921 for (key, value) in entries {
3922 if first {
3923 first = false;
3924 } else {
3925 writer.write_all(b",")?;
3926 }
3927 write_json_str(writer, key)?;
3928 writer.write_all(b":")?;
3929 write_val_json(writer, value)?;
3930 }
3931 writer.write_all(b"}")?;
3932 Ok(())
3933}
3934
3935fn write_json_objvec<W: Write>(
3936 writer: &mut W,
3937 data: &crate::data::value::ObjVecData,
3938) -> Result<(), JetroEngineError> {
3939 writer.write_all(b"[")?;
3940 for row in 0..data.nrows() {
3941 if row > 0 {
3942 writer.write_all(b",")?;
3943 }
3944 writer.write_all(b"{")?;
3945 for slot in 0..data.stride() {
3946 if slot > 0 {
3947 writer.write_all(b",")?;
3948 }
3949 write_json_str(writer, data.keys[slot].as_ref())?;
3950 writer.write_all(b":")?;
3951 write_val_json(writer, data.cell(row, slot))?;
3952 }
3953 writer.write_all(b"}")?;
3954 }
3955 writer.write_all(b"]")?;
3956 Ok(())
3957}
3958
3959pub(super) fn write_json_str<W: Write>(
3960 writer: &mut W,
3961 value: &str,
3962) -> Result<(), JetroEngineError> {
3963 writer.write_all(b"\"")?;
3964 let bytes = value.as_bytes();
3965 if !needs_json_escape(bytes) {
3966 writer.write_all(bytes)?;
3967 writer.write_all(b"\"")?;
3968 return Ok(());
3969 }
3970
3971 let mut start = 0usize;
3972
3973 for (idx, &byte) in bytes.iter().enumerate() {
3974 let escaped = match byte {
3975 b'"' => Some(br#"\""#.as_slice()),
3976 b'\\' => Some(br#"\\"#.as_slice()),
3977 b'\n' => Some(br#"\n"#.as_slice()),
3978 b'\r' => Some(br#"\r"#.as_slice()),
3979 b'\t' => Some(br#"\t"#.as_slice()),
3980 0x08 => Some(br#"\b"#.as_slice()),
3981 0x0c => Some(br#"\f"#.as_slice()),
3982 0x00..=0x1f => None,
3983 _ => continue,
3984 };
3985
3986 if start < idx {
3987 writer.write_all(&bytes[start..idx])?;
3988 }
3989 match escaped {
3990 Some(seq) => writer.write_all(seq)?,
3991 None => write_control_escape(writer, byte)?,
3992 }
3993 start = idx + 1;
3994 }
3995
3996 if start < bytes.len() {
3997 writer.write_all(&bytes[start..])?;
3998 }
3999 writer.write_all(b"\"")?;
4000 Ok(())
4001}
4002
4003#[inline]
4004pub(super) fn write_i64<W: Write>(writer: &mut W, value: i64) -> Result<(), JetroEngineError> {
4005 let mut buf = itoa::Buffer::new();
4006 writer.write_all(buf.format(value).as_bytes())?;
4007 Ok(())
4008}
4009
4010#[inline]
4011fn write_u64<W: Write>(writer: &mut W, value: u64) -> Result<(), JetroEngineError> {
4012 let mut buf = itoa::Buffer::new();
4013 writer.write_all(buf.format(value).as_bytes())?;
4014 Ok(())
4015}
4016
4017#[inline]
4018fn write_f64<W: Write>(writer: &mut W, value: f64) -> Result<(), JetroEngineError> {
4019 if value.is_finite() {
4020 let mut buf = ryu::Buffer::new();
4021 writer.write_all(buf.format(value).as_bytes())?;
4022 } else {
4023 writer.write_all(b"0")?;
4024 }
4025 Ok(())
4026}
4027
/// Returns `true` when `bytes` contains a double quote, a backslash, or any
/// byte below `0x20` — i.e. anything that cannot be copied verbatim into a
/// JSON string.
#[inline]
fn needs_json_escape(bytes: &[u8]) -> bool {
    for &byte in bytes {
        if byte < 0x20 || byte == b'"' || byte == b'\\' {
            return true;
        }
    }
    false
}
4034
4035fn write_control_escape<W: Write>(writer: &mut W, byte: u8) -> Result<(), JetroEngineError> {
4036 const HEX: &[u8; 16] = b"0123456789abcdef";
4037 writer.write_all(&[
4038 b'\\',
4039 b'u',
4040 b'0',
4041 b'0',
4042 HEX[(byte >> 4) as usize],
4043 HEX[(byte & 0x0f) as usize],
4044 ])?;
4045 Ok(())
4046}
4047
4048pub(super) fn trim_line_ending(buf: &mut Vec<u8>) {
4049 while matches!(buf.last(), Some(b'\n' | b'\r')) {
4050 buf.pop();
4051 }
4052}
4053
4054pub(super) fn strip_initial_bom(line_no: u64, buf: &mut Vec<u8>) {
4055 if line_no == 1 && buf.starts_with(&[0xEF, 0xBB, 0xBF]) {
4056 buf.drain(..3);
4057 }
4058}
4059
4060pub(super) fn non_ws_range(buf: &[u8]) -> (usize, usize) {
4061 let start = buf
4062 .iter()
4063 .position(|b| !b.is_ascii_whitespace())
4064 .unwrap_or(buf.len());
4065 let end = buf
4066 .iter()
4067 .rposition(|b| !b.is_ascii_whitespace())
4068 .map(|idx| idx + 1)
4069 .unwrap_or(start);
4070 (start, end)
4071}
4072
4073#[cfg(test)]
4074mod tests {
4075 #[test]
4076 fn rows_stream_driver_reports_direct_stage_stats() {
4077 let engine = crate::JetroEngine::new();
4078 let plan = super::super::ndjson_rows::ndjson_rows_stream_plan(
4079 "$.rows().filter($.active == true).distinct_by($.id).take(2).map($.id)",
4080 )
4081 .unwrap()
4082 .unwrap();
4083 let input = std::io::Cursor::new(
4084 br#"{"id":"a","active":false}
4085{"id":"a","active":true}
4086{"id":"b","active":true}
4087not-json
4088"#
4089 .to_vec(),
4090 );
4091 let mut out = Vec::new();
4092
4093 let (emitted, stats) = super::drive_ndjson_rows_stream_reader_with_stats(
4094 &engine,
4095 input,
4096 &plan,
4097 None,
4098 super::NdjsonOptions::default(),
4099 &mut out,
4100 )
4101 .unwrap();
4102
4103 assert_eq!(emitted, 2);
4104 assert_eq!(String::from_utf8(out).unwrap(), "\"a\"\n\"b\"\n");
4105 assert_eq!(stats.source, super::RowStreamSourceKind::NdjsonRows);
4106 assert_eq!(stats.direction, super::RowStreamDirection::Forward);
4107 assert_eq!(stats.rows_scanned, 3);
4108 assert_eq!(stats.rows_filtered, 1);
4109 assert_eq!(stats.rows_emitted, 2);
4110 assert_eq!(stats.direct_filter_rows, 3);
4111 assert_eq!(stats.direct_key_rows, 2);
4112 assert_eq!(stats.direct_project_rows, 2);
4113 }
4114
4115 #[test]
4116 fn rows_stream_driver_filters_array_find_on_byte_predicate() {
4117 let engine = crate::JetroEngine::new();
4118 let plan = super::super::ndjson_rows::ndjson_rows_stream_plan(
4119 r#"$.rows().filter(@.custom_attributes.find(@.value == "z")).map($.id)"#,
4120 )
4121 .unwrap()
4122 .unwrap();
4123 let input = std::io::Cursor::new(
4124 br#"{"id":"a","custom_attributes":[{"value":"x"}]}
4125{"id":"b","custom_attributes":[{"value":"z"}]}
4126{"id":"c","custom_attributes":[{"value":null}]}
4127"#
4128 .to_vec(),
4129 );
4130 let mut out = Vec::new();
4131
4132 let (emitted, stats) = super::drive_ndjson_rows_stream_reader_with_stats(
4133 &engine,
4134 input,
4135 &plan,
4136 None,
4137 super::NdjsonOptions::default(),
4138 &mut out,
4139 )
4140 .unwrap();
4141
4142 assert_eq!(emitted, 1);
4143 assert_eq!(String::from_utf8(out).unwrap(), "\"b\"\n");
4144 assert_eq!(stats.rows_scanned, 3);
4145 assert_eq!(stats.rows_filtered, 2);
4146 assert_eq!(stats.direct_filter_rows, 3);
4147 assert_eq!(stats.fallback_filter_rows, 0);
4148 }
4149
4150 #[test]
4151 fn parse_row_keeps_simd_document_lazy() {
4152 let engine = crate::JetroEngine::new();
4153 let row = br#"{"name":"Ada","age":30}"#.to_vec();
4154
4155 let document = super::parse_row(&engine, 1, row).expect("row parses lazily");
4156
4157 assert!(!document.root_val_is_materialized());
4158 assert!(!document.tape_is_built());
4159 }
4160
4161 #[test]
4162 fn owned_row_read_preserves_reusable_buffer_capacity() {
4163 let input = std::io::Cursor::new(b"{\"n\":1}\n{\"n\":2}\n");
4164 let mut driver = super::NdjsonPerRowDriver::new(input);
4165 let mut buf = Vec::with_capacity(128);
4166
4167 let first = driver
4168 .read_next_owned(&mut buf)
4169 .expect("row read succeeds")
4170 .expect("first row exists");
4171 assert_eq!(first.1, br#"{"n":1}"#);
4172 assert_eq!(buf.capacity(), 128);
4173
4174 let second = driver
4175 .read_next_owned(&mut buf)
4176 .expect("row read succeeds")
4177 .expect("second row exists");
4178 assert_eq!(second.1, br#"{"n":2}"#);
4179 assert_eq!(buf.capacity(), 128);
4180 }
4181
4182 #[test]
4183 fn direct_tape_plan_accepts_first_suffix() {
4184 let engine = crate::JetroEngine::new();
4185 for query in [
4186 "attributes.first().value",
4187 "attributes.last().value",
4188 "attributes.nth(1).value",
4189 ] {
4190 let plan =
4191 super::direct_tape_plan(&engine, query).expect("array suffix should be direct");
4192 assert!(matches!(
4193 plan,
4194 super::NdjsonDirectTapePlan::ArrayElementPath { .. }
4195 ));
4196 }
4197 }
4198
4199 #[test]
4200 fn direct_tape_plan_accepts_rooted_bench_shapes() {
4201 let engine = crate::JetroEngine::new();
4202 for query in [
4203 "$.id",
4204 "$.a.b.c",
4205 "$.meta.id",
4206 "$.name",
4207 "$.attributes.len()",
4208 "$.store.attributes.len()",
4209 "$.attributes.map(@.key)",
4210 "$.attributes.first().value",
4211 "$.store.attributes.first().value",
4212 "$.attributes.last().value",
4213 "$.name.upper()",
4214 "$.store.name.upper()",
4215 "$.attributes.map([@.key, @.value])",
4216 r#"$.attributes.filter(@.value.contains("_3")).len()"#,
4217 "$.keys()",
4218 ] {
4219 super::direct_tape_plan(&engine, query)
4220 .unwrap_or_else(|| panic!("{query} should have a direct NDJSON tape plan"));
4221 }
4222 }
4223
4224 #[test]
4225 fn direct_writer_plan_kind_exposes_hot_path_selection() {
4226 let engine = crate::JetroEngine::new();
4227 use super::NdjsonDirectPlanKind::{
4228 ByteExpr, TapeArrayProjection, TapeObjectProjection, TapeRootPath, TapeStreamCollect,
4229 TapeStreamCount, TapeStreamExtreme, TapeStreamFirst, TapeStreamLast, TapeStreamNumeric,
4230 };
4231
4232 for (query, expected) in [
4233 ("$.name", (Some(ByteExpr), TapeRootPath)),
4234 ("$.a.b.c", (Some(ByteExpr), TapeRootPath)),
4235 (r#"{test: $.a.b.c, b: $.a.b}"#, (None, TapeObjectProjection)),
4236 (r#"[$.id, $.name]"#, (None, TapeArrayProjection)),
4237 ("$.attributes.map(@.key)", (None, TapeStreamCollect)),
4238 (
4239 "$.attributes.map({k: @.key, code: @.meta.code.upper()})",
4240 (None, TapeStreamCollect),
4241 ),
4242 (
4243 r#"$.attributes.filter(@.value.contains("_3")).map({key: @.key, value: @.value}).first()"#,
4244 (None, TapeStreamFirst),
4245 ),
4246 ("$.attributes.map(@.value).last()", (None, TapeStreamLast)),
4247 (
4248 r#"$.attributes.filter(@.value.contains("_3")).len()"#,
4249 (None, TapeStreamCount),
4250 ),
4251 (
4252 "$.attributes.map(@.weight).sum()",
4253 (None, TapeStreamNumeric),
4254 ),
4255 (
4256 r#"$.attributes.filter(@.value.contains("_3")).map(@.weight).sum()"#,
4257 (None, TapeStreamNumeric),
4258 ),
4259 (
4260 "$.attributes.sort_by(@.value).last().key",
4261 (None, TapeStreamExtreme),
4262 ),
4263 ] {
4264 let actual = super::direct_writer_plan_kind(&engine, query)
4265 .unwrap_or_else(|| panic!("{query} should have an observable direct plan"));
4266 assert_eq!(actual, expected, "{query}");
4267 }
4268 }
4269
4270 #[test]
4271 fn direct_writer_path_kind_matches_runtime_writer_family() {
4272 let engine = crate::JetroEngine::new();
4273 use super::NdjsonWriterPathKind::{ByteExpr, ByteWritableTape};
4274
4275 for (query, expected) in [
4276 ("$.name", ByteExpr),
4277 ("$.a.b.c", ByteExpr),
4278 (r#"{test: $.a.b.c, b: $.a.b}"#, ByteWritableTape),
4279 (r#"[$.id, $.name]"#, ByteWritableTape),
4280 ("$.attributes.map(@.key)", ByteWritableTape),
4281 (
4282 "$.attributes.map({k: @.key, code: @.meta.code.upper()})",
4283 ByteWritableTape,
4284 ),
4285 (
4286 r#"$.attributes.filter(@.value.contains("_3")).len()"#,
4287 ByteWritableTape,
4288 ),
4289 ("$.attributes.map(@.weight).sum()", ByteWritableTape),
4290 (
4291 r#"$.attributes.filter(@.value.contains("_3")).map(@.weight).sum()"#,
4292 ByteWritableTape,
4293 ),
4294 (
4295 r#"{id: $.id, name: $.name, count: $.attributes.len()}"#,
4296 ByteWritableTape,
4297 ),
4298 (
4299 r#"[$.id, $.name, $.attributes.first().value, $.attributes.last().value]"#,
4300 ByteWritableTape,
4301 ),
4302 (
4303 r#"{name_upper: $.name.upper(), values: $.attributes.map(@.value), last: $.attributes.last().value}"#,
4304 ByteWritableTape,
4305 ),
4306 ("$.attributes.sort_by(@.value).last().key", ByteWritableTape),
4307 ] {
4308 assert_eq!(
4309 super::direct_writer_path_kind(&engine, query),
4310 Some(expected),
4311 "{query}"
4312 );
4313 }
4314 }
4315
4316 #[test]
4317 fn public_writer_path_kind_reports_direct_family() {
4318 let engine = crate::JetroEngine::new();
4319 let kind = crate::io::ndjson_writer_path_kind(&engine, "$.name").unwrap();
4320 assert_eq!(kind, super::NdjsonWriterPathKind::ByteExpr);
4321 assert_eq!(kind.to_string(), "byte-expr");
4322 }
4323
4324 #[test]
4325 fn direct_tape_plan_lowers_stream_shapes_generically() {
4326 let engine = crate::JetroEngine::new();
4327 for query in [
4328 "$.attributes.map(@.key)",
4329 "$.attributes.map(@.key.upper())",
4330 "$.attributes.map(@.value).first()",
4331 "$.attributes.map(@.value).last()",
4332 r#"$.attributes.filter(@.value.contains("_3")).map(@.key)"#,
4333 r#"$.attributes.filter(@.value.contains("_3")).map(@.key.upper())"#,
4334 r#"$.attributes.filter(@.value.contains("_3")).map({key: @.key, value: @.value}).first()"#,
4335 r#"$.attributes.filter(@.value.contains("_3")).len()"#,
4336 "$.attributes.map(@.weight).sum()",
4337 r#"$.attributes.filter(@.value.contains("_3")).map(@.weight).sum()"#,
4338 "$.attributes.sort_by(@.value).last().key",
4339 ] {
4340 let plan =
4341 super::direct_tape_plan(&engine, query).expect("query should be direct NDJSON");
4342 assert!(
4343 matches!(plan, super::NdjsonDirectTapePlan::Stream(_)),
4344 "{query} should lower to a generic NDJSON stream plan"
4345 );
4346 }
4347 }
4348
4349 #[test]
4350 fn direct_byte_plan_accepts_fast_root_shapes() {
4351 let engine = crate::JetroEngine::new();
4352 for query in [
4353 "$.id",
4354 "$.name",
4355 "$.name.upper()",
4356 "$.name.lower()",
4357 "$.keys()",
4358 "$.meta.keys()",
4359 "$.values()",
4360 "$.entries()",
4361 "$.attributes.first().value",
4362 "$.store.attributes.first().value",
4363 "$.attributes.first().key.upper()",
4364 "$.attributes.last().value",
4365 "$.attributes.nth(1).value",
4366 ] {
4367 super::direct_byte_plan(&engine, query)
4368 .unwrap_or_else(|| panic!("{query} should have a direct NDJSON byte plan"));
4369 }
4370 }
4371
4372 #[test]
4373 fn direct_byte_predicates_cover_match_shapes() {
4374 let engine = crate::JetroEngine::new();
4375 let row = br#"{"active":true,"score":9910,"attributes":[{"key":"k1","value":"v_1"}]}"#;
4376 for predicate in [
4377 ("active", true),
4378 ("score > 9900", true),
4379 ("score < 100", false),
4380 (r#"attributes.first().value.contains("_1")"#, true),
4381 ] {
4382 let plan = super::direct_tape_predicate(&engine, predicate.0)
4383 .unwrap_or_else(|| panic!("{} should have a direct predicate", predicate.0));
4384 let matched = super::eval_ndjson_byte_predicate_row(row, &plan)
4385 .expect("byte predicate should evaluate")
4386 .unwrap_or_else(|| panic!("{} should not need tape fallback", predicate.0));
4387 assert_eq!(matched, predicate.1, "{}", predicate.0);
4388 }
4389 }
4390
4391 #[test]
4392 fn direct_byte_predicate_covers_array_find_field_comparison() {
4393 let row = br#"{"custom_attributes":[{"attribute_name":"a","value":"x"},{"attribute_name":"b","value":"z"},{"attribute_name":"c","value":null},{"attribute_name":"d","value":""}]}"#;
4394 for (predicate, expected) in [
4395 (r#"@.custom_attributes.find(@.value == "z")"#, true),
4396 (r#"@.custom_attributes.find(value == "missing")"#, false),
4397 (r#"@.custom_attributes.find(@.value == null)"#, true),
4398 (r#"@.custom_attributes.find(@.value == "")"#, true),
4399 ] {
4400 let expr = crate::parse::parser::parse(predicate).expect("parse");
4401 let plan = super::super::ndjson_direct::direct_tape_predicate_for_expr(&expr)
4402 .unwrap_or_else(|| panic!("{predicate} should have a direct predicate"));
4403 let matched = super::eval_ndjson_byte_predicate_row(row, &plan)
4404 .expect("byte predicate should evaluate")
4405 .unwrap_or_else(|| panic!("{predicate} should not need tape fallback"));
4406 assert_eq!(matched, expected, "{predicate}");
4407 }
4408 }
4409
4410 #[test]
4411 fn direct_byte_tape_plan_counts_filtered_rows() {
4412 let engine = crate::JetroEngine::new();
4413 let query = r#"attributes.filter(@.value.contains("_3")).len()"#;
4414 let plan = super::direct_tape_plan(&engine, query).expect("filter count should be direct");
4415 assert!(super::tape_plan_can_write_byte_row(&plan));
4416
4417 let row = br#"{"attributes":[{"value":"a_3"},{"value":"b"},{"value":"c_3"}]}"#;
4418 let mut out = Vec::new();
4419 let mut scratch = Vec::new();
4420 let wrote = super::write_ndjson_byte_tape_plan_row(&mut out, row, &plan, &mut scratch)
4421 .expect("byte count should write");
4422 assert!(matches!(wrote, super::BytePlanWrite::Done));
4423 assert_eq!(out, b"2");
4424 }
4425
/// Runs `sum`, `avg`, `min` and `max` over a mapped `weight` field through the
/// byte tape-plan writer and compares the emitted text for each reducer.
///
/// The row includes one non-numeric weight (`"skip"`); the expected values
/// only account for the three numeric entries.
#[test]
fn direct_byte_tape_plan_reduces_numeric_streams() {
    let row = br#"{"attributes":[{"weight":1},{"weight":2.5},{"weight":3},{"weight":"skip"}]}"#;
    let cases = [
        ("$.attributes.map(@.weight).sum()", "6.5"),
        ("$.attributes.map(@.weight).avg()", "2.1666666666666665"),
        ("$.attributes.map(@.weight).min()", "1.0"),
        ("$.attributes.map(@.weight).max()", "3.0"),
    ];

    let engine = crate::JetroEngine::new();
    for (query, expected) in cases {
        let plan = match super::direct_tape_plan(&engine, query) {
            Some(plan) => plan,
            None => panic!("{query} should be direct"),
        };
        let byte_writable = super::tape_plan_can_write_byte_row(&plan);
        assert!(byte_writable, "{query} should be byte-writable");

        let mut written = Vec::new();
        let mut scratch = Vec::new();
        let status =
            super::write_ndjson_byte_tape_plan_row(&mut written, row, &plan, &mut scratch)
                .expect("byte numeric stream should write");
        assert!(matches!(status, super::BytePlanWrite::Done), "{query}");

        let text = std::str::from_utf8(&written).unwrap();
        assert_eq!(text, expected, "{query}");
    }
}
4450
/// Exercises `map` (optionally preceded by `filter`) with scalar, array and
/// object projections through the byte tape-plan writer, comparing the
/// collected JSON array emitted for a single row.
#[test]
fn direct_byte_tape_plan_collects_stream_maps() {
    let row = br#"{"attributes":[{"key":"k1","value":"v1"},{"key":"k2","value":"v2"}]}"#;
    let cases = [
        // Unfiltered projections.
        ("attributes.map(@.key)", r#"["k1","k2"]"#),
        (
            "attributes.map([@.key, @.value])",
            r#"[["k1","v1"],["k2","v2"]]"#,
        ),
        (
            "attributes.map({key: @.key, value: @.value})",
            r#"[{"key":"k1","value":"v1"},{"key":"k2","value":"v2"}]"#,
        ),
        ("attributes.map(@.key.upper())", r#"["K1","K2"]"#),
        // Projections applied after a filter.
        (
            r#"attributes.filter(@.value.contains("2")).map(@.key)"#,
            r#"["k2"]"#,
        ),
        (
            r#"attributes.filter(@.value.contains("2")).map({key: @.key, value: @.value})"#,
            r#"[{"key":"k2","value":"v2"}]"#,
        ),
        (
            r#"attributes.filter(@.key != "k1").map([@.key, @.value])"#,
            r#"[["k2","v2"]]"#,
        ),
    ];

    let engine = crate::JetroEngine::new();
    for (query, expected) in cases {
        let plan = match super::direct_tape_plan(&engine, query) {
            Some(plan) => plan,
            None => panic!("{query} should be direct"),
        };
        let byte_writable = super::tape_plan_can_write_byte_row(&plan);
        assert!(byte_writable, "{query} should be byte-writable");

        let mut written = Vec::new();
        let mut scratch = Vec::new();
        let status =
            super::write_ndjson_byte_tape_plan_row(&mut written, row, &plan, &mut scratch)
                .expect("byte stream should write");
        assert!(matches!(status, super::BytePlanWrite::Done), "{query}");

        let text = std::str::from_utf8(&written).unwrap();
        assert_eq!(text, expected, "{query}");
    }
}
4493
/// Writes a bare path, an object projection and an array projection of rooted
/// paths through the byte tape-plan writer and checks the emitted JSON text.
#[test]
fn direct_byte_tape_plan_writes_static_projections() {
    let row = br#"{"id":7,"a":{"b":{"c":1}}}"#;
    let cases = [
        ("$.a.b.c", "1"),
        (r#"{test: $.a.b.c, b: $.a.b}"#, r#"{"test":1,"b":{"c":1}}"#),
        (r#"[$.a.b.c, $.id]"#, r#"[1,7]"#),
    ];

    let engine = crate::JetroEngine::new();
    for (query, expected) in cases {
        let plan = match super::direct_tape_plan(&engine, query) {
            Some(plan) => plan,
            None => panic!("{query} should be direct"),
        };
        let byte_writable = super::tape_plan_can_write_byte_row(&plan);
        assert!(byte_writable, "{query} should be byte-writable");

        let mut written = Vec::new();
        let mut scratch = Vec::new();
        let status =
            super::write_ndjson_byte_tape_plan_row(&mut written, row, &plan, &mut scratch)
                .expect("byte projection should write");
        assert!(matches!(status, super::BytePlanWrite::Done), "{query}");

        let text = std::str::from_utf8(&written).unwrap();
        assert_eq!(text, expected, "{query}");
    }
}
4517
/// Drives `run_ndjson` with a root field path, a nested field path, and a
/// `keys()` call on a nested object, checking the emitted lines each time.
#[test]
fn run_ndjson_uses_byte_paths_for_nested_object_items() {
    let engine = crate::JetroEngine::new();

    // Root-level field across two rows.
    let flat_input = br#"{"id":1}
{"id":2}
"#;
    let mut flat_out = Vec::new();
    engine
        .run_ndjson(std::io::Cursor::new(flat_input), "$.id", &mut flat_out)
        .expect("rooted byte path should run");
    let flat_text = std::str::from_utf8(&flat_out).unwrap();
    assert_eq!(flat_text, "1\n2\n");

    // Field nested one object deep.
    let nested_input = br#"{"meta":{"id":1,"kind":"a"}}
{"meta":{"id":2,"kind":"b"}}
"#;
    let mut nested_out = Vec::new();
    engine
        .run_ndjson(
            std::io::Cursor::new(nested_input),
            "$.meta.id",
            &mut nested_out,
        )
        .expect("nested byte path should run");
    let nested_text = std::str::from_utf8(&nested_out).unwrap();
    assert_eq!(nested_text, "1\n2\n");

    // Single row without a trailing newline; output still ends with one.
    let keys_input = br#"{"meta":{"id":1,"kind":"a"}}"#;
    let mut keys_out = Vec::new();
    engine
        .run_ndjson(
            std::io::Cursor::new(keys_input),
            "$.meta.keys()",
            &mut keys_out,
        )
        .expect("nested byte object items should run");
    let keys_text = std::str::from_utf8(&keys_out).unwrap();
    assert_eq!(keys_text, "[\"id\",\"kind\"]\n");
}
4551
/// Runs `first().value` and `last().value` against an array nested inside an
/// object, using the same two-row input for both queries.
#[test]
fn run_ndjson_uses_byte_paths_for_nested_array_demands() {
    let input = br#"{"store":{"attributes":[{"value":"a"},{"value":"b"}],"after":1}}
{"store":{"attributes":[{"value":"c"},{"value":"d"}],"after":2}}
"#;
    // (query, message used if the run fails, expected output)
    let cases = [
        (
            "$.store.attributes.first().value",
            "nested byte array demand should run",
            "\"a\"\n\"c\"\n",
        ),
        (
            "$.store.attributes.last().value",
            "nested byte last demand should run from field prefix",
            "\"b\"\n\"d\"\n",
        ),
    ];

    let engine = crate::JetroEngine::new();
    let mut out: Vec<u8> = Vec::new();
    for (query, failure, expected) in cases {
        // The buffer is shared between cases, so reset it before each run.
        out.clear();
        engine
            .run_ndjson(std::io::Cursor::new(input), query, &mut out)
            .expect(failure);
        assert_eq!(std::str::from_utf8(&out).unwrap(), expected);
    }
}
4578
/// Projects `id` and `name` from nine uniformly shaped rows and checks every
/// output line.
///
/// NOTE(review): the nine-row input presumably exists to run past the point
/// where hints activate (per the test name) — confirm against the hint config.
#[test]
fn run_ndjson_static_projection_survives_hint_activation() {
    let input = br#"{"id":1,"name":"a","active":true}
{"id":2,"name":"b","active":true}
{"id":3,"name":"c","active":true}
{"id":4,"name":"d","active":true}
{"id":5,"name":"e","active":true}
{"id":6,"name":"f","active":true}
{"id":7,"name":"g","active":true}
{"id":8,"name":"h","active":true}
{"id":9,"name":"i","active":true}
"#;
    let query = r#"{id: $.id, name: $.name}"#;
    let expected = "{\"id\":1,\"name\":\"a\"}\n\
{\"id\":2,\"name\":\"b\"}\n\
{\"id\":3,\"name\":\"c\"}\n\
{\"id\":4,\"name\":\"d\"}\n\
{\"id\":5,\"name\":\"e\"}\n\
{\"id\":6,\"name\":\"f\"}\n\
{\"id\":7,\"name\":\"g\"}\n\
{\"id\":8,\"name\":\"h\"}\n\
{\"id\":9,\"name\":\"i\"}\n";

    let engine = crate::JetroEngine::new();
    let mut sink = Vec::new();
    engine
        .run_ndjson(std::io::Cursor::new(input), query, &mut sink)
        .expect("hinted static projection should run");

    let text = std::str::from_utf8(&sink).unwrap();
    assert_eq!(text, expected);
}
4611
/// Projects a root field, a nested field and the whole nested object from
/// nine uniformly shaped rows and checks every output line.
///
/// NOTE(review): the nine-row input presumably exists to run past the point
/// where hints activate (per the test name) — confirm against the hint config.
#[test]
fn run_ndjson_nested_projection_survives_hint_activation() {
    let input = br#"{"id":1,"profile":{"name":"a","score":10},"active":true}
{"id":2,"profile":{"name":"b","score":20},"active":true}
{"id":3,"profile":{"name":"c","score":30},"active":true}
{"id":4,"profile":{"name":"d","score":40},"active":true}
{"id":5,"profile":{"name":"e","score":50},"active":true}
{"id":6,"profile":{"name":"f","score":60},"active":true}
{"id":7,"profile":{"name":"g","score":70},"active":true}
{"id":8,"profile":{"name":"h","score":80},"active":true}
{"id":9,"profile":{"name":"i","score":90},"active":true}
"#;
    let query = r#"{id: $.id, name: $.profile.name, profile: $.profile}"#;
    let expected = "{\"id\":1,\"name\":\"a\",\"profile\":{\"name\":\"a\",\"score\":10}}\n\
{\"id\":2,\"name\":\"b\",\"profile\":{\"name\":\"b\",\"score\":20}}\n\
{\"id\":3,\"name\":\"c\",\"profile\":{\"name\":\"c\",\"score\":30}}\n\
{\"id\":4,\"name\":\"d\",\"profile\":{\"name\":\"d\",\"score\":40}}\n\
{\"id\":5,\"name\":\"e\",\"profile\":{\"name\":\"e\",\"score\":50}}\n\
{\"id\":6,\"name\":\"f\",\"profile\":{\"name\":\"f\",\"score\":60}}\n\
{\"id\":7,\"name\":\"g\",\"profile\":{\"name\":\"g\",\"score\":70}}\n\
{\"id\":8,\"name\":\"h\",\"profile\":{\"name\":\"h\",\"score\":80}}\n\
{\"id\":9,\"name\":\"i\",\"profile\":{\"name\":\"i\",\"score\":90}}\n";

    let engine = crate::JetroEngine::new();
    let mut sink = Vec::new();
    engine
        .run_ndjson(std::io::Cursor::new(input), query, &mut sink)
        .expect("hinted nested projection should run");

    let text = std::str::from_utf8(&sink).unwrap();
    assert_eq!(text, expected);
}
4648
/// Projects `id` plus an upper-cased nested `name` from nine uniformly shaped
/// rows and checks every output line.
///
/// NOTE(review): the nine-row input presumably exists to run past the point
/// where hints activate (per the test name) — confirm against the hint config.
#[test]
fn run_ndjson_scalar_projection_survives_hint_activation() {
    let input = br#"{"id":1,"profile":{"name":"a"}}
{"id":2,"profile":{"name":"b"}}
{"id":3,"profile":{"name":"c"}}
{"id":4,"profile":{"name":"d"}}
{"id":5,"profile":{"name":"e"}}
{"id":6,"profile":{"name":"f"}}
{"id":7,"profile":{"name":"g"}}
{"id":8,"profile":{"name":"h"}}
{"id":9,"profile":{"name":"i"}}
"#;
    let query = r#"{id: $.id, name: $.profile.name.upper()}"#;
    let expected = "{\"id\":1,\"name\":\"A\"}\n\
{\"id\":2,\"name\":\"B\"}\n\
{\"id\":3,\"name\":\"C\"}\n\
{\"id\":4,\"name\":\"D\"}\n\
{\"id\":5,\"name\":\"E\"}\n\
{\"id\":6,\"name\":\"F\"}\n\
{\"id\":7,\"name\":\"G\"}\n\
{\"id\":8,\"name\":\"H\"}\n\
{\"id\":9,\"name\":\"I\"}\n";

    let engine = crate::JetroEngine::new();
    let mut sink = Vec::new();
    engine
        .run_ndjson(std::io::Cursor::new(input), query, &mut sink)
        .expect("hinted scalar projection should run");

    let text = std::str::from_utf8(&sink).unwrap();
    assert_eq!(text, expected);
}
4685
/// Maps each attribute to a `[key, value]` pair over three rows and checks the
/// collected array emitted per row.
#[test]
fn run_ndjson_stream_collect_survives_hint_activation() {
    let input = br#"{"id":1,"attributes":[{"key":"a","value":"x"},{"key":"b","value":"y"}]}
{"id":2,"attributes":[{"key":"c","value":"z"}]}
{"id":3,"attributes":[{"key":"d","value":"w"}]}
"#;
    let query = "$.attributes.map([@.key, @.value])";
    let expected = "[[\"a\",\"x\"],[\"b\",\"y\"]]\n[[\"c\",\"z\"]]\n[[\"d\",\"w\"]]\n";

    let engine = crate::JetroEngine::new();
    let mut sink = Vec::new();
    engine
        .run_ndjson(std::io::Cursor::new(input), query, &mut sink)
        .expect("hinted stream collect should run");

    let text = std::str::from_utf8(&sink).unwrap();
    assert_eq!(text, expected);
}
4704
/// Feeds two rows whose item objects list `key` and `value` in opposite order
/// (the second row's `value` even equals the first row's `key`) and checks
/// that `map(@.key)` still yields each row's real `key`.
#[test]
fn run_ndjson_stream_cache_rejects_reordered_item_prefixes() {
    let input = br#"{"attributes":[{"key":"k1","value":"a"}]}
{"attributes":[{"value":"k1","key":"actual"}]}
"#;
    let query = "$.attributes.map(@.key)";
    let expected = "[\"k1\"]\n[\"actual\"]\n";

    let engine = crate::JetroEngine::new();
    let mut sink = Vec::new();
    engine
        .run_ndjson(std::io::Cursor::new(input), query, &mut sink)
        .expect("stream cache should fall back on reordered item fields");

    let text = std::str::from_utf8(&sink).unwrap();
    assert_eq!(text, expected);
}
4722
/// Maps items to `[key, value]` where some items lack one of the two fields
/// and checks that the missing slot is emitted as `null`.
#[test]
fn run_ndjson_stream_map_preserves_missing_field_nulls() {
    let input = br#"{"attributes":[{"key":"a","value":"x"},{"key":"b"}]}
{"attributes":[{"value":"z"}]}
"#;
    let query = "$.attributes.map([@.key, @.value])";
    let expected = "[[\"a\",\"x\"],[\"b\",null]]\n[[null,\"z\"]]\n";

    let engine = crate::JetroEngine::new();
    let mut sink = Vec::new();
    engine
        .run_ndjson(std::io::Cursor::new(input), query, &mut sink)
        .expect("stream map should preserve nulls for missing fields");

    let text = std::str::from_utf8(&sink).unwrap();
    assert_eq!(text, expected);
}
4740
/// Maps items to an object whose `v` field applies `upper()` to the item's
/// `value`, and checks that the call is reflected in the output.
#[test]
fn run_ndjson_stream_object_map_preserves_scalar_calls() {
    let input = br#"{"attributes":[{"key":"a","value":"x"},{"key":"b","value":"y"}]}
"#;
    let query = "$.attributes.map({k: @.key, v: @.value.upper()})";
    let expected = "[{\"k\":\"a\",\"v\":\"X\"},{\"k\":\"b\",\"v\":\"Y\"}]\n";

    let engine = crate::JetroEngine::new();
    let mut sink = Vec::new();
    engine
        .run_ndjson(std::io::Cursor::new(input), query, &mut sink)
        .expect("stream object map should preserve scalar calls");

    let text = std::str::from_utf8(&sink).unwrap();
    assert_eq!(text, expected);
}
4761
/// Maps items to an object that reads a path nested inside each item
/// (`@.meta.code`) and upper-cases it, then checks the emitted array.
#[test]
fn run_ndjson_stream_map_projects_nested_item_paths() {
    let input = br#"{"attributes":[{"key":"a","meta":{"code":"x"}},{"key":"b","meta":{"code":"y"}}]}
"#;
    let query = "$.attributes.map({k: @.key, code: @.meta.code.upper()})";
    let expected = "[{\"k\":\"a\",\"code\":\"X\"},{\"k\":\"b\",\"code\":\"Y\"}]\n";

    let engine = crate::JetroEngine::new();
    let mut sink = Vec::new();
    engine
        .run_ndjson(std::io::Cursor::new(input), query, &mut sink)
        .expect("stream map should project nested item paths");

    let text = std::str::from_utf8(&sink).unwrap();
    assert_eq!(text, expected);
}
4782
/// Counts, per row, the attributes whose `value` contains `"_3"`; the three
/// rows are expected to yield 1, 2 and 0 respectively.
#[test]
fn run_ndjson_stream_count_survives_hint_activation() {
    let input = br#"{"id":1,"attributes":[{"value":"x_3"},{"value":"y"}]}
{"id":2,"attributes":[{"value":"z_3"},{"value":"w_3"}]}
{"id":3,"attributes":[{"value":"n"}]}
"#;
    let query = r#"$.attributes.filter(@.value.contains("_3")).len()"#;
    let expected = "1\n2\n0\n";

    let engine = crate::JetroEngine::new();
    let mut sink = Vec::new();
    engine
        .run_ndjson(std::io::Cursor::new(input), query, &mut sink)
        .expect("hinted stream count should run");

    let text = std::str::from_utf8(&sink).unwrap();
    assert_eq!(text, expected);
}
4802
/// Counts filtered attributes when some items have no `value` field at all;
/// such items are expected not to match, giving counts of 1 and 0.
#[test]
fn run_ndjson_filtered_count_ignores_missing_predicate_fields() {
    let input = br#"{"attributes":[{"value":"x_3"},{"key":"missing"},{"value":"y"}]}
{"attributes":[{"key":"missing"}]}
"#;
    let query = r#"$.attributes.filter(@.value.contains("_3")).len()"#;
    let expected = "1\n0\n";

    let engine = crate::JetroEngine::new();
    let mut sink = Vec::new();
    engine
        .run_ndjson(std::io::Cursor::new(input), query, &mut sink)
        .expect("filtered count should ignore missing predicate fields");

    let text = std::str::from_utf8(&sink).unwrap();
    assert_eq!(text, expected);
}
4821
/// Checks that `filter(..).last()` selects the last item that passes the
/// filter rather than the physically last array element: in both rows the
/// final element has key `"drop"` and must not be returned.
#[test]
fn run_ndjson_filter_last_returns_last_matching_output() {
    let input = br#"{"attributes":[{"key":"keep","value":"first"},{"key":"drop","value":"physical-last"}]}
{"attributes":[{"key":"drop","value":"first"},{"key":"keep","value":"semantic-last"},{"key":"drop","value":"physical-last"}]}
"#;
    let query = r#"$.attributes.filter(@.key == "keep").last().value"#;
    let expected = "\"first\"\n\"semantic-last\"\n";

    let engine = crate::JetroEngine::new();
    let mut sink = Vec::new();
    engine
        .run_ndjson(std::io::Cursor::new(input), query, &mut sink)
        .expect("filtered last should preserve semantic output order");

    let text = std::str::from_utf8(&sink).unwrap();
    assert_eq!(text, expected);
}
4845
/// Checks `filter(..).map(..).first()`: each row contributes the projection of
/// its first matching item, and the third row (no match) contributes no line.
#[test]
fn run_ndjson_filter_map_first_stops_at_first_matching_output() {
    let input = br#"{"attributes":[{"key":"a","value":"x_3"},{"key":"b","value":"later_3"}]}
{"attributes":[{"key":"a","value":"skip"},{"key":"b","value":"y_3"}]}
{"attributes":[{"key":"a","value":"skip"}]}
"#;
    let query = r#"$.attributes.filter(@.value.contains("_3")).map({key: @.key, value: @.value}).first()"#;
    let expected = "{\"key\":\"a\",\"value\":\"x_3\"}\n{\"key\":\"b\",\"value\":\"y_3\"}\n";

    let engine = crate::JetroEngine::new();
    let mut sink = Vec::new();
    engine
        .run_ndjson(std::io::Cursor::new(input), query, &mut sink)
        .expect("filtered first should use direct stream first");

    let text = std::str::from_utf8(&sink).unwrap();
    assert_eq!(text, expected);
}
4870
/// Checks `map(..).first()` with no filter: rows with items yield the first
/// item's `value`, and the row with an empty array contributes no line.
#[test]
fn run_ndjson_map_first_projects_first_item_without_filter() {
    let input = br#"{"attributes":[{"key":"a","value":"first"},{"key":"b","value":"later"}]}
{"attributes":[]}
{"attributes":[{"key":"c","value":"only"}]}
"#;
    let query = "$.attributes.map(@.value).first()";
    let expected = "\"first\"\n\"only\"\n";

    let engine = crate::JetroEngine::new();
    let mut sink = Vec::new();
    engine
        .run_ndjson(std::io::Cursor::new(input), query, &mut sink)
        .expect("unfiltered first should use direct stream first");

    let text = std::str::from_utf8(&sink).unwrap();
    assert_eq!(text, expected);
}
4888
/// Checks `map(..).last()` with no filter: rows with items yield the last
/// item's `value`, and the row with an empty array contributes no line.
#[test]
fn run_ndjson_map_last_projects_last_item_without_filter() {
    let input = br#"{"attributes":[{"key":"a","value":"first"},{"key":"b","value":"last"}]}
{"attributes":[]}
{"attributes":[{"key":"c","value":"only"}]}
"#;
    let query = "$.attributes.map(@.value).last()";
    let expected = "\"last\"\n\"only\"\n";

    let engine = crate::JetroEngine::new();
    let mut sink = Vec::new();
    engine
        .run_ndjson(std::io::Cursor::new(input), query, &mut sink)
        .expect("unfiltered last should use direct stream last");

    let text = std::str::from_utf8(&sink).unwrap();
    assert_eq!(text, expected);
}
4906
/// Checks `filter(..).map(..).last()`: each row contributes the projection of
/// its last matching item, and the third row (no match) contributes no line.
#[test]
fn run_ndjson_filter_map_last_keeps_latest_matching_output() {
    let input = br#"{"attributes":[{"key":"a","value":"x_3"},{"key":"b","value":"later_3"}]}
{"attributes":[{"key":"a","value":"skip"},{"key":"b","value":"y_3"}]}
{"attributes":[{"key":"a","value":"skip"}]}
"#;
    let query = r#"$.attributes.filter(@.value.contains("_3")).map({key: @.key, value: @.value}).last()"#;
    let expected = "{\"key\":\"b\",\"value\":\"later_3\"}\n{\"key\":\"b\",\"value\":\"y_3\"}\n";

    let engine = crate::JetroEngine::new();
    let mut sink = Vec::new();
    engine
        .run_ndjson(std::io::Cursor::new(input), query, &mut sink)
        .expect("filtered last should use direct stream last");

    let text = std::str::from_utf8(&sink).unwrap();
    assert_eq!(text, expected);
}
4931
/// Sums the mapped `weight` field per row; the row whose only weight is the
/// string `"skip"` is expected to produce `0`.
#[test]
fn run_ndjson_stream_numeric_survives_hint_activation() {
    let input = br#"{"id":1,"attributes":[{"weight":1},{"weight":2}]}
{"id":2,"attributes":[{"weight":3.5},{"weight":4}]}
{"id":3,"attributes":[{"weight":"skip"}]}
"#;
    let query = "$.attributes.map(@.weight).sum()";
    let expected = "3\n7.5\n0\n";

    let engine = crate::JetroEngine::new();
    let mut sink = Vec::new();
    engine
        .run_ndjson(std::io::Cursor::new(input), query, &mut sink)
        .expect("hinted numeric stream should run");

    let text = std::str::from_utf8(&sink).unwrap();
    assert_eq!(text, expected);
}
4947
/// Sums `weight` over only those items whose `value` contains `"_3"`, so the
/// filter and the projection read different fields of the same item.
#[test]
fn run_ndjson_filtered_stream_numeric_uses_shared_fields() {
    let input = br#"{"attributes":[{"value":"x_3","weight":1},{"value":"skip","weight":10},{"value":"y_3","weight":2.5}]}
{"attributes":[{"value":"skip","weight":4}]}
"#;
    let query = r#"$.attributes.filter(@.value.contains("_3")).map(@.weight).sum()"#;
    let expected = "3.5\n0\n";

    let engine = crate::JetroEngine::new();
    let mut sink = Vec::new();
    engine
        .run_ndjson(std::io::Cursor::new(input), query, &mut sink)
        .expect("filtered numeric stream should use byte path");

    let text = std::str::from_utf8(&sink).unwrap();
    assert_eq!(text, expected);
}
4966
/// Checks `sort_by(@.value).last().key`: the item with the greatest string
/// `value` is selected and its `key` is emitted for each row.
#[test]
fn run_ndjson_stream_extreme_projects_selected_item_field() {
    let input = br#"{"attributes":[{"key":"a","value":"m"},{"key":"b","value":"z"},{"key":"c","value":"n"}]}
{"attributes":[{"key":"x","value":"b"},{"key":"y","value":"a"}]}
"#;
    let query = "$.attributes.sort_by(@.value).last().key";
    let expected = "\"b\"\n\"x\"\n";

    let engine = crate::JetroEngine::new();
    let mut sink = Vec::new();
    engine
        .run_ndjson(std::io::Cursor::new(input), query, &mut sink)
        .expect("stream extreme should project selected item field");

    let text = std::str::from_utf8(&sink).unwrap();
    assert_eq!(text, expected);
}
4981
/// Checks `sort_by(@.value).last().key` when one sort value contains an
/// escaped quote (`v\"1`); the item with value `v_9` must still be selected.
#[test]
fn run_ndjson_stream_extreme_handles_escaped_string_keys() {
    let input = br#"{"attributes":[{"key":"a","value":"v\"1"},{"key":"b","value":"v_9"}]}
"#;
    let query = "$.attributes.sort_by(@.value).last().key";
    let expected = "\"b\"\n";

    let engine = crate::JetroEngine::new();
    let mut sink = Vec::new();
    engine
        .run_ndjson(std::io::Cursor::new(input), query, &mut sink)
        .expect("escaped extrema keys should fall back safely");

    let text = std::str::from_utf8(&sink).unwrap();
    assert_eq!(text, expected);
}
4995
/// Checks `sort_by(@.score).last().key` with numeric sort values, including a
/// two-digit integer, negatives and a fractional number.
#[test]
fn run_ndjson_stream_extreme_handles_numeric_keys() {
    let input = br#"{"attributes":[{"key":"a","score":1},{"key":"b","score":10},{"key":"c","score":2}]}
{"attributes":[{"key":"x","score":-2},{"key":"y","score":-1.5}]}
"#;
    let query = "$.attributes.sort_by(@.score).last().key";
    let expected = "\"b\"\n\"y\"\n";

    let engine = crate::JetroEngine::new();
    let mut sink = Vec::new();
    engine
        .run_ndjson(std::io::Cursor::new(input), query, &mut sink)
        .expect("numeric extrema keys should use direct stream extrema");

    let text = std::str::from_utf8(&sink).unwrap();
    assert_eq!(text, expected);
}
5010
/// Runs one object projection that mixes plain root fields with a `len()`, a
/// `first().value` and a `map(..)` over the same array, and checks both
/// output lines.
#[test]
fn run_ndjson_nested_direct_projection_writes_without_fallback() {
    let input = br#"{"id":1,"name":"ada","attributes":[{"key":"a","value":"x"},{"key":"b","value":"y"}]}
{"id":2,"name":"bob","attributes":[{"key":"c","value":"z"}]}
"#;
    let query = r#"{id: $.id, name: $.name, count: $.attributes.len(), first: $.attributes.first().value, values: $.attributes.map(@.value)}"#;
    let expected = "{\"id\":1,\"name\":\"ada\",\"count\":2,\"first\":\"x\",\"values\":[\"x\",\"y\"]}\n\
{\"id\":2,\"name\":\"bob\",\"count\":1,\"first\":\"z\",\"values\":[\"z\"]}\n";

    let engine = crate::JetroEngine::new();
    let mut sink = Vec::new();
    engine
        .run_ndjson(std::io::Cursor::new(input), query, &mut sink)
        .expect("nested direct projection should run");

    let text = std::str::from_utf8(&sink).unwrap();
    assert_eq!(text, expected);
}
5033}