1use super::ndjson_frame::{frame_payload, FramePayload, NdjsonRowFrame};
2use super::stream_exec::CompiledRowStream;
3use super::stream_plan::{
4 lower_root_rows_query, RowStreamDirection, RowStreamPlan, RowStreamSourceKind,
5};
6use super::stream_subquery::{lower_single_rows_subquery, RowStreamSubqueryPlan, STREAM_BINDING};
7use super::stream_types::{RowStreamRowResult, RowStreamStats};
8use super::{NdjsonSource, RowError};
9use crate::compile::compiler::Compiler;
10use crate::data::context::Env;
11use crate::data::value::Val;
12use crate::plan::physical::PlanningContext;
13use crate::util::is_truthy;
14use crate::{EvalError, Jetro, JetroEngine, JetroEngineError, VM};
15use memchr::memchr;
16use serde_json::Value;
17use std::fs::File;
18use std::io::{BufRead, BufWriter, Write};
19use std::path::Path;
20use std::sync::MutexGuard;
21
22#[cfg(feature = "simd-json")]
23use super::ndjson_byte::{
24 eval_ndjson_byte_predicate_row, tape_plan_can_write_byte_row, write_ndjson_byte_plan_row,
25 write_ndjson_byte_tape_plan_row, write_ndjson_hinted_tape_plan_row, BytePlanWrite,
26};
27#[cfg(test)]
28#[cfg(feature = "simd-json")]
29pub(super) use super::ndjson_direct::{
30 direct_byte_plan, direct_writer_plan_kind, NdjsonDirectPlanKind,
31};
32#[cfg(feature = "simd-json")]
33pub(super) use super::ndjson_direct::{
34 direct_tape_plan, direct_tape_predicate, direct_writer_plans, NdjsonDirectBytePlan,
35 NdjsonDirectElement, NdjsonDirectItemPredicate, NdjsonDirectPredicate,
36 NdjsonDirectProjectionValue, NdjsonDirectStreamMap, NdjsonDirectStreamPlan,
37 NdjsonDirectStreamSink, NdjsonDirectTapePlan,
38};
39#[cfg(feature = "simd-json")]
40use super::ndjson_hint::{
41 NdjsonHintAccessPlan, NdjsonHintConfig, NdjsonHintDecision, NdjsonHintState,
42};
43#[cfg(feature = "simd-json")]
44use super::ndjson_stream_cache::NdjsonConstantStreamCache;
45
/// Default for `NdjsonOptions::max_line_len` and for a freshly built driver (64 MiB).
const DEFAULT_MAX_LINE_LEN: usize = 64 * 1024 * 1024;
/// Default for `NdjsonOptions::initial_buffer_capacity`, in bytes.
const DEFAULT_LINE_BUFFER_CAPACITY: usize = 8192;
/// Default for `NdjsonOptions::reader_buffer_capacity` (1 MiB).
const DEFAULT_READER_BUFFER_CAPACITY: usize = 1024 * 1024;
/// Default for `NdjsonOptions::reverse_chunk_size` (64 KiB).
pub(super) const DEFAULT_REVERSE_CHUNK_SIZE: usize = 64 * 1024;
50
/// Test-only classification of the direct writer path selected for a query,
/// as reported by `direct_writer_path_kind`.
#[cfg(test)]
#[cfg(feature = "simd-json")]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(super) enum NdjsonWriterPathKind {
    /// A byte plan was produced for the query.
    ByteExpr,
    /// No byte plan, but `tape_plan_can_write_byte_row` accepted the tape plan.
    ByteWritableTape,
    /// Neither of the above applied.
    Tape,
}
59
/// Test-only helper that reports which direct writer path `query` resolves to.
///
/// Returns `None` when `direct_writer_plans` yields no plans for the query.
#[cfg(test)]
#[cfg(feature = "simd-json")]
pub(super) fn direct_writer_path_kind(
    engine: &JetroEngine,
    query: &str,
) -> Option<NdjsonWriterPathKind> {
    let (byte_plan, tape_plan) = direct_writer_plans(engine, query)?;
    // A byte plan takes precedence; the tape plan is inspected only without one.
    let kind = if byte_plan.is_some() {
        NdjsonWriterPathKind::ByteExpr
    } else if tape_plan_can_write_byte_row(&tape_plan) {
        NdjsonWriterPathKind::ByteWritableTape
    } else {
        NdjsonWriterPathKind::Tape
    };
    Some(kind)
}
75
/// Tunables shared by the NDJSON entry points in this module.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct NdjsonOptions {
    /// Maximum accepted row length in bytes, measured after trimming the line
    /// ending and surrounding whitespace.
    pub max_line_len: usize,
    /// Initial capacity of the reusable per-line buffer.
    pub initial_buffer_capacity: usize,
    /// Capacity handed to `BufReader::with_capacity` for file-backed sources.
    pub reader_buffer_capacity: usize,
    /// Chunk size for reverse reads — presumably consumed by the reverse file
    /// driver; not used directly in this part of the file.
    pub reverse_chunk_size: usize,
    /// Byte threshold related to parallel execution — NOTE(review): consumer is
    /// outside this view; confirm exact semantics.
    pub parallel_min_bytes: u64,
    /// Parallel execution mode — consumer is outside this view.
    pub parallelism: NdjsonParallelism,
    /// Row framing passed to `frame_payload` for each non-blank line.
    pub row_frame: NdjsonRowFrame,
    /// Handling of null results on output — consumer is outside this view.
    pub null_output: NdjsonNullOutput,
}
88
/// Parallel execution mode stored in `NdjsonOptions::parallelism`.
///
/// NOTE(review): the code interpreting these variants is outside this view;
/// the names suggest automatic selection vs. disabled — confirm.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum NdjsonParallelism {
    /// The default selected by `NdjsonOptions::default()`.
    Auto,
    Off,
}
94
/// Null-result output policy stored in `NdjsonOptions::null_output`.
///
/// NOTE(review): the writer that interprets these variants is outside this
/// view; presumably `Skip` omits null results and `Emit` writes them — confirm.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum NdjsonNullOutput {
    /// The default selected by `NdjsonOptions::default()`.
    Skip,
    Emit,
}
100
101impl Default for NdjsonOptions {
102 fn default() -> Self {
103 Self {
104 max_line_len: DEFAULT_MAX_LINE_LEN,
105 initial_buffer_capacity: DEFAULT_LINE_BUFFER_CAPACITY,
106 reader_buffer_capacity: DEFAULT_READER_BUFFER_CAPACITY,
107 reverse_chunk_size: DEFAULT_REVERSE_CHUNK_SIZE,
108 parallel_min_bytes: 64 * 1024 * 1024,
109 parallelism: NdjsonParallelism::Auto,
110 row_frame: NdjsonRowFrame::JsonLine,
111 null_output: NdjsonNullOutput::Skip,
112 }
113 }
114}
115
116impl NdjsonOptions {
117 pub fn with_max_line_len(mut self, max_line_len: usize) -> Self {
118 self.max_line_len = max_line_len;
119 self
120 }
121
122 pub fn with_initial_buffer_capacity(mut self, capacity: usize) -> Self {
123 self.initial_buffer_capacity = capacity;
124 self
125 }
126
127 pub fn with_reader_buffer_capacity(mut self, capacity: usize) -> Self {
128 self.reader_buffer_capacity = capacity;
129 self
130 }
131
132 pub fn with_reverse_chunk_size(mut self, capacity: usize) -> Self {
133 self.reverse_chunk_size = capacity;
134 self
135 }
136
137 pub fn with_parallel_min_bytes(mut self, bytes: u64) -> Self {
138 self.parallel_min_bytes = bytes;
139 self
140 }
141
142 pub fn with_parallelism(mut self, parallelism: NdjsonParallelism) -> Self {
143 self.parallelism = parallelism;
144 self
145 }
146
147 pub fn with_row_frame(mut self, row_frame: NdjsonRowFrame) -> Self {
148 self.row_frame = row_frame;
149 self
150 }
151
152 pub fn with_null_output(mut self, null_output: NdjsonNullOutput) -> Self {
153 self.null_output = null_output;
154 self
155 }
156}
157
/// Pulls rows out of a buffered reader one physical line at a time, skipping
/// blank lines and applying the configured row framing.
pub struct NdjsonPerRowDriver<R> {
    // Underlying source; the reading methods require `R: BufRead`.
    reader: R,
    // Number of physical lines read so far (0 before the first read).
    line_no: u64,
    // Maximum accepted row length in bytes after trimming.
    max_line_len: usize,
    // Framing handed to `frame_payload` for each non-blank line.
    row_frame: NdjsonRowFrame,
}
165
166impl<R: BufRead> NdjsonPerRowDriver<R> {
167 pub fn new(reader: R) -> Self {
168 Self {
169 reader,
170 line_no: 0,
171 max_line_len: DEFAULT_MAX_LINE_LEN,
172 row_frame: NdjsonRowFrame::JsonLine,
173 }
174 }
175
176 pub fn with_options(mut self, options: NdjsonOptions) -> Self {
177 self.max_line_len = options.max_line_len;
178 self.row_frame = options.row_frame;
179 self
180 }
181
182 pub fn with_max_line_len(mut self, max_line_len: usize) -> Self {
183 self.max_line_len = max_line_len;
184 self
185 }
186
187 pub fn with_row_frame(mut self, row_frame: NdjsonRowFrame) -> Self {
188 self.row_frame = row_frame;
189 self
190 }
191
192 pub fn line_no(&self) -> u64 {
193 self.line_no
194 }
195
196 pub fn read_next_nonempty<'a>(
199 &mut self,
200 buf: &'a mut Vec<u8>,
201 ) -> Result<Option<(u64, &'a [u8])>, RowError> {
202 loop {
203 buf.clear();
204 let read = self.read_physical_line(buf)?;
205 if read == 0 {
206 return Ok(None);
207 }
208 self.line_no += 1;
209
210 strip_initial_bom(self.line_no, buf);
211 trim_line_ending(buf);
212
213 let (start, end) = non_ws_range(buf);
214 if start == end {
215 continue;
216 }
217
218 let len = end - start;
219 if len > self.max_line_len {
220 return Err(RowError::LineTooLarge {
221 line_no: self.line_no,
222 len,
223 max: self.max_line_len,
224 });
225 }
226
227 match frame_payload(self.row_frame, self.line_no, &buf[start..end])? {
228 FramePayload::Data(range) => {
229 return Ok(Some((
230 self.line_no,
231 &buf[start + range.start..start + range.end],
232 )));
233 }
234 FramePayload::Skip => continue,
235 }
236 }
237 }
238
239 pub fn read_next_owned(
243 &mut self,
244 buf: &mut Vec<u8>,
245 ) -> Result<Option<(u64, Vec<u8>)>, RowError> {
246 loop {
247 buf.clear();
248 let read = self.read_physical_line(buf)?;
249 if read == 0 {
250 return Ok(None);
251 }
252 self.line_no += 1;
253
254 strip_initial_bom(self.line_no, buf);
255 trim_line_ending(buf);
256
257 let (start, end) = non_ws_range(buf);
258 if start == end {
259 continue;
260 }
261
262 let len = end - start;
263 if len > self.max_line_len {
264 return Err(RowError::LineTooLarge {
265 line_no: self.line_no,
266 len,
267 max: self.max_line_len,
268 });
269 }
270
271 let payload = match frame_payload(self.row_frame, self.line_no, &buf[start..end])? {
272 FramePayload::Data(range) => start + range.start..start + range.end,
273 FramePayload::Skip => continue,
274 };
275 if payload.start > 0 || payload.end < buf.len() {
276 buf.copy_within(payload.clone(), 0);
277 buf.truncate(payload.end - payload.start);
278 }
279
280 let capacity = buf.capacity();
281 return Ok(Some((
282 self.line_no,
283 std::mem::replace(buf, Vec::with_capacity(capacity)),
284 )));
285 }
286 }
287
288 fn read_physical_line(&mut self, buf: &mut Vec<u8>) -> Result<usize, RowError> {
289 loop {
290 let available = self.reader.fill_buf()?;
291 if available.is_empty() {
292 return Ok(buf.len());
293 }
294
295 if let Some(pos) = memchr(b'\n', available) {
296 buf.extend_from_slice(&available[..=pos]);
297 self.reader.consume(pos + 1);
298 self.check_physical_line_len(buf.len())?;
299 return Ok(buf.len());
300 }
301
302 let len = available.len();
303 buf.extend_from_slice(available);
304 self.reader.consume(len);
305 self.check_physical_line_len(buf.len())?;
306 }
307 }
308
309 fn check_physical_line_len(&self, len: usize) -> Result<(), RowError> {
310 let hard_max = self.max_line_len.saturating_add(2);
311 if len > hard_max {
312 return Err(RowError::LineTooLarge {
313 line_no: self.line_no + 1,
314 len,
315 max: self.max_line_len,
316 });
317 }
318 Ok(())
319 }
320}
321
/// Signal returned by row callbacks to steer the driving loop.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum NdjsonControl {
    /// Keep reading rows.
    Continue,
    /// Stop after the current row (`drive_ndjson` breaks out of its loop).
    Stop,
}
327
328pub fn for_each_ndjson<R, F>(
329 engine: &JetroEngine,
330 reader: R,
331 query: &str,
332 f: F,
333) -> Result<usize, JetroEngineError>
334where
335 R: BufRead,
336 F: FnMut(Value),
337{
338 for_each_ndjson_with_options(engine, reader, query, NdjsonOptions::default(), f)
339}
340
341pub fn for_each_ndjson_with_options<R, F>(
342 engine: &JetroEngine,
343 reader: R,
344 query: &str,
345 options: NdjsonOptions,
346 mut f: F,
347) -> Result<usize, JetroEngineError>
348where
349 R: BufRead,
350 F: FnMut(Value),
351{
352 drive_ndjson(engine, reader, query, options, |value| {
353 f(value);
354 Ok(NdjsonControl::Continue)
355 })
356}
357
358pub fn for_each_ndjson_until<R, F>(
359 engine: &JetroEngine,
360 reader: R,
361 query: &str,
362 f: F,
363) -> Result<usize, JetroEngineError>
364where
365 R: BufRead,
366 F: FnMut(Value) -> Result<NdjsonControl, JetroEngineError>,
367{
368 for_each_ndjson_until_with_options(engine, reader, query, NdjsonOptions::default(), f)
369}
370
371pub fn for_each_ndjson_until_with_options<R, F>(
372 engine: &JetroEngine,
373 reader: R,
374 query: &str,
375 options: NdjsonOptions,
376 f: F,
377) -> Result<usize, JetroEngineError>
378where
379 R: BufRead,
380 F: FnMut(Value) -> Result<NdjsonControl, JetroEngineError>,
381{
382 drive_ndjson(engine, reader, query, options, f)
383}
384
385pub fn for_each_ndjson_source<F>(
386 engine: &JetroEngine,
387 source: NdjsonSource,
388 query: &str,
389 f: F,
390) -> Result<usize, JetroEngineError>
391where
392 F: FnMut(Value),
393{
394 for_each_ndjson_source_with_options(engine, source, query, NdjsonOptions::default(), f)
395}
396
397pub fn for_each_ndjson_source_with_options<F>(
398 engine: &JetroEngine,
399 source: NdjsonSource,
400 query: &str,
401 options: NdjsonOptions,
402 f: F,
403) -> Result<usize, JetroEngineError>
404where
405 F: FnMut(Value),
406{
407 match source {
408 NdjsonSource::File(path) => {
409 let file = File::open(path)?;
410 for_each_ndjson_with_options(
411 engine,
412 std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
413 query,
414 options,
415 f,
416 )
417 }
418 NdjsonSource::Reader(reader) => {
419 for_each_ndjson_with_options(engine, reader, query, options, f)
420 }
421 }
422}
423
424pub fn for_each_ndjson_source_until<F>(
425 engine: &JetroEngine,
426 source: NdjsonSource,
427 query: &str,
428 f: F,
429) -> Result<usize, JetroEngineError>
430where
431 F: FnMut(Value) -> Result<NdjsonControl, JetroEngineError>,
432{
433 for_each_ndjson_source_until_with_options(engine, source, query, NdjsonOptions::default(), f)
434}
435
436pub fn for_each_ndjson_source_until_with_options<F>(
437 engine: &JetroEngine,
438 source: NdjsonSource,
439 query: &str,
440 options: NdjsonOptions,
441 f: F,
442) -> Result<usize, JetroEngineError>
443where
444 F: FnMut(Value) -> Result<NdjsonControl, JetroEngineError>,
445{
446 match source {
447 NdjsonSource::File(path) => {
448 let file = File::open(path)?;
449 for_each_ndjson_until_with_options(
450 engine,
451 std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
452 query,
453 options,
454 f,
455 )
456 }
457 NdjsonSource::Reader(reader) => {
458 for_each_ndjson_until_with_options(engine, reader, query, options, f)
459 }
460 }
461}
462
463pub fn collect_ndjson<R>(
464 engine: &JetroEngine,
465 reader: R,
466 query: &str,
467) -> Result<Vec<Value>, JetroEngineError>
468where
469 R: BufRead,
470{
471 collect_ndjson_with_options(engine, reader, query, NdjsonOptions::default())
472}
473
474pub fn collect_ndjson_with_options<R>(
475 engine: &JetroEngine,
476 reader: R,
477 query: &str,
478 options: NdjsonOptions,
479) -> Result<Vec<Value>, JetroEngineError>
480where
481 R: BufRead,
482{
483 let mut values = Vec::new();
484 for_each_ndjson_with_options(engine, reader, query, options, |value| values.push(value))?;
485 Ok(values)
486}
487
488pub fn collect_ndjson_file<P>(
489 engine: &JetroEngine,
490 path: P,
491 query: &str,
492) -> Result<Vec<Value>, JetroEngineError>
493where
494 P: AsRef<Path>,
495{
496 let file = File::open(path)?;
497 let options = NdjsonOptions::default();
498 collect_ndjson_with_options(
499 engine,
500 std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
501 query,
502 options,
503 )
504}
505
506pub fn collect_ndjson_file_with_options<P>(
507 engine: &JetroEngine,
508 path: P,
509 query: &str,
510 options: NdjsonOptions,
511) -> Result<Vec<Value>, JetroEngineError>
512where
513 P: AsRef<Path>,
514{
515 let file = File::open(path)?;
516 collect_ndjson_with_options(
517 engine,
518 std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
519 query,
520 options,
521 )
522}
523
524pub fn collect_ndjson_source(
525 engine: &JetroEngine,
526 source: NdjsonSource,
527 query: &str,
528) -> Result<Vec<Value>, JetroEngineError> {
529 collect_ndjson_source_with_options(engine, source, query, NdjsonOptions::default())
530}
531
532pub fn collect_ndjson_source_with_options(
533 engine: &JetroEngine,
534 source: NdjsonSource,
535 query: &str,
536 options: NdjsonOptions,
537) -> Result<Vec<Value>, JetroEngineError> {
538 match source {
539 NdjsonSource::File(path) => collect_ndjson_file_with_options(engine, path, query, options),
540 NdjsonSource::Reader(reader) => collect_ndjson_with_options(engine, reader, query, options),
541 }
542}
543
544pub fn collect_ndjson_matches<R>(
545 engine: &JetroEngine,
546 reader: R,
547 predicate: &str,
548 limit: usize,
549) -> Result<Vec<Value>, JetroEngineError>
550where
551 R: BufRead,
552{
553 collect_ndjson_matches_with_options(engine, reader, predicate, limit, NdjsonOptions::default())
554}
555
556pub fn collect_ndjson_matches_with_options<R>(
557 engine: &JetroEngine,
558 reader: R,
559 predicate: &str,
560 limit: usize,
561 options: NdjsonOptions,
562) -> Result<Vec<Value>, JetroEngineError>
563where
564 R: BufRead,
565{
566 let mut values = Vec::with_capacity(limit);
567 drive_ndjson_matches(engine, reader, predicate, limit, options, |value| {
568 values.push(Value::from(value));
569 Ok(NdjsonControl::Continue)
570 })?;
571 Ok(values)
572}
573
574pub fn collect_ndjson_matches_file<P>(
575 engine: &JetroEngine,
576 path: P,
577 predicate: &str,
578 limit: usize,
579) -> Result<Vec<Value>, JetroEngineError>
580where
581 P: AsRef<Path>,
582{
583 let file = File::open(path)?;
584 let options = NdjsonOptions::default();
585 collect_ndjson_matches_with_options(
586 engine,
587 std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
588 predicate,
589 limit,
590 options,
591 )
592}
593
594pub fn collect_ndjson_matches_file_with_options<P>(
595 engine: &JetroEngine,
596 path: P,
597 predicate: &str,
598 limit: usize,
599 options: NdjsonOptions,
600) -> Result<Vec<Value>, JetroEngineError>
601where
602 P: AsRef<Path>,
603{
604 let file = File::open(path)?;
605 collect_ndjson_matches_with_options(
606 engine,
607 std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
608 predicate,
609 limit,
610 options,
611 )
612}
613
614pub fn collect_ndjson_matches_source(
615 engine: &JetroEngine,
616 source: NdjsonSource,
617 predicate: &str,
618 limit: usize,
619) -> Result<Vec<Value>, JetroEngineError> {
620 collect_ndjson_matches_source_with_options(
621 engine,
622 source,
623 predicate,
624 limit,
625 NdjsonOptions::default(),
626 )
627}
628
629pub fn collect_ndjson_matches_source_with_options(
630 engine: &JetroEngine,
631 source: NdjsonSource,
632 predicate: &str,
633 limit: usize,
634 options: NdjsonOptions,
635) -> Result<Vec<Value>, JetroEngineError> {
636 match source {
637 NdjsonSource::File(path) => {
638 collect_ndjson_matches_file_with_options(engine, path, predicate, limit, options)
639 }
640 NdjsonSource::Reader(reader) => {
641 collect_ndjson_matches_with_options(engine, reader, predicate, limit, options)
642 }
643 }
644}
645
646pub fn run_ndjson<R, W>(
647 engine: &JetroEngine,
648 reader: R,
649 query: &str,
650 writer: W,
651) -> Result<usize, JetroEngineError>
652where
653 R: BufRead,
654 W: Write,
655{
656 run_ndjson_with_options(engine, reader, query, writer, NdjsonOptions::default())
657}
658
659pub fn run_ndjson_file<P, W>(
660 engine: &JetroEngine,
661 path: P,
662 query: &str,
663 writer: W,
664) -> Result<usize, JetroEngineError>
665where
666 P: AsRef<Path>,
667 W: Write,
668{
669 let options = NdjsonOptions::default();
670 run_ndjson_file_with_options(engine, path, query, writer, options)
671}
672
673pub fn run_ndjson_file_with_options<P, W>(
674 engine: &JetroEngine,
675 path: P,
676 query: &str,
677 writer: W,
678 options: NdjsonOptions,
679) -> Result<usize, JetroEngineError>
680where
681 P: AsRef<Path>,
682 W: Write,
683{
684 if let Some(plan) = ndjson_rows_stream_plan(query)? {
685 return drive_ndjson_rows_stream_file(engine, path, &plan, None, options, writer);
686 }
687 if let Some(plan) = ndjson_rows_subquery_plan(query)? {
688 return drive_ndjson_rows_subquery_file(engine, path, &plan, options, writer);
689 }
690
691 let file = File::open(path)?;
692 run_ndjson_with_options(
693 engine,
694 std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
695 query,
696 writer,
697 options,
698 )
699}
700
701pub fn run_ndjson_with_options<R, W>(
702 engine: &JetroEngine,
703 reader: R,
704 query: &str,
705 writer: W,
706 options: NdjsonOptions,
707) -> Result<usize, JetroEngineError>
708where
709 R: BufRead,
710 W: Write,
711{
712 if let Some(plan) = ndjson_rows_stream_plan(query)? {
713 return drive_ndjson_rows_stream_reader(engine, reader, &plan, None, options, writer);
714 }
715 if ndjson_rows_subquery_plan(query)?.is_some() {
716 return Err(JetroEngineError::Eval(EvalError(
717 "$.rows() stream subqueries require a file-backed NDJSON source".into(),
718 )));
719 }
720
721 drive_ndjson_writer(engine, reader, query, None, options, writer)
722}
723
724pub fn run_ndjson_limit<R, W>(
725 engine: &JetroEngine,
726 reader: R,
727 query: &str,
728 limit: usize,
729 writer: W,
730) -> Result<usize, JetroEngineError>
731where
732 R: BufRead,
733 W: Write,
734{
735 run_ndjson_limit_with_options(
736 engine,
737 reader,
738 query,
739 limit,
740 writer,
741 NdjsonOptions::default(),
742 )
743}
744
745pub fn run_ndjson_limit_with_options<R, W>(
746 engine: &JetroEngine,
747 reader: R,
748 query: &str,
749 limit: usize,
750 writer: W,
751 options: NdjsonOptions,
752) -> Result<usize, JetroEngineError>
753where
754 R: BufRead,
755 W: Write,
756{
757 if limit == 0 {
758 return Ok(0);
759 }
760
761 if let Some(plan) = ndjson_rows_stream_plan(query)? {
762 return drive_ndjson_rows_stream_reader(
763 engine,
764 reader,
765 &plan,
766 Some(limit),
767 options,
768 writer,
769 );
770 }
771
772 drive_ndjson_writer(engine, reader, query, Some(limit), options, writer)
773}
774
775pub fn run_ndjson_file_limit<P, W>(
776 engine: &JetroEngine,
777 path: P,
778 query: &str,
779 limit: usize,
780 writer: W,
781) -> Result<usize, JetroEngineError>
782where
783 P: AsRef<Path>,
784 W: Write,
785{
786 let options = NdjsonOptions::default();
787 run_ndjson_file_limit_with_options(engine, path, query, limit, writer, options)
788}
789
790pub fn run_ndjson_file_limit_with_options<P, W>(
791 engine: &JetroEngine,
792 path: P,
793 query: &str,
794 limit: usize,
795 writer: W,
796 options: NdjsonOptions,
797) -> Result<usize, JetroEngineError>
798where
799 P: AsRef<Path>,
800 W: Write,
801{
802 if limit == 0 {
803 return Ok(0);
804 }
805 if let Some(plan) = ndjson_rows_stream_plan(query)? {
806 return drive_ndjson_rows_stream_file(engine, path, &plan, Some(limit), options, writer);
807 }
808 if let Some(plan) = ndjson_rows_subquery_plan(query)? {
809 return drive_ndjson_rows_subquery_file(engine, path, &plan, options, writer);
810 }
811
812 let file = File::open(path)?;
813 run_ndjson_limit_with_options(
814 engine,
815 std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
816 query,
817 limit,
818 writer,
819 options,
820 )
821}
822
823pub fn run_ndjson_source<W>(
824 engine: &JetroEngine,
825 source: NdjsonSource,
826 query: &str,
827 writer: W,
828) -> Result<usize, JetroEngineError>
829where
830 W: Write,
831{
832 run_ndjson_source_with_options(engine, source, query, writer, NdjsonOptions::default())
833}
834
835pub fn run_ndjson_source_with_options<W>(
836 engine: &JetroEngine,
837 source: NdjsonSource,
838 query: &str,
839 writer: W,
840 options: NdjsonOptions,
841) -> Result<usize, JetroEngineError>
842where
843 W: Write,
844{
845 match source {
846 NdjsonSource::File(path) => {
847 run_ndjson_file_with_options(engine, path, query, writer, options)
848 }
849 NdjsonSource::Reader(reader) => {
850 run_ndjson_with_options(engine, reader, query, writer, options)
851 }
852 }
853}
854
855pub fn run_ndjson_source_limit<W>(
856 engine: &JetroEngine,
857 source: NdjsonSource,
858 query: &str,
859 limit: usize,
860 writer: W,
861) -> Result<usize, JetroEngineError>
862where
863 W: Write,
864{
865 run_ndjson_source_limit_with_options(
866 engine,
867 source,
868 query,
869 limit,
870 writer,
871 NdjsonOptions::default(),
872 )
873}
874
875pub fn run_ndjson_source_limit_with_options<W>(
876 engine: &JetroEngine,
877 source: NdjsonSource,
878 query: &str,
879 limit: usize,
880 writer: W,
881 options: NdjsonOptions,
882) -> Result<usize, JetroEngineError>
883where
884 W: Write,
885{
886 match source {
887 NdjsonSource::File(path) => {
888 run_ndjson_file_limit_with_options(engine, path, query, limit, writer, options)
889 }
890 NdjsonSource::Reader(reader) => {
891 run_ndjson_limit_with_options(engine, reader, query, limit, writer, options)
892 }
893 }
894}
895
896pub fn run_ndjson_matches<R, W>(
897 engine: &JetroEngine,
898 reader: R,
899 predicate: &str,
900 limit: usize,
901 writer: W,
902) -> Result<usize, JetroEngineError>
903where
904 R: BufRead,
905 W: Write,
906{
907 run_ndjson_matches_with_options(
908 engine,
909 reader,
910 predicate,
911 limit,
912 writer,
913 NdjsonOptions::default(),
914 )
915}
916
917pub fn run_ndjson_matches_with_options<R, W>(
918 engine: &JetroEngine,
919 reader: R,
920 predicate: &str,
921 limit: usize,
922 writer: W,
923 options: NdjsonOptions,
924) -> Result<usize, JetroEngineError>
925where
926 R: BufRead,
927 W: Write,
928{
929 drive_ndjson_matches_writer(engine, reader, predicate, limit, options, writer)
930}
931
932pub fn run_ndjson_matches_file<P, W>(
933 engine: &JetroEngine,
934 path: P,
935 predicate: &str,
936 limit: usize,
937 writer: W,
938) -> Result<usize, JetroEngineError>
939where
940 P: AsRef<Path>,
941 W: Write,
942{
943 let file = File::open(path)?;
944 let options = NdjsonOptions::default();
945 run_ndjson_matches_with_options(
946 engine,
947 std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
948 predicate,
949 limit,
950 writer,
951 options,
952 )
953}
954
955pub fn run_ndjson_matches_file_with_options<P, W>(
956 engine: &JetroEngine,
957 path: P,
958 predicate: &str,
959 limit: usize,
960 writer: W,
961 options: NdjsonOptions,
962) -> Result<usize, JetroEngineError>
963where
964 P: AsRef<Path>,
965 W: Write,
966{
967 let file = File::open(path)?;
968 run_ndjson_matches_with_options(
969 engine,
970 std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
971 predicate,
972 limit,
973 writer,
974 options,
975 )
976}
977
978pub fn run_ndjson_matches_source<W>(
979 engine: &JetroEngine,
980 source: NdjsonSource,
981 predicate: &str,
982 limit: usize,
983 writer: W,
984) -> Result<usize, JetroEngineError>
985where
986 W: Write,
987{
988 run_ndjson_matches_source_with_options(
989 engine,
990 source,
991 predicate,
992 limit,
993 writer,
994 NdjsonOptions::default(),
995 )
996}
997
998pub fn run_ndjson_matches_source_with_options<W>(
999 engine: &JetroEngine,
1000 source: NdjsonSource,
1001 predicate: &str,
1002 limit: usize,
1003 writer: W,
1004 options: NdjsonOptions,
1005) -> Result<usize, JetroEngineError>
1006where
1007 W: Write,
1008{
1009 match source {
1010 NdjsonSource::File(path) => {
1011 run_ndjson_matches_file_with_options(engine, path, predicate, limit, writer, options)
1012 }
1013 NdjsonSource::Reader(reader) => {
1014 run_ndjson_matches_with_options(engine, reader, predicate, limit, writer, options)
1015 }
1016 }
1017}
1018
1019fn drive_ndjson<R, F>(
1020 engine: &JetroEngine,
1021 reader: R,
1022 query: &str,
1023 options: NdjsonOptions,
1024 mut emit: F,
1025) -> Result<usize, JetroEngineError>
1026where
1027 R: BufRead,
1028 F: FnMut(Value) -> Result<NdjsonControl, JetroEngineError>,
1029{
1030 let mut driver = NdjsonPerRowDriver::new(reader).with_options(options);
1031 let plan = engine.cached_plan(query, PlanningContext::bytes());
1032 let mut buf = Vec::with_capacity(options.initial_buffer_capacity);
1033 let mut count = 0;
1034
1035 while let Some((line_no, row)) = driver.read_next_owned(&mut buf)? {
1036 let document = parse_row(engine, line_no, row)?;
1037 let out = collect_row_val(engine, &document, &plan, line_no)?;
1038 count += 1;
1039 if matches!(emit(Value::from(out))?, NdjsonControl::Stop) {
1040 break;
1041 }
1042 }
1043
1044 Ok(count)
1045}
1046
1047fn ndjson_rows_stream_plan(query: &str) -> Result<Option<RowStreamPlan>, JetroEngineError> {
1048 lower_root_rows_query(query, RowStreamSourceKind::NdjsonRows)
1049 .map_err(|err| JetroEngineError::Eval(EvalError(err.to_string())))
1050}
1051
1052fn ndjson_rows_subquery_plan(
1053 query: &str,
1054) -> Result<Option<RowStreamSubqueryPlan>, JetroEngineError> {
1055 if !query.contains("$.rows") {
1056 return Ok(None);
1057 }
1058 let expr = crate::parse::parser::parse(query)
1059 .map_err(|err| JetroEngineError::Eval(EvalError(err.to_string())))?;
1060 lower_single_rows_subquery(&expr, RowStreamSourceKind::NdjsonRows)
1061 .map_err(|err| JetroEngineError::Eval(EvalError(err.to_string())))
1062}
1063
1064fn drive_ndjson_rows_subquery_file<P, W>(
1065 engine: &JetroEngine,
1066 path: P,
1067 plan: &RowStreamSubqueryPlan,
1068 options: NdjsonOptions,
1069 writer: W,
1070) -> Result<usize, JetroEngineError>
1071where
1072 P: AsRef<Path>,
1073 W: Write,
1074{
1075 let stream_value = collect_ndjson_rows_stream_file(engine, path, &plan.stream, options)?;
1076 let wrapper = Compiler::compile(&plan.wrapper, "<ndjson-rows-wrapper>");
1077 let env = Env::new(Val::Null).with_var(STREAM_BINDING, stream_value);
1078 let value = engine
1079 .lock_vm()
1080 .exec_in_env(&wrapper, &env)
1081 .map_err(JetroEngineError::Eval)?;
1082 let mut writer = ndjson_writer_with_options(writer, options);
1083 let emitted = write_val_line_with_options(&mut writer, &value, options)? as usize;
1084 writer.flush()?;
1085 Ok(emitted)
1086}
1087
1088fn collect_ndjson_rows_stream_file<P>(
1089 engine: &JetroEngine,
1090 path: P,
1091 plan: &RowStreamPlan,
1092 options: NdjsonOptions,
1093) -> Result<Val, JetroEngineError>
1094where
1095 P: AsRef<Path>,
1096{
1097 if let Some(value) =
1098 super::ndjson_parallel::collect_rows_stream_file(engine, path.as_ref(), plan, options)?
1099 {
1100 return Ok(value);
1101 }
1102
1103 let mut executor = CompiledRowStream::new(plan);
1104 let mut out = Vec::new();
1105
1106 if plan.direction == RowStreamDirection::Forward {
1107 let file = File::open(path)?;
1108 let mut driver = NdjsonPerRowDriver::new(std::io::BufReader::with_capacity(
1109 options.reader_buffer_capacity,
1110 file,
1111 ))
1112 .with_options(options);
1113 let mut buf = Vec::with_capacity(options.initial_buffer_capacity);
1114 while !executor.is_exhausted() {
1115 let Some((line_no, row)) = driver.read_next_owned(&mut buf)? else {
1116 break;
1117 };
1118 if collect_row_stream_result(
1119 engine,
1120 line_no,
1121 executor.apply_owned_row(engine, line_no, row)?,
1122 &mut out,
1123 )? || executor.is_exhausted()
1124 {
1125 break;
1126 }
1127 }
1128 } else {
1129 let mut driver = super::ndjson_rev::NdjsonReverseFileDriver::with_options(path, options)?;
1130 while !executor.is_exhausted() {
1131 let Some((line_no, row)) = driver.next_line_with_reverse_no()? else {
1132 break;
1133 };
1134 if collect_row_stream_result(
1135 engine,
1136 line_no,
1137 executor.apply_owned_row(engine, line_no, row)?,
1138 &mut out,
1139 )? || executor.is_exhausted()
1140 {
1141 break;
1142 }
1143 }
1144 }
1145
1146 if plan.demand.retained_limit == Some(1) {
1147 Ok(out.into_iter().next().unwrap_or(Val::Null))
1148 } else {
1149 Ok(Val::Arr(std::sync::Arc::new(out)))
1150 }
1151}
1152
1153fn collect_row_stream_result(
1154 engine: &JetroEngine,
1155 line_no: u64,
1156 result: RowStreamRowResult,
1157 out: &mut Vec<Val>,
1158) -> Result<bool, JetroEngineError> {
1159 match result {
1160 RowStreamRowResult::Emit(value) => out.push(value),
1161 RowStreamRowResult::EmitBytes(bytes) => {
1162 let document = parse_row(engine, line_no, bytes)?;
1163 let value = document
1164 .root_val_with(engine.keys())
1165 .map_err(|err| row_eval_error(line_no, err))?;
1166 out.push(value);
1167 }
1168 RowStreamRowResult::Skip => {}
1169 RowStreamRowResult::Stop => return Ok(true),
1170 }
1171 Ok(false)
1172}
1173
1174fn drive_ndjson_rows_stream_reader<R, W>(
1175 engine: &JetroEngine,
1176 reader: R,
1177 plan: &RowStreamPlan,
1178 external_limit: Option<usize>,
1179 options: NdjsonOptions,
1180 writer: W,
1181) -> Result<usize, JetroEngineError>
1182where
1183 R: BufRead,
1184 W: Write,
1185{
1186 let (emitted, _) = drive_ndjson_rows_stream_reader_with_stats(
1187 engine,
1188 reader,
1189 plan,
1190 external_limit,
1191 options,
1192 writer,
1193 )?;
1194 Ok(emitted)
1195}
1196
1197fn drive_ndjson_rows_stream_reader_with_stats<R, W>(
1198 engine: &JetroEngine,
1199 reader: R,
1200 plan: &RowStreamPlan,
1201 external_limit: Option<usize>,
1202 options: NdjsonOptions,
1203 writer: W,
1204) -> Result<(usize, RowStreamStats), JetroEngineError>
1205where
1206 R: BufRead,
1207 W: Write,
1208{
1209 if plan.direction == RowStreamDirection::Reverse {
1210 return Err(JetroEngineError::Eval(EvalError(
1211 "$.rows().reverse() requires a file-backed NDJSON source".into(),
1212 )));
1213 }
1214
1215 let mut driver = NdjsonPerRowDriver::new(reader).with_options(options);
1216 let mut executor = CompiledRowStream::new(plan);
1217 let mut writer = ndjson_writer_with_options(writer, options);
1218 let mut buf = Vec::with_capacity(options.initial_buffer_capacity);
1219 let mut emitted = 0usize;
1220
1221 while !executor.is_exhausted() {
1222 let Some((line_no, row)) = driver.read_next_owned(&mut buf)? else {
1223 break;
1224 };
1225 if emit_row_stream_result(
1226 executor.apply_owned_row(engine, line_no, row)?,
1227 &mut writer,
1228 &mut emitted,
1229 external_limit,
1230 options,
1231 )? {
1232 break;
1233 }
1234 if executor.is_exhausted() {
1235 break;
1236 }
1237 }
1238
1239 writer.flush()?;
1240 Ok((emitted, executor.stats().clone()))
1241}
1242
1243fn drive_ndjson_rows_stream_file<P, W>(
1244 engine: &JetroEngine,
1245 path: P,
1246 plan: &RowStreamPlan,
1247 external_limit: Option<usize>,
1248 options: NdjsonOptions,
1249 writer: W,
1250) -> Result<usize, JetroEngineError>
1251where
1252 P: AsRef<Path>,
1253 W: Write,
1254{
1255 if let Some(value) =
1256 super::ndjson_parallel::collect_rows_stream_file(engine, path.as_ref(), plan, options)?
1257 {
1258 return write_collected_rows_stream(value, external_limit, options, writer);
1259 }
1260
1261 let (emitted, _) = drive_ndjson_rows_stream_file_with_stats(
1262 engine,
1263 path,
1264 plan,
1265 external_limit,
1266 options,
1267 writer,
1268 )?;
1269 Ok(emitted)
1270}
1271
1272fn write_collected_rows_stream<W: Write>(
1273 value: Val,
1274 external_limit: Option<usize>,
1275 options: NdjsonOptions,
1276 writer: W,
1277) -> Result<usize, JetroEngineError> {
1278 let mut writer = ndjson_writer_with_options(writer, options);
1279 let mut emitted = 0usize;
1280 match value {
1281 Val::Arr(values) => {
1282 for value in values.iter() {
1283 if external_limit.is_some_and(|limit| emitted >= limit) {
1284 break;
1285 }
1286 if write_val_line_with_options(&mut writer, value, options)? {
1287 emitted += 1;
1288 }
1289 }
1290 }
1291 value => {
1292 if write_val_line_with_options(&mut writer, &value, options)? {
1293 emitted += 1;
1294 }
1295 }
1296 }
1297 writer.flush()?;
1298 Ok(emitted)
1299}
1300
1301fn drive_ndjson_rows_stream_file_with_stats<P, W>(
1302 engine: &JetroEngine,
1303 path: P,
1304 plan: &RowStreamPlan,
1305 external_limit: Option<usize>,
1306 options: NdjsonOptions,
1307 writer: W,
1308) -> Result<(usize, RowStreamStats), JetroEngineError>
1309where
1310 P: AsRef<Path>,
1311 W: Write,
1312{
1313 if plan.direction == RowStreamDirection::Forward {
1314 let file = File::open(path)?;
1315 return drive_ndjson_rows_stream_reader_with_stats(
1316 engine,
1317 std::io::BufReader::with_capacity(options.reader_buffer_capacity, file),
1318 plan,
1319 external_limit,
1320 options,
1321 writer,
1322 );
1323 }
1324
1325 let mut driver = super::ndjson_rev::NdjsonReverseFileDriver::with_options(path, options)?;
1326 let mut executor = CompiledRowStream::new(plan);
1327 let mut writer = ndjson_writer_with_options(writer, options);
1328 let mut emitted = 0usize;
1329
1330 while !executor.is_exhausted() {
1331 let Some((line_no, row)) = driver.next_line_with_reverse_no()? else {
1332 break;
1333 };
1334 if emit_row_stream_result(
1335 executor.apply_owned_row(engine, line_no, row)?,
1336 &mut writer,
1337 &mut emitted,
1338 external_limit,
1339 options,
1340 )? {
1341 break;
1342 }
1343 if executor.is_exhausted() {
1344 break;
1345 }
1346 }
1347
1348 writer.flush()?;
1349 Ok((emitted, executor.stats().clone()))
1350}
1351
1352fn emit_row_stream_result<W: Write>(
1353 result: RowStreamRowResult,
1354 writer: &mut W,
1355 emitted: &mut usize,
1356 external_limit: Option<usize>,
1357 options: NdjsonOptions,
1358) -> Result<bool, JetroEngineError> {
1359 let wrote = match result {
1360 RowStreamRowResult::Emit(value) => write_val_line_with_options(writer, &value, options)?,
1361 RowStreamRowResult::EmitBytes(bytes) => {
1362 write_json_bytes_line_with_options(writer, &bytes, options)?
1363 }
1364 RowStreamRowResult::Skip => return Ok(false),
1365 RowStreamRowResult::Stop => return Ok(true),
1366 };
1367 if wrote {
1368 *emitted += 1;
1369 }
1370 Ok(external_limit.is_some_and(|limit| *emitted >= limit))
1371}
1372
1373fn drive_ndjson_writer<R, W>(
1374 engine: &JetroEngine,
1375 reader: R,
1376 query: &str,
1377 limit: Option<usize>,
1378 options: NdjsonOptions,
1379 writer: W,
1380) -> Result<usize, JetroEngineError>
1381where
1382 R: BufRead,
1383 W: Write,
1384{
1385 #[cfg(feature = "simd-json")]
1386 if let Some((byte_plan, tape_plan)) = direct_writer_plans(engine, query) {
1387 if let Some(byte_plan) = byte_plan {
1388 return drive_ndjson_byte_writer(
1389 engine, reader, &byte_plan, &tape_plan, limit, options, writer,
1390 );
1391 }
1392 if tape_plan_can_write_byte_row(&tape_plan) {
1393 return drive_ndjson_tape_byte_writer(
1394 engine, reader, &tape_plan, limit, options, writer,
1395 );
1396 }
1397 return drive_ndjson_tape_writer(engine, reader, &tape_plan, limit, options, writer);
1398 }
1399
1400 let mut driver = NdjsonPerRowDriver::new(reader).with_options(options);
1401 let mut executor = NdjsonRowExecutor::new(engine, query);
1402 let mut writer = ndjson_writer_with_options(writer, options);
1403 let mut buf = Vec::with_capacity(options.initial_buffer_capacity);
1404 let mut count = 0usize;
1405
1406 while let Some((line_no, row)) = driver.read_next_owned(&mut buf)? {
1407 let value = executor.eval_owned_row(line_no, row)?;
1408 if write_val_line_with_options(&mut writer, &value, options)? {
1409 count += 1;
1410 }
1411 if limit.is_some_and(|limit| count >= limit) {
1412 break;
1413 }
1414 }
1415
1416 writer.flush()?;
1417 Ok(count)
1418}
1419
/// Writes one result line per NDJSON row using the direct byte plan, falling
/// back to the tape plan for rows the byte plan cannot handle.
///
/// Rows are visited in place via `visit_ndjson_borrowed_rows`. When the byte
/// plan reports `Fallback`, the row is parsed into a tape and rendered by the
/// tape runner instead. Returns the number of lines written, capped by `limit`.
#[cfg(feature = "simd-json")]
fn drive_ndjson_byte_writer<R, W>(
    engine: &JetroEngine,
    reader: R,
    byte_plan: &NdjsonDirectBytePlan,
    tape_plan: &NdjsonDirectTapePlan,
    limit: Option<usize>,
    options: NdjsonOptions,
    writer: W,
) -> Result<usize, JetroEngineError>
where
    R: BufRead,
    W: Write,
{
    let capacity = options.initial_buffer_capacity;
    let mut rows = NdjsonPerRowDriver::new(reader).with_options(options);
    let mut sink = ndjson_writer_with_options(writer, options);
    let mut spill = Vec::with_capacity(capacity);
    let mut encoded = Vec::with_capacity(capacity);
    let mut tape = crate::data::tape::TapeScratch::with_capacity(capacity);
    let mut fallback = NdjsonTapeWriterRunner::new(engine, tape_plan);
    let mut written = 0usize;

    visit_ndjson_borrowed_rows(&mut rows, &mut spill, |line_no, row| {
        encoded.clear();
        let fast = write_ndjson_byte_plan_row(&mut encoded, row, byte_plan)?;
        if matches!(fast, BytePlanWrite::Fallback) {
            // The byte plan declined this row: parse it and use the tape plan.
            tape.parse_slice(row).map_err(|message| {
                row_parse_error(
                    line_no,
                    JetroEngineError::Eval(crate::EvalError(format!(
                        "Invalid JSON: {message}"
                    ))),
                )
            })?;
            fallback.write_row(&tape, &mut encoded)?;
        }
        written += usize::from(write_json_bytes_line_with_options(
            &mut sink, &encoded, options,
        )?);
        // Keep visiting only while the limit has not been reached.
        Ok(limit.map_or(true, |limit| written < limit))
    })?;

    sink.flush()?;
    Ok(written)
}
1468
/// Writes one result line per NDJSON row for a tape plan that can be rendered
/// straight from row bytes.
///
/// Each row is tried against progressively slower strategies:
/// 1. the constant-stream cache,
/// 2. the layout-hinted writer (only when hint state exists and it decides to
///    use hints for this row),
/// 3. the plain byte-level tape plan writer (only when hints were not used),
/// 4. a full tape parse rendered by the tape runner, whenever the previous
///    step reported `Fallback`.
///
/// Returns the number of lines written, capped by `limit`.
#[cfg(feature = "simd-json")]
fn drive_ndjson_tape_byte_writer<R, W>(
    engine: &JetroEngine,
    reader: R,
    tape_plan: &NdjsonDirectTapePlan,
    limit: Option<usize>,
    options: NdjsonOptions,
    writer: W,
) -> Result<usize, JetroEngineError>
where
    R: BufRead,
    W: Write,
{
    let capacity = options.initial_buffer_capacity;
    let mut rows = NdjsonPerRowDriver::new(reader).with_options(options);
    let mut output = ndjson_writer_with_options(writer, options);
    let mut spill = Vec::with_capacity(capacity);
    let mut encoded = Vec::with_capacity(capacity);
    let mut tape = crate::data::tape::TapeScratch::with_capacity(capacity);
    let mut byte_scratch = Vec::with_capacity(capacity);
    let mut tape_runner = NdjsonTapeWriterRunner::new(engine, tape_plan);
    let mut constant_stream_cache = NdjsonConstantStreamCache::default();

    // Hint state is only built for object/array projections and for stream
    // plans whose sink is one of the variants listed here.
    let hints_apply = matches!(
        tape_plan,
        NdjsonDirectTapePlan::Object(_)
            | NdjsonDirectTapePlan::Array(_)
            | NdjsonDirectTapePlan::Stream(NdjsonDirectStreamPlan {
                sink: NdjsonDirectStreamSink::Collect(_)
                    | NdjsonDirectStreamSink::First(_)
                    | NdjsonDirectStreamSink::Last(_)
                    | NdjsonDirectStreamSink::Count
                    | NdjsonDirectStreamSink::Numeric { .. }
                    | NdjsonDirectStreamSink::Extreme { .. },
                ..
            })
    );
    let mut hint_state = if hints_apply {
        Some(NdjsonHintState::new(
            NdjsonHintConfig::default(),
            NdjsonHintAccessPlan::from_direct_plans(None, tape_plan),
        ))
    } else {
        None
    };
    let mut written = 0usize;

    visit_ndjson_borrowed_rows(&mut rows, &mut spill, |line_no, row| {
        encoded.clear();
        let cached = constant_stream_cache.write_row(&mut encoded, row, tape_plan)?;

        // Anything other than a completed cache write goes through the
        // hinted / byte / tape chain.
        if !matches!(cached, Some(BytePlanWrite::Done)) {
            let hinted = if let Some(state) = hint_state.as_mut() {
                if state.observe_row(row) == NdjsonHintDecision::UseHints {
                    // Render into the side buffer first; it is only copied
                    // into the row output when the hinted write completes.
                    byte_scratch.clear();
                    let outcome = state
                        .with_root_layout_match(row, |root, matched| {
                            write_ndjson_hinted_tape_plan_row(
                                &mut byte_scratch,
                                tape_plan,
                                root,
                                matched,
                            )
                        })
                        .transpose()?
                        .unwrap_or(BytePlanWrite::Fallback);
                    if matches!(outcome, BytePlanWrite::Done) {
                        encoded.extend_from_slice(&byte_scratch);
                    }
                    Some(outcome)
                } else {
                    None
                }
            } else {
                None
            };

            let outcome = match hinted {
                Some(outcome) => outcome,
                None => write_ndjson_byte_tape_plan_row(
                    &mut encoded,
                    row,
                    tape_plan,
                    &mut byte_scratch,
                )?,
            };

            if matches!(outcome, BytePlanWrite::Fallback) {
                tape.parse_slice(row).map_err(|message| {
                    row_parse_error(
                        line_no,
                        JetroEngineError::Eval(crate::EvalError(format!(
                            "Invalid JSON: {message}"
                        ))),
                    )
                })?;
                tape_runner.write_row(&tape, &mut encoded)?;
            }
        }

        written += usize::from(write_json_bytes_line_with_options(
            &mut output,
            &encoded,
            options,
        )?);
        Ok(limit.map_or(true, |limit| written < limit))
    })?;

    output.flush()?;
    Ok(written)
}
1589
/// Feeds every non-blank NDJSON row payload to `visit` without copying the
/// row when it can be avoided.
///
/// * Fast path: when the reader's current buffer already contains a newline,
///   the row is sliced directly out of that buffer and the bytes are consumed
///   only after `visit` returns.
/// * Slow path: when the buffer holds no newline, the physical line is read
///   into `spill` and visited from there.
///
/// In both paths the driver's line counter is advanced, a trailing line ending
/// and a leading UTF-8 BOM on the first line are removed, surrounding
/// whitespace is trimmed, blank lines are skipped, over-long rows are rejected
/// with `RowError::LineTooLarge`, and the row frame decides which part of the
/// line (if any) is the payload.
///
/// `visit` returns `Ok(false)` to stop early; the function then returns
/// `Ok(())`. It also returns `Ok(())` at end of input.
#[cfg(feature = "simd-json")]
fn visit_ndjson_borrowed_rows<R, F>(
    driver: &mut NdjsonPerRowDriver<R>,
    spill: &mut Vec<u8>,
    mut visit: F,
) -> Result<(), JetroEngineError>
where
    R: BufRead,
    F: FnMut(u64, &[u8]) -> Result<bool, JetroEngineError>,
{
    loop {
        spill.clear();
        let buffered = driver.reader.fill_buf()?;
        if buffered.is_empty() {
            return Ok(());
        }

        match memchr(b'\n', buffered) {
            Some(newline) => {
                driver.line_no += 1;
                let line_no = driver.line_no;

                let raw = &buffered[..newline];
                let raw = raw.strip_suffix(b"\r").unwrap_or(raw);
                let raw = if line_no == 1 {
                    raw.strip_prefix(b"\xef\xbb\xbf").unwrap_or(raw)
                } else {
                    raw
                };

                let (start, end) = non_ws_range(raw);
                let proceed = if start == end {
                    // Blank line: nothing to visit.
                    true
                } else {
                    let payload = &raw[start..end];
                    if payload.len() > driver.max_line_len {
                        return Err(RowError::LineTooLarge {
                            line_no,
                            len: payload.len(),
                            max: driver.max_line_len,
                        }
                        .into());
                    }
                    match frame_payload(driver.row_frame, line_no, payload)? {
                        FramePayload::Data(range) => visit(line_no, &payload[range])?,
                        FramePayload::Skip => true,
                    }
                };

                // The row borrows the reader's buffer, so the bytes are
                // released only after the visitor is done with them.
                driver.reader.consume(newline + 1);
                if !proceed {
                    return Ok(());
                }
            }
            None => {
                if driver.read_physical_line(spill)? == 0 {
                    return Ok(());
                }
                driver.line_no += 1;
                let line_no = driver.line_no;
                strip_initial_bom(line_no, spill);
                trim_line_ending(spill);

                let (start, end) = non_ws_range(spill);
                if start == end {
                    continue;
                }
                let len = end - start;
                if len > driver.max_line_len {
                    return Err(RowError::LineTooLarge {
                        line_no,
                        len,
                        max: driver.max_line_len,
                    }
                    .into());
                }
                if let FramePayload::Data(range) =
                    frame_payload(driver.row_frame, line_no, &spill[start..end])?
                {
                    // `range` is relative to the trimmed slice; rebase it onto `spill`.
                    let payload = &spill[start + range.start..start + range.end];
                    if !visit(line_no, payload)? {
                        return Ok(());
                    }
                }
            }
        }
    }
}
1673
/// Writes one result line per NDJSON row by parsing each row into a tape and
/// rendering `plan` from it.
///
/// Returns the number of lines written, capped by `limit`. A row that fails to
/// parse aborts the run with a row-level "Invalid JSON" error.
#[cfg(feature = "simd-json")]
fn drive_ndjson_tape_writer<R, W>(
    engine: &JetroEngine,
    reader: R,
    plan: &NdjsonDirectTapePlan,
    limit: Option<usize>,
    options: NdjsonOptions,
    writer: W,
) -> Result<usize, JetroEngineError>
where
    R: BufRead,
    W: Write,
{
    let capacity = options.initial_buffer_capacity;
    let mut rows = NdjsonPerRowDriver::new(reader).with_options(options);
    let mut sink = ndjson_writer_with_options(writer, options);
    let mut line = Vec::with_capacity(capacity);
    let mut tape = crate::data::tape::TapeScratch::with_capacity(capacity);
    let mut encoded = Vec::with_capacity(capacity);
    let mut written = 0usize;
    let mut runner = NdjsonTapeWriterRunner::new(engine, plan);

    while let Some((line_no, row)) = rows.read_next_nonempty(&mut line)? {
        encoded.clear();
        tape.parse_slice(row).map_err(|message| {
            row_parse_error(
                line_no,
                JetroEngineError::Eval(crate::EvalError(format!("Invalid JSON: {message}"))),
            )
        })?;
        runner.write_row(&tape, &mut encoded)?;

        written += usize::from(write_json_bytes_line_with_options(
            &mut sink, &encoded, options,
        )?);
        if limit.is_some_and(|limit| written >= limit) {
            break;
        }
    }

    sink.flush()?;
    Ok(written)
}
1716
/// Renders the result of one `NdjsonDirectTapePlan` for a parsed row at a
/// time, keeping per-plan state (VM lock, environment, path caches) alive
/// across rows.
#[cfg(feature = "simd-json")]
pub(super) struct NdjsonTapeWriterRunner<'a, 'p> {
    /// Plan rendered by `write_row`.
    plan: &'p NdjsonDirectTapePlan,
    /// Locked VM, populated only when the plan reports `needs_vm()`.
    vm: Option<MutexGuard<'a, VM>>,
    /// Evaluation environment, populated under the same condition as `vm`.
    env: Option<crate::data::context::Env>,
    /// Cache for path lookups that start at the row root.
    root_path: NdjsonPathCache,
    /// Cache for the source path of array-element and stream plans.
    source_path: NdjsonPathCache,
    /// Cache for the suffix path applied after an array element is selected.
    suffix_path: NdjsonPathCache,
    /// Cache handed to the stream writer for predicate paths.
    predicate_path: NdjsonPathCache,
    /// Caches handed to the object/array projection and stream writers.
    object_paths: Vec<NdjsonPathCache>,
}
1728
#[cfg(feature = "simd-json")]
impl<'a, 'p> NdjsonTapeWriterRunner<'a, 'p> {
    /// Builds a runner for `plan`. The engine's VM is locked and an
    /// environment created only when the plan reports that it needs a VM.
    pub(super) fn new(engine: &'a JetroEngine, plan: &'p NdjsonDirectTapePlan) -> Self {
        let (vm, env) = if plan.needs_vm() {
            (
                Some(engine.lock_vm()),
                Some(crate::data::context::Env::new(Val::Null)),
            )
        } else {
            (None, None)
        };

        Self {
            plan,
            vm,
            env,
            root_path: NdjsonPathCache::default(),
            source_path: NdjsonPathCache::default(),
            suffix_path: NdjsonPathCache::default(),
            predicate_path: NdjsonPathCache::default(),
            object_paths: Vec::new(),
        }
    }

    /// Resolves `source_steps` from the root, selects `element` of the array
    /// found there, then follows `suffix_steps` from that element.
    fn array_element_index(
        &mut self,
        scratch: &crate::data::tape::TapeScratch,
        source_steps: &[crate::ir::physical::PhysicalPathStep],
        element: NdjsonDirectElement,
        suffix_steps: &[crate::ir::physical::PhysicalPathStep],
    ) -> Option<usize> {
        let array_idx = self.source_path.index(scratch, 0, source_steps)?;
        let element_idx = json_tape_array_element(scratch, array_idx, element)?;
        self.suffix_path.index(scratch, element_idx, suffix_steps)
    }

    /// Renders the plan's result for the row held in `scratch` into `writer`
    /// as JSON. Paths that do not resolve are written as `null`.
    ///
    /// # Errors
    ///
    /// Propagates write failures, evaluation errors from a view pipeline, and
    /// reports a missing VM/environment for `ViewPipeline` plans.
    pub(super) fn write_row<W: Write>(
        &mut self,
        scratch: &crate::data::tape::TapeScratch,
        writer: &mut W,
    ) -> Result<(), JetroEngineError> {
        // Copy the plan reference out so the match below does not hold a
        // borrow of `self`.
        let plan = self.plan;

        match plan {
            NdjsonDirectTapePlan::RootPath(steps) => {
                match self.root_path.index(scratch, 0, steps) {
                    Some(idx) => {
                        write_json_tape_at(writer, scratch, idx)?;
                    }
                    None => {
                        writer.write_all(b"null")?;
                    }
                }
            }
            NdjsonDirectTapePlan::ViewScalarCall {
                steps,
                call,
                optional,
            } => {
                let idx = self.root_path.index(scratch, 0, steps);
                let value = match idx {
                    Some(idx) => json_tape_scalar(scratch, idx),
                    None => crate::util::JsonView::Null,
                };
                if *optional && matches!(value, crate::util::JsonView::Null) {
                    writer.write_all(b"null")?;
                } else {
                    match (call.try_apply_json_view(value), idx) {
                        (Some(value), _) => {
                            write_val_json(writer, &value)?;
                        }
                        // The call did not apply: emit the raw node instead.
                        (None, Some(idx)) => {
                            write_json_tape_at(writer, scratch, idx)?;
                        }
                        (None, None) => {
                            writer.write_all(b"null")?;
                        }
                    }
                }
            }
            NdjsonDirectTapePlan::ArrayElementViewScalarCall {
                source_steps,
                element,
                suffix_steps,
                call,
            } => {
                let idx = self.array_element_index(scratch, source_steps, *element, suffix_steps);
                let applied =
                    idx.and_then(|idx| call.try_apply_json_view(json_tape_scalar(scratch, idx)));
                match (applied, idx) {
                    (Some(value), _) => {
                        write_val_json(writer, &value)?;
                    }
                    (None, Some(idx)) => {
                        write_json_tape_at(writer, scratch, idx)?;
                    }
                    (None, None) => {
                        writer.write_all(b"null")?;
                    }
                }
            }
            NdjsonDirectTapePlan::ObjectItems { steps, method } => {
                let idx = self.root_path.index(scratch, 0, steps);
                write_json_tape_object_items(writer, scratch, idx, *method)?;
            }
            NdjsonDirectTapePlan::ArrayElementPath {
                source_steps,
                element,
                suffix_steps,
            } => {
                match self.array_element_index(scratch, source_steps, *element, suffix_steps) {
                    Some(idx) => {
                        write_json_tape_at(writer, scratch, idx)?;
                    }
                    None => {
                        writer.write_all(b"null")?;
                    }
                }
            }
            NdjsonDirectTapePlan::Stream(plan) => {
                write_json_tape_stream(
                    writer,
                    scratch,
                    plan,
                    &mut self.source_path,
                    &mut self.suffix_path,
                    &mut self.predicate_path,
                    &mut self.object_paths,
                )?;
            }
            NdjsonDirectTapePlan::Object(fields) => {
                write_json_tape_object_projection(writer, scratch, fields, &mut self.object_paths)?;
            }
            NdjsonDirectTapePlan::Array(items) => {
                write_json_tape_array_projection(writer, scratch, items, &mut self.object_paths)?;
            }
            NdjsonDirectTapePlan::ViewPipeline { source_steps, body } => {
                let (Some(vm), Some(env)) = (self.vm.as_deref_mut(), self.env.as_ref()) else {
                    return Err(JetroEngineError::Eval(crate::EvalError(
                        "NDJSON view pipeline requires VM state".to_string(),
                    )));
                };
                let source = match json_tape_path_index(scratch, source_steps) {
                    Some(idx) => crate::data::view::TapeScratchView::Node { tape: scratch, idx },
                    None => crate::data::view::TapeScratchView::Missing,
                };
                match crate::exec::view::run_with_env_and_vm(source, body, None, &env, vm) {
                    Some(result) => {
                        let value = result.map_err(JetroEngineError::Eval)?;
                        write_val_json(writer, &value)?;
                    }
                    None => {
                        writer.write_all(b"null")?;
                    }
                }
            }
        }
        Ok(())
    }
}
1857
/// Remembers where object fields along a path were last found, so the same
/// path can be resolved on later rows by checking the remembered position
/// before scanning the object.
#[cfg(feature = "simd-json")]
#[derive(Default)]
pub(super) struct NdjsonPathCache {
    /// One slot per path depth; `None` until a field at that depth has been
    /// located by a full scan.
    fields: Vec<Option<NdjsonFieldCache>>,
}
1866
/// Remembered position of an object field, stored relative to the tape index
/// of the object node that contains it.
#[cfg(feature = "simd-json")]
#[derive(Clone, Copy)]
struct NdjsonFieldCache {
    /// Offset from the object node to the field's key node.
    key_delta: usize,
    /// Offset from the object node to the field's value node.
    value_delta: usize,
}
1873
/// Bundle of mutable borrows of three path caches.
// NOTE(review): the consumers of this struct are outside this chunk; the
// fields presumably mirror the runner's `source_path`, `suffix_path` and
// `predicate_path` caches — confirm against the stream writer.
#[cfg(feature = "simd-json")]
struct NdjsonPathCaches<'a> {
    /// Cache for the source path.
    source: &'a mut NdjsonPathCache,
    /// Cache for the suffix path.
    suffix: &'a mut NdjsonPathCache,
    /// Cache for the predicate path.
    predicate: &'a mut NdjsonPathCache,
}
1880
#[cfg(feature = "simd-json")]
impl NdjsonPathCache {
    /// Resolves `steps` starting at tape index `start`, trying the read-only
    /// cached shortcut first and falling back to the cache-updating walk.
    fn index<T: JsonTape>(
        &mut self,
        tape: &T,
        start: usize,
        steps: &[crate::ir::physical::PhysicalPathStep],
    ) -> Option<usize> {
        match self.index_cached(tape, start, steps) {
            Some(idx) => Some(idx),
            None => self.index_uncached(tape, start, steps),
        }
    }

    /// Shortcut for paths whose only field step is the first one.
    ///
    /// Uses the depth-0 cache slot, and only when the remembered key offset is
    /// greater than one. Returns `None` whenever the shortcut does not apply
    /// or the remembered position no longer holds the key.
    fn index_cached<T: JsonTape>(
        &self,
        tape: &T,
        start: usize,
        steps: &[crate::ir::physical::PhysicalPathStep],
    ) -> Option<usize> {
        use crate::ir::physical::PhysicalPathStep;

        let (first, rest) = steps.split_first()?;
        let PhysicalPathStep::Field(key) = first else {
            return None;
        };
        let has_later_field = rest
            .iter()
            .any(|step| matches!(step, PhysicalPathStep::Field(_)));
        if has_later_field {
            return None;
        }

        let field = self
            .fields
            .first()
            .copied()
            .flatten()
            .filter(|field| field.key_delta > 1)?;
        let value_idx = json_tape_object_cached_field(tape, start, field, key.as_ref())?;
        rest.iter()
            .try_fold(value_idx, |cur, step| json_tape_step_index(tape, cur, step))
    }

    /// Resolves `steps` from `start`, refreshing the cache along the way.
    fn index_uncached<T: JsonTape>(
        &mut self,
        tape: &T,
        start: usize,
        steps: &[crate::ir::physical::PhysicalPathStep],
    ) -> Option<usize> {
        self.index_from_depth(tape, start, steps, 0)
    }

    /// Walks `steps` from `start`, where the first step sits at cache depth
    /// `depth` and every following step one level deeper.
    ///
    /// Field steps first try their cache slot (when its key offset is greater
    /// than one) and otherwise scan the object and store the position found.
    /// Non-field steps are resolved directly and never touch the cache.
    fn index_from_depth<T: JsonTape>(
        &mut self,
        tape: &T,
        start: usize,
        steps: &[crate::ir::physical::PhysicalPathStep],
        depth: usize,
    ) -> Option<usize> {
        use crate::ir::physical::PhysicalPathStep;

        let mut cur = start;
        for (offset, step) in steps.iter().enumerate() {
            let PhysicalPathStep::Field(key) = step else {
                cur = json_tape_step_index(tape, cur, step)?;
                continue;
            };

            let slot = depth + offset;
            if self.fields.len() <= slot {
                self.fields.resize(slot + 1, None);
            }

            let cached = self.fields[slot]
                .filter(|field| field.key_delta > 1)
                .and_then(|field| json_tape_object_cached_field(tape, cur, field, key.as_ref()));

            cur = match cached {
                Some(idx) => idx,
                None => {
                    let (idx, field) =
                        json_tape_object_field_index_and_cache(tape, cur, key.as_ref())?;
                    self.fields[slot] = Some(field);
                    idx
                }
            };
        }
        Some(cur)
    }
}
1974
/// Copies every NDJSON row that satisfies the direct `predicate` to `writer`,
/// one row per line, stopping after `limit` rows.
///
/// The byte-level predicate evaluator is tried first; when it cannot decide,
/// the row is parsed into a tape and the tape predicate is evaluated instead.
/// Matching rows are written exactly as they were read, followed by `\n`.
#[cfg(feature = "simd-json")]
fn drive_ndjson_tape_matches_writer<R, W>(
    engine: &JetroEngine,
    reader: R,
    predicate: &NdjsonDirectPredicate,
    limit: usize,
    options: NdjsonOptions,
    writer: W,
) -> Result<usize, JetroEngineError>
where
    R: BufRead,
    W: Write,
{
    let mut rows = NdjsonPerRowDriver::new(reader).with_options(options);
    let mut sink = ndjson_writer_with_options(writer, options);
    let mut line = Vec::with_capacity(options.initial_buffer_capacity);
    let mut tape = crate::data::tape::TapeScratch::with_capacity(options.initial_buffer_capacity);
    let mut emitted = 0usize;

    // The VM lock and environment are only taken when the predicate needs them.
    let needs_vm = predicate_needs_vm(predicate);
    let mut vm = needs_vm.then(|| engine.lock_vm());
    let env = needs_vm.then(|| crate::data::context::Env::new(Val::Null));
    let mut predicate_path = NdjsonPathCache::default();

    while let Some((line_no, row)) = rows.read_next_owned(&mut line)? {
        let matched = match eval_ndjson_byte_predicate_row(&row, predicate)? {
            Some(decided) => decided,
            None => {
                tape.parse_slice(&row).map_err(|message| {
                    row_parse_error(
                        line_no,
                        JetroEngineError::Eval(crate::EvalError(format!(
                            "Invalid JSON: {message}"
                        ))),
                    )
                })?;
                eval_tape_predicate(
                    &tape,
                    predicate,
                    env.as_ref(),
                    &mut vm,
                    &mut predicate_path,
                )
                .map_err(JetroEngineError::Eval)?
            }
        };
        if !matched {
            continue;
        }

        sink.write_all(&row)?;
        sink.write_all(b"\n")?;
        emitted += 1;
        if emitted >= limit {
            break;
        }
    }

    sink.flush()?;
    Ok(emitted)
}
2041
2042fn drive_ndjson_matches<R, F>(
2043 engine: &JetroEngine,
2044 reader: R,
2045 predicate: &str,
2046 limit: usize,
2047 options: NdjsonOptions,
2048 mut emit: F,
2049) -> Result<usize, JetroEngineError>
2050where
2051 R: BufRead,
2052 F: FnMut(Val) -> Result<NdjsonControl, JetroEngineError>,
2053{
2054 if limit == 0 {
2055 return Ok(0);
2056 }
2057
2058 let mut driver = NdjsonPerRowDriver::new(reader).with_options(options);
2059 #[cfg(feature = "simd-json")]
2060 let direct_predicate = direct_tape_predicate(engine, predicate);
2061 let mut executor = NdjsonRowExecutor::new(engine, predicate);
2062 let mut buf = Vec::with_capacity(options.initial_buffer_capacity);
2063 let mut emitted = 0usize;
2064
2065 while let Some((line_no, row)) = driver.read_next_owned(&mut buf)? {
2066 #[cfg(feature = "simd-json")]
2067 if let Some(predicate) = direct_predicate.as_ref() {
2068 if let Some(false) = eval_ndjson_byte_predicate_row(&row, predicate)? {
2069 continue;
2070 }
2071 }
2072
2073 let document = executor.parse_owned_row(line_no, row)?;
2074 let matched = executor.eval_document(line_no, &document)?;
2075 if !is_truthy(&matched) {
2076 continue;
2077 }
2078
2079 let root = document
2080 .root_val_with(engine.keys())
2081 .map_err(|err| row_eval_error(line_no, err))?;
2082 emitted += 1;
2083 if matches!(emit(root)?, NdjsonControl::Stop) || emitted >= limit {
2084 break;
2085 }
2086 }
2087
2088 Ok(emitted)
2089}
2090
2091fn drive_ndjson_matches_writer<R, W>(
2092 engine: &JetroEngine,
2093 reader: R,
2094 predicate: &str,
2095 limit: usize,
2096 options: NdjsonOptions,
2097 writer: W,
2098) -> Result<usize, JetroEngineError>
2099where
2100 R: BufRead,
2101 W: Write,
2102{
2103 if limit == 0 {
2104 return Ok(0);
2105 }
2106
2107 #[cfg(feature = "simd-json")]
2108 if let Some(predicate) = direct_tape_predicate(engine, predicate) {
2109 return drive_ndjson_tape_matches_writer(
2110 engine, reader, &predicate, limit, options, writer,
2111 );
2112 }
2113
2114 let mut driver = NdjsonPerRowDriver::new(reader).with_options(options);
2115 let mut executor = NdjsonRowExecutor::new(engine, predicate);
2116 let mut writer = ndjson_writer_with_options(writer, options);
2117 let mut buf = Vec::with_capacity(options.initial_buffer_capacity);
2118 let mut emitted = 0usize;
2119
2120 while let Some((line_no, row)) = driver.read_next_owned(&mut buf)? {
2121 let document = executor.parse_owned_row(line_no, row)?;
2122 let matched = executor.eval_document(line_no, &document)?;
2123 if !is_truthy(&matched) {
2124 continue;
2125 }
2126
2127 write_document_line(&mut writer, &document, line_no, executor.engine())?;
2128 emitted += 1;
2129 if emitted >= limit {
2130 break;
2131 }
2132 }
2133
2134 writer.flush()?;
2135 Ok(emitted)
2136}
2137
/// Evaluates one query against NDJSON rows, holding the planned query and the
/// engine's VM lock for as long as the executor lives.
pub(super) struct NdjsonRowExecutor<'a> {
    /// Engine used to parse rows and exposed through `engine()`.
    engine: &'a JetroEngine,
    /// Query plan obtained from the engine's plan cache at construction time.
    plan: crate::ir::physical::QueryPlan,
    /// Guard over the engine VM, taken once in `new`.
    vm: MutexGuard<'a, VM>,
}
2143
2144impl<'a> NdjsonRowExecutor<'a> {
2145 pub(super) fn new(engine: &'a JetroEngine, query: &str) -> Self {
2146 Self {
2147 engine,
2148 plan: engine.cached_plan(query, PlanningContext::bytes()),
2149 vm: engine.lock_vm(),
2150 }
2151 }
2152
2153 pub(super) fn eval_owned_row(
2154 &mut self,
2155 line_no: u64,
2156 row: Vec<u8>,
2157 ) -> Result<Val, JetroEngineError> {
2158 let document = self.parse_owned_row(line_no, row)?;
2159 self.eval_document(line_no, &document)
2160 }
2161
2162 pub(super) fn parse_owned_row(
2163 &self,
2164 line_no: u64,
2165 row: Vec<u8>,
2166 ) -> Result<Jetro, JetroEngineError> {
2167 parse_row(self.engine, line_no, row)
2168 }
2169
2170 pub(super) fn eval_document(
2171 &mut self,
2172 line_no: u64,
2173 document: &Jetro,
2174 ) -> Result<Val, JetroEngineError> {
2175 crate::exec::router::collect_plan_val_with_vm(document, &self.plan, &mut self.vm)
2176 .map_err(|err| row_eval_error(line_no, err))
2177 }
2178
2179 pub(super) fn engine(&self) -> &'a JetroEngine {
2180 self.engine
2181 }
2182}
2183
/// Minimal read-only view over a parsed JSON tape, so the path helpers in
/// this file can work with both `TapeData` and `TapeScratch`.
#[cfg(feature = "simd-json")]
trait JsonTape {
    /// All tape nodes, in tape order.
    fn nodes(&self) -> &[crate::data::tape::TapeNode];
    /// String stored at tape index `idx`; the path helpers use it to read
    /// object keys.
    fn str_at(&self, idx: usize) -> &str;
    /// Amount by which a cursor at `idx` is advanced to step over the value
    /// there (presumably the number of tape nodes that value occupies —
    /// confirm against the tape implementation).
    fn span(&self, idx: usize) -> usize;
}
2190
/// `JsonTape` for the owned tape type: exposes the `nodes` field and forwards
/// `str_at` / `span` to `TapeData`'s own methods of the same name.
#[cfg(feature = "simd-json")]
impl JsonTape for crate::data::tape::TapeData {
    #[inline]
    fn nodes(&self) -> &[crate::data::tape::TapeNode] {
        &self.nodes
    }

    #[inline]
    fn str_at(&self, idx: usize) -> &str {
        // Forwards to the type's own `str_at`.
        self.str_at(idx)
    }

    #[inline]
    fn span(&self, idx: usize) -> usize {
        // Forwards to the type's own `span`.
        self.span(idx)
    }
}
2208
/// `JsonTape` for the reusable scratch tape: exposes the `nodes` field and
/// forwards `str_at` / `span` to `TapeScratch`'s own methods of the same name.
#[cfg(feature = "simd-json")]
impl JsonTape for crate::data::tape::TapeScratch {
    #[inline]
    fn nodes(&self) -> &[crate::data::tape::TapeNode] {
        &self.nodes
    }

    #[inline]
    fn str_at(&self, idx: usize) -> &str {
        // Forwards to the type's own `str_at`.
        self.str_at(idx)
    }

    #[inline]
    fn span(&self, idx: usize) -> usize {
        // Forwards to the type's own `span`.
        self.span(idx)
    }
}
2226
/// Resolves `steps` starting from the tape root (index 0).
///
/// Returns the tape index of the node the path ends on, or `None` when the
/// path does not resolve.
#[cfg(feature = "simd-json")]
fn json_tape_path_index<T: JsonTape>(
    tape: &T,
    steps: &[crate::ir::physical::PhysicalPathStep],
) -> Option<usize> {
    json_tape_path_index_from(tape, 0, steps)
}
2234
/// Resolves `steps` starting at tape index `start`.
///
/// Returns `None` for an empty tape or when any step fails to resolve. Paths
/// of up to two steps are resolved inline; longer paths go through
/// `json_tape_path_index_slow`.
#[cfg(feature = "simd-json")]
fn json_tape_path_index_from<T: JsonTape>(
    tape: &T,
    start: usize,
    steps: &[crate::ir::physical::PhysicalPathStep],
) -> Option<usize> {
    if tape.nodes().is_empty() {
        return None;
    }

    match steps {
        [] => Some(start),
        [only] => json_tape_step_index(tape, start, only),
        [first, second] => {
            let mid = json_tape_step_index(tape, start, first)?;
            json_tape_step_index(tape, mid, second)
        }
        longer => json_tape_path_index_slow(tape, start, longer),
    }
}
2253
/// General-purpose path walk: applies every step in order, starting at
/// `start`, and stops with `None` at the first step that does not resolve.
#[cfg(feature = "simd-json")]
fn json_tape_path_index_slow<T: JsonTape>(
    tape: &T,
    start: usize,
    steps: &[crate::ir::physical::PhysicalPathStep],
) -> Option<usize> {
    steps
        .iter()
        .try_fold(start, |idx, step| json_tape_step_index(tape, idx, step))
}
2266
/// Applies a single path step to the node at tape index `start`.
///
/// * `Field(key)` requires an object node and returns the index of the value
///   stored under `key`.
/// * `Index(n)` requires an array node and returns the index of element `n`;
///   negative `n` counts from the end of the array.
///
/// Returns `None` when `start` is not a valid tape index, the node has the
/// wrong kind, the key is absent, or the index is out of range. The node is
/// fetched with a checked lookup, like `json_tape_object_cached_field` and
/// `json_tape_array_element`, so an out-of-range `start` cannot panic.
#[cfg(feature = "simd-json")]
fn json_tape_step_index<T: JsonTape>(
    tape: &T,
    start: usize,
    step: &crate::ir::physical::PhysicalPathStep,
) -> Option<usize> {
    use crate::data::tape::TapeNode;
    use crate::ir::physical::PhysicalPathStep;

    let node = tape.nodes().get(start).copied()?;

    match step {
        PhysicalPathStep::Field(key) => {
            let TapeNode::Object { len, .. } = node else {
                return None;
            };
            let mut cur = start + 1;
            for _ in 0..len {
                if tape.str_at(cur) == key.as_ref() {
                    return Some(cur + 1);
                }
                // Step over the key, then over the whole value that follows it.
                cur += 1;
                cur += tape.span(cur);
            }
            None
        }
        PhysicalPathStep::Index(wanted) => {
            let TapeNode::Array { len, .. } = node else {
                return None;
            };
            // Checked conversions: an index that does not fit in `usize` is
            // treated as out of range instead of being truncated.
            let wanted = if *wanted < 0 {
                len.checked_sub(usize::try_from(wanted.unsigned_abs()).ok()?)?
            } else {
                usize::try_from(*wanted).ok()?
            };
            if wanted >= len {
                return None;
            }
            let mut cur = start + 1;
            for _ in 0..wanted {
                cur += tape.span(cur);
            }
            Some(cur)
        }
    }
}
2311
/// Checks whether the field remembered in `cache` is still `key` for the
/// object at `obj_idx`, returning the value's tape index when it is.
///
/// Returns `None` when `obj_idx` is not an object node, an offset overflows,
/// the value index lies outside the tape, the node at the key position is not
/// a string, or that string differs from `key`.
#[cfg(feature = "simd-json")]
fn json_tape_object_cached_field<T: JsonTape>(
    tape: &T,
    obj_idx: usize,
    cache: NdjsonFieldCache,
    key: &str,
) -> Option<usize> {
    use crate::data::tape::TapeNode;

    let nodes = tape.nodes();
    if !matches!(nodes.get(obj_idx), Some(TapeNode::Object { .. })) {
        return None;
    }

    let key_idx = obj_idx.checked_add(cache.key_delta)?;
    let value_idx = obj_idx.checked_add(cache.value_delta)?;
    if value_idx >= nodes.len() {
        return None;
    }
    if !matches!(nodes.get(key_idx), Some(TapeNode::String(_))) {
        return None;
    }

    if tape.str_at(key_idx) == key {
        Some(value_idx)
    } else {
        None
    }
}
2335
/// Scans the object at `obj_idx` for `key`.
///
/// On success returns the value's tape index together with a cache entry
/// recording where the key and value sit relative to `obj_idx`, so later rows
/// can probe that position directly.
///
/// Returns `None` when `obj_idx` is not a valid tape index, the node there is
/// not an object, or the object has no such key. The node is fetched with a
/// checked lookup (matching `json_tape_object_cached_field`), so an empty tape
/// or a stale index yields `None` instead of a panic.
#[cfg(feature = "simd-json")]
fn json_tape_object_field_index_and_cache<T: JsonTape>(
    tape: &T,
    obj_idx: usize,
    key: &str,
) -> Option<(usize, NdjsonFieldCache)> {
    let crate::data::tape::TapeNode::Object { len, .. } = tape.nodes().get(obj_idx).copied()?
    else {
        return None;
    };

    let mut cur = obj_idx + 1;
    for _ in 0..len {
        if tape.str_at(cur) == key {
            return Some((
                cur + 1,
                NdjsonFieldCache {
                    key_delta: cur - obj_idx,
                    value_delta: cur + 1 - obj_idx,
                },
            ));
        }
        // Step over the key, then over the whole value that follows it.
        cur += 1;
        cur += tape.span(cur);
    }
    None
}
2361
/// Returns the tape index of the requested `element` of the array at `idx`.
///
/// `None` is returned when `idx` is outside the tape, the node there is not
/// an array, the array is empty (for `Last`), or the requested position is
/// not smaller than the array length.
#[cfg(feature = "simd-json")]
fn json_tape_array_element<T: JsonTape>(
    tape: &T,
    idx: usize,
    element: NdjsonDirectElement,
) -> Option<usize> {
    use crate::data::tape::TapeNode;

    let TapeNode::Array { len, .. } = *tape.nodes().get(idx)? else {
        return None;
    };

    let wanted = match element {
        NdjsonDirectElement::First => 0,
        NdjsonDirectElement::Last => len.checked_sub(1)?,
        NdjsonDirectElement::Nth(n) => n,
    };
    if wanted >= len {
        return None;
    }

    // Starting just past the array node, step over `wanted` whole elements.
    Some((0..wanted).fold(idx + 1, |cur, _| cur + tape.span(cur)))
}
2385
/// Evaluates a row-level `predicate` against `tape`.
///
/// Cached path lookups start at tape index 0. `vm` and `env` are only
/// consulted by the `ViewPipeline` variant; if either is absent for that
/// variant an `EvalError` is returned. Errors produced while running the view
/// pipeline are propagated as well.
#[cfg(feature = "simd-json")]
pub(super) fn eval_tape_predicate(
    tape: &crate::data::tape::TapeScratch,
    predicate: &NdjsonDirectPredicate,
    env: Option<&crate::data::context::Env>,
    vm: &mut Option<std::sync::MutexGuard<'_, crate::vm::exec::VM>>,
    cache: &mut NdjsonPathCache,
) -> Result<bool, crate::EvalError> {
    use crate::parse::ast::BinOp;

    Ok(match predicate {
        // A path that does not resolve counts as false.
        NdjsonDirectPredicate::Path(steps) => cache
            .index(tape, 0, steps)
            .map(|idx| json_view_truthy(json_tape_scalar(tape, idx)))
            .unwrap_or(false),
        NdjsonDirectPredicate::Literal(value) => crate::util::is_truthy(value),
        NdjsonDirectPredicate::Not(inner) => !eval_tape_predicate(tape, inner, env, vm, cache)?,
        // `And` / `Or` short-circuit: the right side is only evaluated when needed.
        NdjsonDirectPredicate::Binary { lhs, op, rhs } if *op == BinOp::And => {
            eval_tape_predicate(tape, lhs, env, vm, cache)?
                && eval_tape_predicate(tape, rhs, env, vm, cache)?
        }
        NdjsonDirectPredicate::Binary { lhs, op, rhs } if *op == BinOp::Or => {
            eval_tape_predicate(tape, lhs, env, vm, cache)?
                || eval_tape_predicate(tape, rhs, env, vm, cache)?
        }
        // Every other operator compares two scalar operands; if either side
        // cannot be produced by `eval_tape_scalar` the predicate is false.
        NdjsonDirectPredicate::Binary { lhs, op, rhs } => {
            let Some(lhs) = eval_tape_scalar(tape, lhs, cache) else {
                return Ok(false);
            };
            let Some(rhs) = eval_tape_scalar(tape, rhs, cache) else {
                return Ok(false);
            };
            crate::util::json_cmp_binop(lhs, *op, rhs)
        }
        NdjsonDirectPredicate::ViewScalarCall { steps, call } => cache
            .index(tape, 0, steps)
            .map(|idx| json_tape_scalar(tape, idx))
            .and_then(|value| call.try_apply_json_view(value))
            .is_some_and(|value| crate::util::is_truthy(&value)),
        // This variant resolves its paths with the uncached helpers; `cache`
        // is not touched here.
        NdjsonDirectPredicate::ArrayElementViewScalarCall {
            source_steps,
            element,
            suffix_steps,
            call,
        } => json_tape_path_index(tape, source_steps)
            .and_then(|idx| json_tape_array_element(tape, idx, *element))
            .and_then(|idx| json_tape_path_index_from(tape, idx, suffix_steps))
            .map(|idx| json_tape_scalar(tape, idx))
            .and_then(|value| call.try_apply_json_view(value))
            .is_some_and(|value| crate::util::is_truthy(&value)),
        NdjsonDirectPredicate::ViewPipeline { source_steps, body } => {
            // Both the VM guard and the environment must be present.
            let (Some(vm), Some(env)) = (vm.as_deref_mut(), env) else {
                return Err(crate::EvalError(
                    "view pipeline predicate requires VM state".to_string(),
                ));
            };
            // An unresolved source path is handed to the pipeline as `Missing`.
            let source = json_tape_path_index(tape, source_steps)
                .map(|idx| crate::data::view::TapeScratchView::Node { tape, idx })
                .unwrap_or(crate::data::view::TapeScratchView::Missing);
            crate::exec::view::run_with_env_and_vm(source, body, None, env, vm)
                .transpose()?
                .is_some_and(|value| crate::util::is_truthy(&value))
        }
    })
}
2451
/// Reports whether evaluating `predicate` involves a `ViewPipeline` node
/// anywhere in its tree (directly, under `Not`, or on either side of a
/// `Binary`).
#[cfg(feature = "simd-json")]
pub(super) fn predicate_needs_vm(predicate: &NdjsonDirectPredicate) -> bool {
    match predicate {
        NdjsonDirectPredicate::ViewPipeline { .. } => true,
        NdjsonDirectPredicate::Path(_)
        | NdjsonDirectPredicate::Literal(_)
        | NdjsonDirectPredicate::ViewScalarCall { .. }
        | NdjsonDirectPredicate::ArrayElementViewScalarCall { .. } => false,
        NdjsonDirectPredicate::Not(negated) => predicate_needs_vm(negated),
        NdjsonDirectPredicate::Binary {
            lhs: left,
            rhs: right,
            ..
        } => predicate_needs_vm(left) || predicate_needs_vm(right),
    }
}
2466
/// Produces the scalar operand for a comparison.
///
/// Only `Path` (resolved from tape index 0 through `cache`) and `Literal`
/// predicates yield a value; every other variant gives `None`.
#[cfg(feature = "simd-json")]
fn eval_tape_scalar<'a>(
    tape: &'a crate::data::tape::TapeScratch,
    predicate: &'a NdjsonDirectPredicate,
    cache: &mut NdjsonPathCache,
) -> Option<crate::util::JsonView<'a>> {
    match predicate {
        NdjsonDirectPredicate::Literal(literal) => {
            Some(crate::util::JsonView::from_val(literal))
        }
        NdjsonDirectPredicate::Path(steps) => {
            let node_idx = cache.index(tape, 0, steps)?;
            Some(json_tape_scalar(tape, node_idx))
        }
        _ => None,
    }
}
2481
/// Truthiness of a `JsonView`: `Null` is false, booleans are themselves,
/// numbers are true when non-zero, strings when non-empty, and array/object
/// lengths when greater than zero.
#[cfg(feature = "simd-json")]
fn json_view_truthy(value: crate::util::JsonView<'_>) -> bool {
    use crate::util::JsonView;

    match value {
        JsonView::Null => false,
        JsonView::Bool(flag) => flag,
        JsonView::Int(number) => number != 0,
        JsonView::UInt(number) => number != 0,
        JsonView::Float(number) => number != 0.0,
        JsonView::Str(text) => !text.is_empty(),
        JsonView::ArrayLen(count) | JsonView::ObjectLen(count) => count > 0,
    }
}
2494
/// Builds a `JsonView` for the tape node at `idx`.
///
/// An out-of-range index is reported as `Null`. Arrays and objects are
/// represented by their length only.
#[cfg(feature = "simd-json")]
fn json_tape_scalar<T: JsonTape>(tape: &T, idx: usize) -> crate::util::JsonView<'_> {
    use crate::data::tape::TapeNode;
    use crate::util::JsonView;
    use simd_json::StaticNode as SN;

    match tape.nodes().get(idx).copied() {
        None | Some(TapeNode::Static(SN::Null)) => JsonView::Null,
        Some(TapeNode::Static(SN::Bool(flag))) => JsonView::Bool(flag),
        Some(TapeNode::Static(SN::I64(number))) => JsonView::Int(number),
        Some(TapeNode::Static(SN::U64(number))) => JsonView::UInt(number),
        Some(TapeNode::Static(SN::F64(number))) => JsonView::Float(number),
        Some(TapeNode::String(_)) => JsonView::Str(tape.str_at(idx)),
        Some(TapeNode::Array { len, .. }) => JsonView::ArrayLen(len),
        Some(TapeNode::Object { len, .. }) => JsonView::ObjectLen(len),
    }
}
2514
2515pub(super) fn write_val_line<W: Write>(
2516 writer: &mut W,
2517 value: &Val,
2518) -> Result<(), JetroEngineError> {
2519 write_val_json(writer, value)?;
2520 writer.write_all(b"\n")?;
2521 Ok(())
2522}
2523
2524pub(super) fn write_val_line_with_options<W: Write>(
2525 writer: &mut W,
2526 value: &Val,
2527 options: NdjsonOptions,
2528) -> Result<bool, JetroEngineError> {
2529 if value == &Val::Null && options.null_output == NdjsonNullOutput::Skip {
2530 return Ok(false);
2531 }
2532 write_val_line(writer, value)?;
2533 Ok(true)
2534}
2535
2536pub(super) fn write_json_bytes_line_with_options<W: Write>(
2537 writer: &mut W,
2538 bytes: &[u8],
2539 options: NdjsonOptions,
2540) -> Result<bool, JetroEngineError> {
2541 if is_json_null_bytes(bytes) && options.null_output == NdjsonNullOutput::Skip {
2542 return Ok(false);
2543 }
2544 writer.write_all(bytes)?;
2545 writer.write_all(b"\n")?;
2546 Ok(true)
2547}
2548
/// True only when `bytes` is exactly the four ASCII bytes `null`
/// (no surrounding whitespace, case-sensitive).
fn is_json_null_bytes(bytes: &[u8]) -> bool {
    matches!(bytes, [b'n', b'u', b'l', b'l'])
}
2552
2553pub(super) fn write_document_line<W: Write>(
2554 writer: &mut W,
2555 document: &Jetro,
2556 line_no: u64,
2557 engine: &JetroEngine,
2558) -> Result<(), JetroEngineError> {
2559 if let Some(bytes) = document.raw_bytes() {
2560 writer.write_all(bytes)?;
2561 writer.write_all(b"\n")?;
2562 return Ok(());
2563 }
2564
2565 let root = document
2566 .root_val_with(engine.keys())
2567 .map_err(|err| row_eval_error(line_no, err))?;
2568 write_val_line(writer, &root)
2569}
2570
2571pub(super) fn ndjson_writer_with_options<W: Write>(
2572 writer: W,
2573 options: NdjsonOptions,
2574) -> BufWriter<W> {
2575 let capacity = options
2576 .reader_buffer_capacity
2577 .max(DEFAULT_READER_BUFFER_CAPACITY);
2578 BufWriter::with_capacity(capacity, writer)
2579}
2580
/// Serializes `value` as JSON text into `writer`, dispatching on the `Val`
/// variant. No trailing newline is written here.
pub(super) fn write_val_json<W: Write>(
    writer: &mut W,
    value: &Val,
) -> Result<(), JetroEngineError> {
    match value {
        // Scalars.
        Val::Null => writer.write_all(b"null")?,
        Val::Bool(true) => writer.write_all(b"true")?,
        Val::Bool(false) => writer.write_all(b"false")?,
        Val::Int(n) => write_i64(writer, *n)?,
        Val::Float(n) => write_f64(writer, *n)?,
        // Both string representations go through the same string writer.
        Val::Str(s) => write_json_str(writer, s.as_ref())?,
        Val::StrSlice(s) => write_json_str(writer, s.as_str())?,
        // Arrays: the generic form plus the typed vector variants, each with
        // its own element writer.
        Val::Arr(items) => write_json_array(writer, items.iter())?,
        Val::IntVec(items) => write_json_int_array(writer, items.iter().copied())?,
        Val::FloatVec(items) => write_json_float_array(writer, items.iter().copied())?,
        Val::StrVec(items) => write_json_str_array(writer, items.iter().map(|s| s.as_ref()))?,
        Val::StrSliceVec(items) => write_json_str_array(writer, items.iter().map(|s| s.as_str()))?,
        // Objects: both map variants are adapted to `(key, value)` pairs for
        // the shared object writer.
        Val::Obj(entries) => write_json_object(
            writer,
            entries.iter().map(|(key, value)| (key.as_ref(), value)),
        )?,
        Val::ObjSmall(entries) => write_json_object(
            writer,
            entries.iter().map(|(key, value)| (key.as_ref(), value)),
        )?,
        Val::ObjVec(data) => write_json_objvec(writer, data)?,
    }
    Ok(())
}
2610
/// Serializes the tape node at `idx` (including any nested children) as JSON
/// and returns the tape index that follows the written value.
///
/// When `idx` is out of range, `null` is written and `idx` itself is returned.
#[cfg(feature = "simd-json")]
fn write_json_tape_at<W: Write, T: JsonTape>(
    writer: &mut W,
    tape: &T,
    idx: usize,
) -> Result<usize, JetroEngineError> {
    use crate::data::tape::TapeNode;
    use simd_json::StaticNode as SN;

    let node = match tape.nodes().get(idx).copied() {
        Some(node) => node,
        None => {
            writer.write_all(b"null")?;
            return Ok(idx);
        }
    };

    match node {
        // Containers recurse and report where their last child ended.
        TapeNode::Array { len, .. } => {
            writer.write_all(b"[")?;
            let mut next = idx + 1;
            for position in 0..len {
                if position > 0 {
                    writer.write_all(b",")?;
                }
                next = write_json_tape_at(writer, tape, next)?;
            }
            writer.write_all(b"]")?;
            return Ok(next);
        }
        TapeNode::Object { len, .. } => {
            writer.write_all(b"{")?;
            let mut next = idx + 1;
            for position in 0..len {
                if position > 0 {
                    writer.write_all(b",")?;
                }
                // Key node first, then the value that follows it.
                write_json_str(writer, tape.str_at(next))?;
                writer.write_all(b":")?;
                next = write_json_tape_at(writer, tape, next + 1)?;
            }
            writer.write_all(b"}")?;
            return Ok(next);
        }
        // Scalars occupy exactly one node; the shared tail returns `idx + 1`.
        TapeNode::Static(SN::Null) => {
            writer.write_all(b"null")?;
        }
        TapeNode::Static(SN::Bool(true)) => {
            writer.write_all(b"true")?;
        }
        TapeNode::Static(SN::Bool(false)) => {
            writer.write_all(b"false")?;
        }
        TapeNode::Static(SN::I64(number)) => {
            write_i64(writer, number)?;
        }
        TapeNode::Static(SN::U64(number)) => {
            write_u64(writer, number)?;
        }
        TapeNode::Static(SN::F64(number)) => {
            write_f64(writer, number)?;
        }
        TapeNode::String(_) => {
            write_json_str(writer, tape.str_at(idx))?;
        }
    }
    Ok(idx + 1)
}
2682
/// Calls `visit` for every item of the source node at `source_idx`.
///
/// An array source yields each element index in order; any other existing
/// node is visited once as a single item; an out-of-range index visits
/// nothing. The first error returned by `visit` stops the walk.
#[cfg(feature = "simd-json")]
fn visit_json_tape_source_items<T, E, F>(tape: &T, source_idx: usize, mut visit: F) -> Result<(), E>
where
    T: JsonTape,
    F: FnMut(usize) -> Result<(), E>,
{
    use crate::data::tape::TapeNode;

    let Some(node) = tape.nodes().get(source_idx).copied() else {
        return Ok(());
    };
    let TapeNode::Array { len, .. } = node else {
        return visit(source_idx);
    };
    let mut item_idx = source_idx + 1;
    for _ in 0..len {
        visit(item_idx)?;
        item_idx += tape.span(item_idx);
    }
    Ok(())
}
2704
/// Returns the index of the first source item accepted by `matches`.
///
/// An array source is scanned element by element; any other existing node is
/// tested once as a single item. `None` is returned for an out-of-range
/// `source_idx` or when nothing is accepted.
#[cfg(feature = "simd-json")]
fn find_json_tape_source_item<T, F>(tape: &T, source_idx: usize, mut matches: F) -> Option<usize>
where
    T: JsonTape,
    F: FnMut(usize) -> bool,
{
    use crate::data::tape::TapeNode;

    let TapeNode::Array { len, .. } = tape.nodes().get(source_idx).copied()? else {
        return matches(source_idx).then_some(source_idx);
    };
    let mut item_idx = source_idx + 1;
    for _ in 0..len {
        if matches(item_idx) {
            return Some(item_idx);
        }
        item_idx += tape.span(item_idx);
    }
    None
}
2727
/// Executes a direct stream `plan` over `tape` and writes its JSON result.
///
/// The source path is resolved from tape index 0. If it does not resolve, the
/// sink's empty result is written instead. Otherwise the source items are
/// walked and handled according to `plan.sink`. The caches are supplied by the
/// caller and passed through to the path lookups.
#[cfg(feature = "simd-json")]
fn write_json_tape_stream<W: Write, T: JsonTape>(
    writer: &mut W,
    tape: &T,
    plan: &NdjsonDirectStreamPlan,
    source_cache: &mut NdjsonPathCache,
    suffix_cache: &mut NdjsonPathCache,
    predicate_cache: &mut NdjsonPathCache,
    projection_caches: &mut Vec<NdjsonPathCache>,
) -> Result<(), JetroEngineError> {
    let Some(source_idx) = source_cache.index(tape, 0, &plan.source_steps) else {
        write_json_tape_empty_stream_result(writer, &plan.sink)?;
        return Ok(());
    };

    match &plan.sink {
        // Emit every accepted item, mapped, as a JSON array.
        NdjsonDirectStreamSink::Collect(map) => {
            writer.write_all(b"[")?;
            let mut wrote_row = false;
            visit_json_tape_source_items(tape, source_idx, |item_idx| {
                // Skip items rejected by the predicate (no predicate accepts all).
                if !plan.predicate.as_ref().is_none_or(|predicate| {
                    eval_json_tape_item_predicate_cached(tape, item_idx, predicate, predicate_cache)
                }) {
                    return Ok::<(), JetroEngineError>(());
                }
                if wrote_row {
                    writer.write_all(b",")?;
                }
                write_json_tape_stream_map(
                    writer,
                    tape,
                    item_idx,
                    map,
                    suffix_cache,
                    projection_caches,
                )?;
                wrote_row = true;
                Ok(())
            })?;
            writer.write_all(b"]")?;
        }
        // Count accepted items; the visitor never fails, so its result is discarded.
        NdjsonDirectStreamSink::Count => {
            let mut count = 0usize;
            let _: Result<(), ()> = visit_json_tape_source_items(tape, source_idx, |item_idx| {
                if plan.predicate.as_ref().is_none_or(|predicate| {
                    eval_json_tape_item_predicate_cached(tape, item_idx, predicate, predicate_cache)
                }) {
                    count += 1;
                }
                Ok(())
            });
            write_i64(writer, count as i64)?;
        }
        // First accepted item, or `null` when there is none.
        NdjsonDirectStreamSink::First(map) => {
            let selected = find_json_tape_source_item(tape, source_idx, |item_idx| {
                plan.predicate.as_ref().is_none_or(|predicate| {
                    eval_json_tape_item_predicate_cached(tape, item_idx, predicate, predicate_cache)
                })
            });
            if let Some(item_idx) = selected {
                write_json_tape_stream_map(
                    writer,
                    tape,
                    item_idx,
                    map,
                    suffix_cache,
                    projection_caches,
                )?;
            } else {
                writer.write_all(b"null")?;
            }
        }
        // Last accepted item: every item is visited and the latest match is kept.
        NdjsonDirectStreamSink::Last(map) => {
            let mut selected = None;
            visit_json_tape_source_items(tape, source_idx, |item_idx| {
                if plan.predicate.as_ref().is_none_or(|predicate| {
                    eval_json_tape_item_predicate_cached(tape, item_idx, predicate, predicate_cache)
                }) {
                    selected = Some(item_idx);
                }
                Ok::<(), JetroEngineError>(())
            })?;
            if let Some(item_idx) = selected {
                write_json_tape_stream_map(
                    writer,
                    tape,
                    item_idx,
                    map,
                    suffix_cache,
                    projection_caches,
                )?;
            } else {
                writer.write_all(b"null")?;
            }
        }
        // Numeric reduction is delegated; the helper receives the source
        // steps and all three caches rather than the already-resolved index.
        NdjsonDirectStreamSink::Numeric { suffix_steps, op } => {
            let caches = NdjsonPathCaches {
                source: source_cache,
                suffix: suffix_cache,
                predicate: predicate_cache,
            };
            let value = reduce_json_tape_numeric_path(
                tape,
                &plan.source_steps,
                plan.predicate.as_ref(),
                suffix_steps,
                *op,
                caches,
            );
            write_val_json(writer, &value)?;
        }
        // Select the item with the extreme key, then write `value` for it.
        // NOTE(review): this arm does not consult `plan.predicate` — confirm
        // that plans with an `Extreme` sink never carry a predicate.
        NdjsonDirectStreamSink::Extreme {
            key_steps,
            want_max,
            value,
        } => {
            let mut best_idx = None;
            visit_json_tape_source_items(tape, source_idx, |item_idx| {
                // Items without a resolvable key are ignored.
                let Some(key_idx) = suffix_cache.index(tape, item_idx, key_steps) else {
                    return Ok::<(), JetroEngineError>(());
                };
                let key = json_tape_scalar(tape, key_idx);
                // The first keyed item always becomes the candidate; later
                // items replace it only on a strict improvement.
                let replace = best_idx
                    .and_then(|idx| suffix_cache.index(tape, idx, key_steps))
                    .map(|idx| {
                        let order = crate::util::json_cmp_vals(key, json_tape_scalar(tape, idx));
                        (*want_max && order.is_gt()) || (!*want_max && order.is_lt())
                    })
                    .unwrap_or(true);
                if replace {
                    best_idx = Some(item_idx);
                }
                Ok(())
            })?;
            if let Some(item_idx) = best_idx {
                let path_idx = match value {
                    NdjsonDirectProjectionValue::Path(steps)
                    | NdjsonDirectProjectionValue::ViewScalarCall { steps, .. } => {
                        suffix_cache.index(tape, item_idx, steps)
                    }
                    NdjsonDirectProjectionValue::Nested(_)
                    | NdjsonDirectProjectionValue::Literal(_) => None,
                };
                write_json_tape_direct_value(writer, tape, value, path_idx)?;
            } else {
                writer.write_all(b"null")?;
            }
        }
    }

    Ok(())
}
2880
/// Writes the result a stream `sink` produces when there are no source items:
/// `[]` for `Collect`, `0` for `Count`, `null` for `First`/`Last`/`Extreme`,
/// and the finalised zero-observation value for `Numeric`.
#[cfg(feature = "simd-json")]
fn write_json_tape_empty_stream_result<W: Write>(
    writer: &mut W,
    sink: &NdjsonDirectStreamSink,
) -> Result<(), JetroEngineError> {
    let literal: &[u8] = match sink {
        NdjsonDirectStreamSink::Collect(_) => &b"[]"[..],
        NdjsonDirectStreamSink::Count => &b"0"[..],
        NdjsonDirectStreamSink::First(_)
        | NdjsonDirectStreamSink::Last(_)
        | NdjsonDirectStreamSink::Extreme { .. } => &b"null"[..],
        NdjsonDirectStreamSink::Numeric { op, .. } => {
            // Finalise untouched accumulators, i.e. no observations.
            let empty = crate::exec::pipeline::num_finalise(
                *op,
                0,
                0.0,
                false,
                f64::INFINITY,
                f64::NEG_INFINITY,
                0,
            );
            return write_val_json(writer, &empty);
        }
    };
    writer.write_all(literal)?;
    Ok(())
}
2908
/// Writes one stream item (rooted at `item_idx`) according to `map`: a single
/// projected value, an array projection, or an object projection.
#[cfg(feature = "simd-json")]
fn write_json_tape_stream_map<W: Write, T: JsonTape>(
    writer: &mut W,
    tape: &T,
    item_idx: usize,
    map: &NdjsonDirectStreamMap,
    suffix_cache: &mut NdjsonPathCache,
    projection_caches: &mut Vec<NdjsonPathCache>,
) -> Result<(), JetroEngineError> {
    match map {
        NdjsonDirectStreamMap::Array(items) => write_json_tape_array_projection_from(
            writer,
            tape,
            item_idx,
            items,
            projection_caches,
        ),
        NdjsonDirectStreamMap::Object(fields) => write_json_tape_object_projection_from(
            writer,
            tape,
            item_idx,
            fields,
            projection_caches,
        ),
        NdjsonDirectStreamMap::Value(value) => {
            // Only path-based values need an index resolved relative to the item.
            let path_idx = match value {
                NdjsonDirectProjectionValue::Nested(_)
                | NdjsonDirectProjectionValue::Literal(_) => None,
                NdjsonDirectProjectionValue::Path(steps)
                | NdjsonDirectProjectionValue::ViewScalarCall { steps, .. } => {
                    suffix_cache.index(tape, item_idx, steps)
                }
            };
            write_json_tape_direct_value(writer, tape, value, path_idx)
        }
    }
}
2951
/// Writes an object projection whose field paths are resolved from tape
/// index 0; delegates to `write_json_tape_object_projection_from`.
#[cfg(feature = "simd-json")]
fn write_json_tape_object_projection<W: Write, T: JsonTape>(
    writer: &mut W,
    tape: &T,
    fields: &[super::ndjson_direct::NdjsonDirectObjectField],
    path_caches: &mut Vec<NdjsonPathCache>,
) -> Result<(), JetroEngineError> {
    write_json_tape_object_projection_from(writer, tape, 0, fields, path_caches)
}
2961
/// Writes a JSON object built from `fields`, resolving each field's path
/// relative to the tape node at `start`.
///
/// `path_caches` is grown so that every field has its own cache slot, indexed
/// by field position. Optional fields are omitted when their value is missing
/// or null, as detailed on the individual arms below.
#[cfg(feature = "simd-json")]
fn write_json_tape_object_projection_from<W: Write, T: JsonTape>(
    writer: &mut W,
    tape: &T,
    start: usize,
    fields: &[super::ndjson_direct::NdjsonDirectObjectField],
    path_caches: &mut Vec<NdjsonPathCache>,
) -> Result<(), JetroEngineError> {
    // Only ever grows the cache list; existing slots are kept.
    if path_caches.len() < fields.len() {
        path_caches.resize_with(fields.len(), NdjsonPathCache::default);
    }
    writer.write_all(b"{")?;
    // Tracks whether a separator is needed, since skipped fields write nothing.
    let mut wrote = false;
    for (field_idx, field) in fields.iter().enumerate() {
        let path_cache = &mut path_caches[field_idx];
        let mut path_idx = None;
        match &field.value {
            NdjsonDirectProjectionValue::Path(steps) => {
                let idx = path_cache.index(tape, start, steps);
                path_idx = idx;
                // Optional field: omit when the path is missing or resolves to null.
                if field.optional
                    && idx
                        .map(|idx| {
                            matches!(json_tape_scalar(tape, idx), crate::util::JsonView::Null)
                        })
                        .unwrap_or(true)
                {
                    continue;
                }
            }
            NdjsonDirectProjectionValue::ViewScalarCall {
                steps,
                call,
                optional,
            } => {
                let idx = path_cache.index(tape, start, steps);
                path_idx = idx;
                // Omit when either optional flag is set and the input is missing or null.
                if (*optional || field.optional)
                    && idx
                        .map(|idx| {
                            matches!(json_tape_scalar(tape, idx), crate::util::JsonView::Null)
                        })
                        .unwrap_or(true)
                {
                    continue;
                }
                // Optional field: also omit when the call itself evaluates to null.
                if field.optional
                    && idx
                        .map(|idx| json_tape_scalar(tape, idx))
                        .and_then(|value| call.try_apply_json_view(value))
                        .is_some_and(|value| matches!(value, Val::Null))
                {
                    continue;
                }
            }
            // A literal null in an optional field is omitted.
            NdjsonDirectProjectionValue::Literal(Val::Null) if field.optional => {
                continue;
            }
            NdjsonDirectProjectionValue::Nested(_) => {}
            NdjsonDirectProjectionValue::Literal(_) => {}
        }
        if wrote {
            writer.write_all(b",")?;
        }
        write_json_str(writer, field.key.as_ref())?;
        writer.write_all(b":")?;
        write_json_tape_direct_value(writer, tape, &field.value, path_idx)?;
        wrote = true;
    }
    writer.write_all(b"}")?;
    Ok(())
}
3034
/// Writes an array projection whose item paths are resolved from tape
/// index 0; delegates to `write_json_tape_array_projection_from`.
#[cfg(feature = "simd-json")]
fn write_json_tape_array_projection<W: Write, T: JsonTape>(
    writer: &mut W,
    tape: &T,
    items: &[NdjsonDirectProjectionValue],
    path_caches: &mut Vec<NdjsonPathCache>,
) -> Result<(), JetroEngineError> {
    write_json_tape_array_projection_from(writer, tape, 0, items, path_caches)
}
3044
/// Writes a JSON array with one entry per projection in `items`, resolving
/// path-based entries relative to the tape node at `start`.
///
/// `path_caches` is grown so that each item has its own cache slot, paired by
/// position.
#[cfg(feature = "simd-json")]
fn write_json_tape_array_projection_from<W: Write, T: JsonTape>(
    writer: &mut W,
    tape: &T,
    start: usize,
    items: &[NdjsonDirectProjectionValue],
    path_caches: &mut Vec<NdjsonPathCache>,
) -> Result<(), JetroEngineError> {
    if path_caches.len() < items.len() {
        path_caches.resize_with(items.len(), NdjsonPathCache::default);
    }
    writer.write_all(b"[")?;
    let mut needs_separator = false;
    // After the resize there are at least as many caches as items, so the zip
    // covers every item.
    for (item, path_cache) in items.iter().zip(path_caches.iter_mut()) {
        if needs_separator {
            writer.write_all(b",")?;
        }
        needs_separator = true;
        let path_idx = match item {
            NdjsonDirectProjectionValue::Nested(_) | NdjsonDirectProjectionValue::Literal(_) => {
                None
            }
            NdjsonDirectProjectionValue::Path(steps)
            | NdjsonDirectProjectionValue::ViewScalarCall { steps, .. } => {
                path_cache.index(tape, start, steps)
            }
        };
        write_json_tape_direct_value(writer, tape, item, path_idx)?;
    }
    writer.write_all(b"]")?;
    Ok(())
}
3074
/// Writes a single projected `value`.
///
/// `path_idx` is the already-resolved tape index for path-based values. A
/// missing index writes `null`. For `ViewScalarCall`, the call result is
/// written when the call applies, otherwise the raw node is written.
/// Literals and nested plans ignore `path_idx`.
#[cfg(feature = "simd-json")]
fn write_json_tape_direct_value<W: Write, T: JsonTape>(
    writer: &mut W,
    tape: &T,
    value: &NdjsonDirectProjectionValue,
    path_idx: Option<usize>,
) -> Result<(), JetroEngineError> {
    match (value, path_idx) {
        (NdjsonDirectProjectionValue::Literal(literal), _) => write_val_json(writer, literal)?,
        (NdjsonDirectProjectionValue::Nested(plan), _) => {
            write_json_tape_nested_plan(writer, tape, plan)?;
        }
        (
            NdjsonDirectProjectionValue::Path(_)
            | NdjsonDirectProjectionValue::ViewScalarCall { .. },
            None,
        ) => writer.write_all(b"null")?,
        (NdjsonDirectProjectionValue::Path(_), Some(idx)) => {
            write_json_tape_at(writer, tape, idx)?;
        }
        (NdjsonDirectProjectionValue::ViewScalarCall { call, .. }, Some(idx)) => {
            match call.try_apply_json_view(json_tape_scalar(tape, idx)) {
                Some(computed) => write_val_json(writer, &computed)?,
                None => {
                    write_json_tape_at(writer, tape, idx)?;
                }
            }
        }
    }
    Ok(())
}
3109
/// Executes a nested tape `plan` against `tape` (paths resolved from tape
/// index 0) and writes its JSON result.
///
/// Fresh, empty path caches are created on every call and discarded when it
/// returns. A nested `ViewPipeline` plan is written as `null` here.
#[cfg(feature = "simd-json")]
fn write_json_tape_nested_plan<W: Write, T: JsonTape>(
    writer: &mut W,
    tape: &T,
    plan: &NdjsonDirectTapePlan,
) -> Result<(), JetroEngineError> {
    // Call-local caches; each arm below uses only the ones it needs.
    let mut root_cache = NdjsonPathCache::default();
    let mut source_cache = NdjsonPathCache::default();
    let mut suffix_cache = NdjsonPathCache::default();
    let mut predicate_cache = NdjsonPathCache::default();
    let mut projection_caches = Vec::new();
    match plan {
        // Plain path: write the node, or `null` when the path does not resolve.
        NdjsonDirectTapePlan::RootPath(steps) => {
            if let Some(idx) = root_cache.index(tape, 0, steps) {
                write_json_tape_at(writer, tape, idx)?;
            } else {
                writer.write_all(b"null")?;
            }
        }
        NdjsonDirectTapePlan::ViewScalarCall {
            steps,
            call,
            optional,
        } => {
            let idx = root_cache.index(tape, 0, steps);
            // A missing path is treated as a null input to the call.
            let value = idx
                .map(|idx| json_tape_scalar(tape, idx))
                .unwrap_or(crate::util::JsonView::Null);
            // Order matters: optional-null short-circuits, then the call
            // result, then the raw node, then `null` as a last resort.
            if *optional && matches!(value, crate::util::JsonView::Null) {
                writer.write_all(b"null")?;
            } else if let Some(value) = call.try_apply_json_view(value) {
                write_val_json(writer, &value)?;
            } else if let Some(idx) = idx {
                write_json_tape_at(writer, tape, idx)?;
            } else {
                writer.write_all(b"null")?;
            }
        }
        NdjsonDirectTapePlan::ArrayElementPath {
            source_steps,
            element,
            suffix_steps,
        } => {
            write_json_tape_array_element_path(
                writer,
                tape,
                source_steps,
                *element,
                suffix_steps,
                &mut source_cache,
                &mut suffix_cache,
            )?;
        }
        NdjsonDirectTapePlan::ArrayElementViewScalarCall {
            source_steps,
            element,
            suffix_steps,
            call,
        } => {
            write_json_tape_array_element_scalar(
                writer,
                tape,
                source_steps,
                *element,
                suffix_steps,
                call,
                &mut source_cache,
                &mut suffix_cache,
            )?;
        }
        NdjsonDirectTapePlan::Stream(stream) => {
            write_json_tape_stream(
                writer,
                tape,
                stream,
                &mut source_cache,
                &mut suffix_cache,
                &mut predicate_cache,
                &mut projection_caches,
            )?;
        }
        NdjsonDirectTapePlan::Object(fields) => {
            write_json_tape_object_projection(writer, tape, fields, &mut projection_caches)?;
        }
        NdjsonDirectTapePlan::Array(items) => {
            write_json_tape_array_projection(writer, tape, items, &mut projection_caches)?;
        }
        NdjsonDirectTapePlan::ObjectItems { steps, method } => {
            let idx = root_cache.index(tape, 0, steps);
            write_json_tape_object_items(writer, tape, idx, *method)?;
        }
        // NOTE(review): no VM/env is available in this function, so a nested
        // view pipeline is emitted as `null` — confirm this is the intended
        // fallback.
        NdjsonDirectTapePlan::ViewPipeline { .. } => {
            writer.write_all(b"null")?;
        }
    }
    Ok(())
}
3207
/// Resolves `source_steps` from the root, picks `element` from the array
/// found there, follows `suffix_steps` from that element, and writes the
/// resulting node. Writes `null` if any step fails to resolve.
#[cfg(feature = "simd-json")]
fn write_json_tape_array_element_path<W: Write, T: JsonTape>(
    writer: &mut W,
    tape: &T,
    source_steps: &[crate::ir::physical::PhysicalPathStep],
    element: super::ndjson_direct::NdjsonDirectElement,
    suffix_steps: &[crate::ir::physical::PhysicalPathStep],
    source_cache: &mut NdjsonPathCache,
    suffix_cache: &mut NdjsonPathCache,
) -> Result<(), JetroEngineError> {
    let mut resolve = || -> Option<usize> {
        let array_idx = source_cache.index(tape, 0, source_steps)?;
        let item_idx = json_tape_array_element(tape, array_idx, element)?;
        suffix_cache.index(tape, item_idx, suffix_steps)
    };
    match resolve() {
        Some(target_idx) => {
            write_json_tape_at(writer, tape, target_idx)?;
        }
        None => writer.write_all(b"null")?,
    }
    Ok(())
}
3229
/// Like the array-element path writer, but applies `call` to the resolved
/// node's scalar view.
///
/// Writes the call result when the call applies; otherwise writes the raw
/// node when one was resolved; otherwise writes `null`.
#[cfg(feature = "simd-json")]
fn write_json_tape_array_element_scalar<W: Write, T: JsonTape>(
    writer: &mut W,
    tape: &T,
    source_steps: &[crate::ir::physical::PhysicalPathStep],
    element: super::ndjson_direct::NdjsonDirectElement,
    suffix_steps: &[crate::ir::physical::PhysicalPathStep],
    call: &crate::builtins::BuiltinCall,
    source_cache: &mut NdjsonPathCache,
    suffix_cache: &mut NdjsonPathCache,
) -> Result<(), JetroEngineError> {
    let mut resolve = || -> Option<usize> {
        let array_idx = source_cache.index(tape, 0, source_steps)?;
        let item_idx = json_tape_array_element(tape, array_idx, element)?;
        suffix_cache.index(tape, item_idx, suffix_steps)
    };
    let target_idx = resolve();
    let computed =
        target_idx.and_then(|idx| call.try_apply_json_view(json_tape_scalar(tape, idx)));
    match (computed, target_idx) {
        (Some(result), _) => write_val_json(writer, &result)?,
        (None, Some(idx)) => {
            write_json_tape_at(writer, tape, idx)?;
        }
        (None, None) => writer.write_all(b"null")?,
    }
    Ok(())
}
3257
/// Writes the keys, values, or `[key, value]` entries of the object at
/// `obj_idx` as a JSON array, depending on `method`.
///
/// Writes `[]` when `obj_idx` is `None`, out of range, or not an object node.
///
/// # Panics
///
/// Hits `unreachable!` if `method` is not `Keys`, `Values`, or `Entries` and
/// the object has at least one field (the check sits inside the field loop).
#[cfg(feature = "simd-json")]
fn write_json_tape_object_items<W: Write, T: JsonTape>(
    writer: &mut W,
    tape: &T,
    obj_idx: Option<usize>,
    method: crate::builtins::BuiltinMethod,
) -> Result<(), JetroEngineError> {
    let Some(obj_idx) = obj_idx else {
        writer.write_all(b"[]")?;
        return Ok(());
    };
    let Some(crate::data::tape::TapeNode::Object { len, .. }) = tape.nodes().get(obj_idx).copied()
    else {
        writer.write_all(b"[]")?;
        return Ok(());
    };

    writer.write_all(b"[")?;
    // `cur` always points at the key node of the field being processed.
    let mut cur = obj_idx + 1;
    for field_idx in 0..len {
        if field_idx > 0 {
            writer.write_all(b",")?;
        }
        match method {
            crate::builtins::BuiltinMethod::Keys => {
                write_json_str(writer, tape.str_at(cur))?;
                // Skip the key node, then advance by the value node's span.
                cur += 1;
                cur += tape.span(cur);
            }
            crate::builtins::BuiltinMethod::Values => {
                // Writing the value returns the index after it, i.e. the next key.
                cur = write_json_tape_at(writer, tape, cur + 1)?;
            }
            crate::builtins::BuiltinMethod::Entries => {
                writer.write_all(b"[")?;
                write_json_str(writer, tape.str_at(cur))?;
                writer.write_all(b",")?;
                cur = write_json_tape_at(writer, tape, cur + 1)?;
                writer.write_all(b"]")?;
            }
            _ => unreachable!("non-object-items builtin"),
        }
    }
    writer.write_all(b"]")?;
    Ok(())
}
3303
/// Folds the numeric values found at `suffix_steps` under each accepted
/// source item into the accumulators for `op` and returns the finalised
/// result.
///
/// When the source path does not resolve, the untouched accumulators are
/// finalised, i.e. the zero-observation result is returned.
#[cfg(feature = "simd-json")]
fn reduce_json_tape_numeric_path<T: JsonTape>(
    tape: &T,
    source_steps: &[crate::ir::physical::PhysicalPathStep],
    predicate: Option<&NdjsonDirectItemPredicate>,
    suffix_steps: &[crate::ir::physical::PhysicalPathStep],
    op: crate::exec::pipeline::NumOp,
    caches: NdjsonPathCaches<'_>,
) -> Val {
    let NdjsonPathCaches {
        source: source_cache,
        suffix: suffix_cache,
        predicate: predicate_cache,
    } = caches;

    let mut acc_i = 0i64;
    let mut acc_f = 0.0f64;
    let mut floated = false;
    let mut min_f = f64::INFINITY;
    let mut max_f = f64::NEG_INFINITY;
    let mut n_obs = 0usize;

    if let Some(source_idx) = source_cache.index(tape, 0, source_steps) {
        // The visitor never fails, so the walk's result is discarded.
        let _: Result<(), ()> = visit_json_tape_source_items(tape, source_idx, |item_idx| {
            let accepted = predicate.is_none_or(|predicate| {
                eval_json_tape_item_predicate_cached(tape, item_idx, predicate, predicate_cache)
            });
            if accepted {
                if let Some(value_idx) = suffix_cache.index(tape, item_idx, suffix_steps) {
                    fold_json_tape_numeric(
                        json_tape_scalar(tape, value_idx),
                        op,
                        &mut acc_i,
                        &mut acc_f,
                        &mut floated,
                        &mut min_f,
                        &mut max_f,
                        &mut n_obs,
                    );
                }
            }
            Ok(())
        });
    }

    crate::exec::pipeline::num_finalise(op, acc_i, acc_f, floated, min_f, max_f, n_obs)
}
3349
/// Folds one `JsonView` into the numeric accumulators for `op`.
///
/// Signed integers, and unsigned integers that fit in `i64`, use the integer
/// fold; larger unsigned integers and floats use the float fold. Non-numeric
/// views are ignored.
#[cfg(feature = "simd-json")]
#[allow(clippy::too_many_arguments)]
fn fold_json_tape_numeric(
    value: crate::util::JsonView<'_>,
    op: crate::exec::pipeline::NumOp,
    acc_i: &mut i64,
    acc_f: &mut f64,
    floated: &mut bool,
    min_f: &mut f64,
    max_f: &mut f64,
    n_obs: &mut usize,
) {
    use crate::exec::pipeline::{num_fold_f64, num_fold_i64};
    use crate::util::JsonView;

    match value {
        JsonView::Int(signed) => {
            num_fold_i64(acc_i, acc_f, floated, min_f, max_f, n_obs, op, signed)
        }
        JsonView::UInt(unsigned) => match i64::try_from(unsigned) {
            Ok(signed) => num_fold_i64(acc_i, acc_f, floated, min_f, max_f, n_obs, op, signed),
            // Too large for `i64`: fall back to the float accumulator.
            Err(_) => num_fold_f64(
                acc_i,
                acc_f,
                floated,
                min_f,
                max_f,
                n_obs,
                op,
                unsigned as f64,
            ),
        },
        JsonView::Float(float) => {
            num_fold_f64(acc_i, acc_f, floated, min_f, max_f, n_obs, op, float)
        }
        _ => {}
    }
}
3394
/// Evaluates an item-level `predicate` with paths resolved relative to the
/// tape node at `item_idx`.
///
/// Paths that do not resolve, and comparisons whose operands cannot be
/// produced as scalars, evaluate to `false`. `And` / `Or` short-circuit.
#[cfg(feature = "simd-json")]
fn eval_json_tape_item_predicate_cached<T: JsonTape>(
    tape: &T,
    item_idx: usize,
    predicate: &NdjsonDirectItemPredicate,
    cache: &mut NdjsonPathCache,
) -> bool {
    use crate::parse::ast::BinOp;

    match predicate {
        NdjsonDirectItemPredicate::Literal(literal) => crate::util::is_truthy(literal),
        NdjsonDirectItemPredicate::Path(steps) => match cache.index(tape, item_idx, steps) {
            Some(idx) => json_view_truthy(json_tape_scalar(tape, idx)),
            None => false,
        },
        NdjsonDirectItemPredicate::Binary { lhs, op, rhs } => {
            if *op == BinOp::And {
                eval_json_tape_item_predicate_cached(tape, item_idx, lhs, cache)
                    && eval_json_tape_item_predicate_cached(tape, item_idx, rhs, cache)
            } else if *op == BinOp::Or {
                eval_json_tape_item_predicate_cached(tape, item_idx, lhs, cache)
                    || eval_json_tape_item_predicate_cached(tape, item_idx, rhs, cache)
            } else {
                // Any other operator compares two scalar operands.
                let Some(left) = eval_json_tape_item_scalar_cached(tape, item_idx, lhs, cache)
                else {
                    return false;
                };
                let Some(right) = eval_json_tape_item_scalar_cached(tape, item_idx, rhs, cache)
                else {
                    return false;
                };
                crate::util::json_cmp_binop(left, *op, right)
            }
        }
        NdjsonDirectItemPredicate::CmpLit { lhs, op, lit } => {
            match cache.index(tape, item_idx, lhs) {
                Some(idx) => crate::util::json_cmp_binop(
                    json_tape_scalar(tape, idx),
                    *op,
                    crate::util::JsonView::from_val(lit),
                ),
                None => false,
            }
        }
        NdjsonDirectItemPredicate::ViewScalarCall { suffix_steps, call } => {
            let Some(idx) = cache.index(tape, item_idx, suffix_steps) else {
                return false;
            };
            call.try_apply_json_view(json_tape_scalar(tape, idx))
                .is_some_and(|result| crate::util::is_truthy(&result))
        }
    }
}
3440
/// Reduces a predicate operand to a scalar JSON view.
///
/// Only `Path` and `Literal` operands produce a value; a path that the cache
/// cannot resolve and every other predicate shape yield `None`.
#[cfg(feature = "simd-json")]
fn eval_json_tape_item_scalar_cached<'a, T: JsonTape>(
    tape: &'a T,
    item_idx: usize,
    predicate: &'a NdjsonDirectItemPredicate,
    cache: &mut NdjsonPathCache,
) -> Option<crate::util::JsonView<'a>> {
    match predicate {
        NdjsonDirectItemPredicate::Literal(value) => Some(crate::util::JsonView::from_val(value)),
        NdjsonDirectItemPredicate::Path(steps) => {
            let idx = cache.index(tape, item_idx, steps)?;
            Some(json_tape_scalar(tape, idx))
        }
        _ => None,
    }
}
3456
3457fn write_json_array<'a, W, I>(writer: &mut W, items: I) -> Result<(), JetroEngineError>
3458where
3459 W: Write,
3460 I: IntoIterator<Item = &'a Val>,
3461{
3462 writer.write_all(b"[")?;
3463 let mut first = true;
3464 for item in items {
3465 if first {
3466 first = false;
3467 } else {
3468 writer.write_all(b",")?;
3469 }
3470 write_val_json(writer, item)?;
3471 }
3472 writer.write_all(b"]")?;
3473 Ok(())
3474}
3475
3476fn write_json_int_array<W, I>(writer: &mut W, items: I) -> Result<(), JetroEngineError>
3477where
3478 W: Write,
3479 I: IntoIterator<Item = i64>,
3480{
3481 writer.write_all(b"[")?;
3482 let mut first = true;
3483 let mut buf = itoa::Buffer::new();
3484 for item in items {
3485 if first {
3486 first = false;
3487 } else {
3488 writer.write_all(b",")?;
3489 }
3490 writer.write_all(buf.format(item).as_bytes())?;
3491 }
3492 writer.write_all(b"]")?;
3493 Ok(())
3494}
3495
3496fn write_json_float_array<W, I>(writer: &mut W, items: I) -> Result<(), JetroEngineError>
3497where
3498 W: Write,
3499 I: IntoIterator<Item = f64>,
3500{
3501 writer.write_all(b"[")?;
3502 let mut first = true;
3503 let mut buf = ryu::Buffer::new();
3504 for item in items {
3505 if first {
3506 first = false;
3507 } else {
3508 writer.write_all(b",")?;
3509 }
3510 if item.is_finite() {
3511 writer.write_all(buf.format(item).as_bytes())?;
3512 } else {
3513 writer.write_all(b"0")?;
3514 }
3515 }
3516 writer.write_all(b"]")?;
3517 Ok(())
3518}
3519
3520fn write_json_str_array<'a, W, I>(writer: &mut W, items: I) -> Result<(), JetroEngineError>
3521where
3522 W: Write,
3523 I: IntoIterator<Item = &'a str>,
3524{
3525 writer.write_all(b"[")?;
3526 let mut first = true;
3527 for item in items {
3528 if first {
3529 first = false;
3530 } else {
3531 writer.write_all(b",")?;
3532 }
3533 write_json_str(writer, item)?;
3534 }
3535 writer.write_all(b"]")?;
3536 Ok(())
3537}
3538
3539fn write_json_object<'a, W, I>(writer: &mut W, entries: I) -> Result<(), JetroEngineError>
3540where
3541 W: Write,
3542 I: IntoIterator<Item = (&'a str, &'a Val)>,
3543{
3544 writer.write_all(b"{")?;
3545 let mut first = true;
3546 for (key, value) in entries {
3547 if first {
3548 first = false;
3549 } else {
3550 writer.write_all(b",")?;
3551 }
3552 write_json_str(writer, key)?;
3553 writer.write_all(b":")?;
3554 write_val_json(writer, value)?;
3555 }
3556 writer.write_all(b"}")?;
3557 Ok(())
3558}
3559
3560fn write_json_objvec<W: Write>(
3561 writer: &mut W,
3562 data: &crate::data::value::ObjVecData,
3563) -> Result<(), JetroEngineError> {
3564 writer.write_all(b"[")?;
3565 for row in 0..data.nrows() {
3566 if row > 0 {
3567 writer.write_all(b",")?;
3568 }
3569 writer.write_all(b"{")?;
3570 for slot in 0..data.stride() {
3571 if slot > 0 {
3572 writer.write_all(b",")?;
3573 }
3574 write_json_str(writer, data.keys[slot].as_ref())?;
3575 writer.write_all(b":")?;
3576 write_val_json(writer, data.cell(row, slot))?;
3577 }
3578 writer.write_all(b"}")?;
3579 }
3580 writer.write_all(b"]")?;
3581 Ok(())
3582}
3583
3584pub(super) fn write_json_str<W: Write>(
3585 writer: &mut W,
3586 value: &str,
3587) -> Result<(), JetroEngineError> {
3588 writer.write_all(b"\"")?;
3589 let bytes = value.as_bytes();
3590 if !needs_json_escape(bytes) {
3591 writer.write_all(bytes)?;
3592 writer.write_all(b"\"")?;
3593 return Ok(());
3594 }
3595
3596 let mut start = 0usize;
3597
3598 for (idx, &byte) in bytes.iter().enumerate() {
3599 let escaped = match byte {
3600 b'"' => Some(br#"\""#.as_slice()),
3601 b'\\' => Some(br#"\\"#.as_slice()),
3602 b'\n' => Some(br#"\n"#.as_slice()),
3603 b'\r' => Some(br#"\r"#.as_slice()),
3604 b'\t' => Some(br#"\t"#.as_slice()),
3605 0x08 => Some(br#"\b"#.as_slice()),
3606 0x0c => Some(br#"\f"#.as_slice()),
3607 0x00..=0x1f => None,
3608 _ => continue,
3609 };
3610
3611 if start < idx {
3612 writer.write_all(&bytes[start..idx])?;
3613 }
3614 match escaped {
3615 Some(seq) => writer.write_all(seq)?,
3616 None => write_control_escape(writer, byte)?,
3617 }
3618 start = idx + 1;
3619 }
3620
3621 if start < bytes.len() {
3622 writer.write_all(&bytes[start..])?;
3623 }
3624 writer.write_all(b"\"")?;
3625 Ok(())
3626}
3627
3628#[inline]
3629pub(super) fn write_i64<W: Write>(writer: &mut W, value: i64) -> Result<(), JetroEngineError> {
3630 let mut buf = itoa::Buffer::new();
3631 writer.write_all(buf.format(value).as_bytes())?;
3632 Ok(())
3633}
3634
3635#[inline]
3636fn write_u64<W: Write>(writer: &mut W, value: u64) -> Result<(), JetroEngineError> {
3637 let mut buf = itoa::Buffer::new();
3638 writer.write_all(buf.format(value).as_bytes())?;
3639 Ok(())
3640}
3641
3642#[inline]
3643fn write_f64<W: Write>(writer: &mut W, value: f64) -> Result<(), JetroEngineError> {
3644 if value.is_finite() {
3645 let mut buf = ryu::Buffer::new();
3646 writer.write_all(buf.format(value).as_bytes())?;
3647 } else {
3648 writer.write_all(b"0")?;
3649 }
3650 Ok(())
3651}
3652
/// Reports whether `bytes` contains a double quote, a backslash, or any byte
/// below `0x20`.
#[inline]
fn needs_json_escape(bytes: &[u8]) -> bool {
    for &byte in bytes {
        if byte < 0x20 || byte == b'"' || byte == b'\\' {
            return true;
        }
    }
    false
}
3659
3660fn write_control_escape<W: Write>(writer: &mut W, byte: u8) -> Result<(), JetroEngineError> {
3661 const HEX: &[u8; 16] = b"0123456789abcdef";
3662 writer.write_all(&[
3663 b'\\',
3664 b'u',
3665 b'0',
3666 b'0',
3667 HEX[(byte >> 4) as usize],
3668 HEX[(byte & 0x0f) as usize],
3669 ])?;
3670 Ok(())
3671}
3672
3673pub(super) fn collect_row_val(
3674 engine: &JetroEngine,
3675 document: &Jetro,
3676 plan: &crate::ir::physical::QueryPlan,
3677 line_no: u64,
3678) -> Result<Val, JetroEngineError> {
3679 engine
3680 .collect_prepared_val(document, plan)
3681 .map_err(|err| row_eval_error(line_no, err))
3682}
3683
3684pub(super) fn parse_row(
3685 engine: &JetroEngine,
3686 line_no: u64,
3687 row: Vec<u8>,
3688) -> Result<Jetro, JetroEngineError> {
3689 engine
3690 .parse_bytes_lazy(row)
3691 .map_err(|err| row_parse_error(line_no, err))
3692}
3693
3694pub(super) fn row_parse_error(line_no: u64, err: JetroEngineError) -> JetroEngineError {
3695 match err {
3696 JetroEngineError::Json(source) => RowError::InvalidJson { line_no, source }.into(),
3697 JetroEngineError::Eval(eval) => RowError::InvalidJsonMessage {
3698 line_no,
3699 message: eval.to_string(),
3700 }
3701 .into(),
3702 other => other,
3703 }
3704}
3705
3706pub(super) fn row_eval_error(line_no: u64, err: crate::EvalError) -> JetroEngineError {
3707 let message = err.0;
3708 if message.starts_with("Invalid JSON:") {
3709 RowError::InvalidJsonMessage { line_no, message }.into()
3710 } else {
3711 crate::EvalError(message).into()
3712 }
3713}
3714
/// Removes every trailing `\n` and `\r` byte from `buf` in place.
fn trim_line_ending(buf: &mut Vec<u8>) {
    // Keep everything up to and including the last byte that is not a line
    // terminator; a buffer made only of terminators becomes empty.
    let keep = buf
        .iter()
        .rposition(|&byte| byte != b'\n' && byte != b'\r')
        .map_or(0, |last| last + 1);
    buf.truncate(keep);
}
3720
/// Drops a leading UTF-8 byte-order mark from `buf`, but only when `line_no`
/// is 1.
fn strip_initial_bom(line_no: u64, buf: &mut Vec<u8>) {
    const UTF8_BOM: [u8; 3] = [0xEF, 0xBB, 0xBF];

    if line_no != 1 {
        return;
    }
    if buf.starts_with(&UTF8_BOM) {
        buf.drain(..UTF8_BOM.len());
    }
}
3726
/// Returns the half-open `(start, end)` byte range of `buf` with leading and
/// trailing ASCII whitespace excluded.
///
/// A buffer that is empty or entirely whitespace yields
/// `(buf.len(), buf.len())`.
fn non_ws_range(buf: &[u8]) -> (usize, usize) {
    match buf.iter().position(|byte| !byte.is_ascii_whitespace()) {
        None => (buf.len(), buf.len()),
        Some(start) => {
            // The last non-whitespace byte cannot precede `start`, so only the
            // tail beginning at `start` has to be scanned.
            let end = buf[start..]
                .iter()
                .rposition(|byte| !byte.is_ascii_whitespace())
                .map_or(start, |offset| start + offset + 1);
            (start, end)
        }
    }
}
3739
3740#[cfg(test)]
3741mod tests {
3742 #[test]
3743 #[cfg(feature = "simd-json")]
3744 fn rows_stream_driver_reports_direct_stage_stats() {
3745 let engine = crate::JetroEngine::new();
3746 let plan = super::ndjson_rows_stream_plan(
3747 "$.rows().filter($.active == true).distinct_by($.id).take(2).map($.id)",
3748 )
3749 .unwrap()
3750 .unwrap();
3751 let input = std::io::Cursor::new(
3752 br#"{"id":"a","active":false}
3753{"id":"a","active":true}
3754{"id":"b","active":true}
3755not-json
3756"#
3757 .to_vec(),
3758 );
3759 let mut out = Vec::new();
3760
3761 let (emitted, stats) = super::drive_ndjson_rows_stream_reader_with_stats(
3762 &engine,
3763 input,
3764 &plan,
3765 None,
3766 super::NdjsonOptions::default(),
3767 &mut out,
3768 )
3769 .unwrap();
3770
3771 assert_eq!(emitted, 2);
3772 assert_eq!(String::from_utf8(out).unwrap(), "\"a\"\n\"b\"\n");
3773 assert_eq!(stats.source, super::RowStreamSourceKind::NdjsonRows);
3774 assert_eq!(stats.direction, super::RowStreamDirection::Forward);
3775 assert_eq!(stats.rows_scanned, 3);
3776 assert_eq!(stats.rows_filtered, 1);
3777 assert_eq!(stats.rows_emitted, 2);
3778 assert_eq!(stats.direct_filter_rows, 3);
3779 assert_eq!(stats.direct_key_rows, 2);
3780 assert_eq!(stats.direct_project_rows, 2);
3781 }
3782
3783 #[test]
3784 #[cfg(feature = "simd-json")]
3785 fn parse_row_keeps_simd_document_lazy() {
3786 let engine = crate::JetroEngine::new();
3787 let row = br#"{"name":"Ada","age":30}"#.to_vec();
3788
3789 let document = super::parse_row(&engine, 1, row).expect("row parses lazily");
3790
3791 assert!(!document.root_val_is_materialized());
3792 assert!(!document.tape_is_built());
3793 }
3794
3795 #[test]
3796 fn owned_row_read_preserves_reusable_buffer_capacity() {
3797 let input = std::io::Cursor::new(b"{\"n\":1}\n{\"n\":2}\n");
3798 let mut driver = super::NdjsonPerRowDriver::new(input);
3799 let mut buf = Vec::with_capacity(128);
3800
3801 let first = driver
3802 .read_next_owned(&mut buf)
3803 .expect("row read succeeds")
3804 .expect("first row exists");
3805 assert_eq!(first.1, br#"{"n":1}"#);
3806 assert_eq!(buf.capacity(), 128);
3807
3808 let second = driver
3809 .read_next_owned(&mut buf)
3810 .expect("row read succeeds")
3811 .expect("second row exists");
3812 assert_eq!(second.1, br#"{"n":2}"#);
3813 assert_eq!(buf.capacity(), 128);
3814 }
3815
3816 #[test]
3817 #[cfg(feature = "simd-json")]
3818 fn direct_tape_plan_accepts_first_suffix() {
3819 let engine = crate::JetroEngine::new();
3820 for query in [
3821 "attributes.first().value",
3822 "attributes.last().value",
3823 "attributes.nth(1).value",
3824 ] {
3825 let plan =
3826 super::direct_tape_plan(&engine, query).expect("array suffix should be direct");
3827 assert!(matches!(
3828 plan,
3829 super::NdjsonDirectTapePlan::ArrayElementPath { .. }
3830 ));
3831 }
3832 }
3833
3834 #[test]
3835 #[cfg(feature = "simd-json")]
3836 fn direct_tape_plan_accepts_rooted_bench_shapes() {
3837 let engine = crate::JetroEngine::new();
3838 for query in [
3839 "$.id",
3840 "$.a.b.c",
3841 "$.meta.id",
3842 "$.name",
3843 "$.attributes.len()",
3844 "$.store.attributes.len()",
3845 "$.attributes.map(@.key)",
3846 "$.attributes.first().value",
3847 "$.store.attributes.first().value",
3848 "$.attributes.last().value",
3849 "$.name.upper()",
3850 "$.store.name.upper()",
3851 "$.attributes.map([@.key, @.value])",
3852 r#"$.attributes.filter(@.value.contains("_3")).len()"#,
3853 "$.keys()",
3854 ] {
3855 super::direct_tape_plan(&engine, query)
3856 .unwrap_or_else(|| panic!("{query} should have a direct NDJSON tape plan"));
3857 }
3858 }
3859
3860 #[test]
3861 #[cfg(feature = "simd-json")]
3862 fn direct_writer_plan_kind_exposes_hot_path_selection() {
3863 let engine = crate::JetroEngine::new();
3864 use super::NdjsonDirectPlanKind::{
3865 ByteExpr, TapeArrayProjection, TapeObjectProjection, TapeRootPath, TapeStreamCollect,
3866 TapeStreamCount, TapeStreamExtreme, TapeStreamFirst, TapeStreamLast, TapeStreamNumeric,
3867 };
3868
3869 for (query, expected) in [
3870 ("$.name", (Some(ByteExpr), TapeRootPath)),
3871 ("$.a.b.c", (Some(ByteExpr), TapeRootPath)),
3872 (r#"{test: $.a.b.c, b: $.a.b}"#, (None, TapeObjectProjection)),
3873 (r#"[$.id, $.name]"#, (None, TapeArrayProjection)),
3874 ("$.attributes.map(@.key)", (None, TapeStreamCollect)),
3875 (
3876 "$.attributes.map({k: @.key, code: @.meta.code.upper()})",
3877 (None, TapeStreamCollect),
3878 ),
3879 (
3880 r#"$.attributes.filter(@.value.contains("_3")).map({key: @.key, value: @.value}).first()"#,
3881 (None, TapeStreamFirst),
3882 ),
3883 ("$.attributes.map(@.value).last()", (None, TapeStreamLast)),
3884 (
3885 r#"$.attributes.filter(@.value.contains("_3")).len()"#,
3886 (None, TapeStreamCount),
3887 ),
3888 (
3889 "$.attributes.map(@.weight).sum()",
3890 (None, TapeStreamNumeric),
3891 ),
3892 (
3893 r#"$.attributes.filter(@.value.contains("_3")).map(@.weight).sum()"#,
3894 (None, TapeStreamNumeric),
3895 ),
3896 (
3897 "$.attributes.sort_by(@.value).last().key",
3898 (None, TapeStreamExtreme),
3899 ),
3900 ] {
3901 let actual = super::direct_writer_plan_kind(&engine, query)
3902 .unwrap_or_else(|| panic!("{query} should have an observable direct plan"));
3903 assert_eq!(actual, expected, "{query}");
3904 }
3905 }
3906
3907 #[test]
3908 #[cfg(feature = "simd-json")]
3909 fn direct_writer_path_kind_matches_runtime_writer_family() {
3910 let engine = crate::JetroEngine::new();
3911 use super::NdjsonWriterPathKind::{ByteExpr, ByteWritableTape};
3912
3913 for (query, expected) in [
3914 ("$.name", ByteExpr),
3915 ("$.a.b.c", ByteExpr),
3916 (r#"{test: $.a.b.c, b: $.a.b}"#, ByteWritableTape),
3917 (r#"[$.id, $.name]"#, ByteWritableTape),
3918 ("$.attributes.map(@.key)", ByteWritableTape),
3919 (
3920 "$.attributes.map({k: @.key, code: @.meta.code.upper()})",
3921 ByteWritableTape,
3922 ),
3923 (
3924 r#"$.attributes.filter(@.value.contains("_3")).len()"#,
3925 ByteWritableTape,
3926 ),
3927 ("$.attributes.map(@.weight).sum()", ByteWritableTape),
3928 (
3929 r#"$.attributes.filter(@.value.contains("_3")).map(@.weight).sum()"#,
3930 ByteWritableTape,
3931 ),
3932 (
3933 r#"{id: $.id, name: $.name, count: $.attributes.len()}"#,
3934 ByteWritableTape,
3935 ),
3936 (
3937 r#"[$.id, $.name, $.attributes.first().value, $.attributes.last().value]"#,
3938 ByteWritableTape,
3939 ),
3940 (
3941 r#"{name_upper: $.name.upper(), values: $.attributes.map(@.value), last: $.attributes.last().value}"#,
3942 ByteWritableTape,
3943 ),
3944 ("$.attributes.sort_by(@.value).last().key", ByteWritableTape),
3945 ] {
3946 assert_eq!(
3947 super::direct_writer_path_kind(&engine, query),
3948 Some(expected),
3949 "{query}"
3950 );
3951 }
3952 }
3953
3954 #[test]
3955 #[cfg(feature = "simd-json")]
3956 fn direct_tape_plan_lowers_stream_shapes_generically() {
3957 let engine = crate::JetroEngine::new();
3958 for query in [
3959 "$.attributes.map(@.key)",
3960 "$.attributes.map(@.key.upper())",
3961 "$.attributes.map(@.value).first()",
3962 "$.attributes.map(@.value).last()",
3963 r#"$.attributes.filter(@.value.contains("_3")).map(@.key)"#,
3964 r#"$.attributes.filter(@.value.contains("_3")).map(@.key.upper())"#,
3965 r#"$.attributes.filter(@.value.contains("_3")).map({key: @.key, value: @.value}).first()"#,
3966 r#"$.attributes.filter(@.value.contains("_3")).len()"#,
3967 "$.attributes.map(@.weight).sum()",
3968 r#"$.attributes.filter(@.value.contains("_3")).map(@.weight).sum()"#,
3969 "$.attributes.sort_by(@.value).last().key",
3970 ] {
3971 let plan =
3972 super::direct_tape_plan(&engine, query).expect("query should be direct NDJSON");
3973 assert!(
3974 matches!(plan, super::NdjsonDirectTapePlan::Stream(_)),
3975 "{query} should lower to a generic NDJSON stream plan"
3976 );
3977 }
3978 }
3979
3980 #[test]
3981 #[cfg(feature = "simd-json")]
3982 fn direct_byte_plan_accepts_fast_root_shapes() {
3983 let engine = crate::JetroEngine::new();
3984 for query in [
3985 "$.id",
3986 "$.name",
3987 "$.name.upper()",
3988 "$.name.lower()",
3989 "$.keys()",
3990 "$.meta.keys()",
3991 "$.values()",
3992 "$.entries()",
3993 "$.attributes.first().value",
3994 "$.store.attributes.first().value",
3995 "$.attributes.first().key.upper()",
3996 "$.attributes.last().value",
3997 "$.attributes.nth(1).value",
3998 ] {
3999 super::direct_byte_plan(&engine, query)
4000 .unwrap_or_else(|| panic!("{query} should have a direct NDJSON byte plan"));
4001 }
4002 }
4003
4004 #[test]
4005 #[cfg(feature = "simd-json")]
4006 fn direct_byte_predicates_cover_match_shapes() {
4007 let engine = crate::JetroEngine::new();
4008 let row = br#"{"active":true,"score":9910,"attributes":[{"key":"k1","value":"v_1"}]}"#;
4009 for predicate in [
4010 ("active", true),
4011 ("score > 9900", true),
4012 ("score < 100", false),
4013 (r#"attributes.first().value.contains("_1")"#, true),
4014 ] {
4015 let plan = super::direct_tape_predicate(&engine, predicate.0)
4016 .unwrap_or_else(|| panic!("{} should have a direct predicate", predicate.0));
4017 let matched = super::eval_ndjson_byte_predicate_row(row, &plan)
4018 .expect("byte predicate should evaluate")
4019 .unwrap_or_else(|| panic!("{} should not need tape fallback", predicate.0));
4020 assert_eq!(matched, predicate.1, "{}", predicate.0);
4021 }
4022 }
4023
4024 #[test]
4025 #[cfg(feature = "simd-json")]
4026 fn direct_byte_tape_plan_counts_filtered_rows() {
4027 let engine = crate::JetroEngine::new();
4028 let query = r#"attributes.filter(@.value.contains("_3")).len()"#;
4029 let plan = super::direct_tape_plan(&engine, query).expect("filter count should be direct");
4030 assert!(super::tape_plan_can_write_byte_row(&plan));
4031
4032 let row = br#"{"attributes":[{"value":"a_3"},{"value":"b"},{"value":"c_3"}]}"#;
4033 let mut out = Vec::new();
4034 let mut scratch = Vec::new();
4035 let wrote = super::write_ndjson_byte_tape_plan_row(&mut out, row, &plan, &mut scratch)
4036 .expect("byte count should write");
4037 assert!(matches!(wrote, super::BytePlanWrite::Done));
4038 assert_eq!(out, b"2");
4039 }
4040
4041 #[test]
4042 #[cfg(feature = "simd-json")]
4043 fn direct_byte_tape_plan_reduces_numeric_streams() {
4044 let engine = crate::JetroEngine::new();
4045 let row = br#"{"attributes":[{"weight":1},{"weight":2.5},{"weight":3},{"weight":"skip"}]}"#;
4046 for (query, expected) in [
4047 ("$.attributes.map(@.weight).sum()", "6.5"),
4048 ("$.attributes.map(@.weight).avg()", "2.1666666666666665"),
4049 ("$.attributes.map(@.weight).min()", "1.0"),
4050 ("$.attributes.map(@.weight).max()", "3.0"),
4051 ] {
4052 let plan = super::direct_tape_plan(&engine, query)
4053 .unwrap_or_else(|| panic!("{query} should be direct"));
4054 assert!(
4055 super::tape_plan_can_write_byte_row(&plan),
4056 "{query} should be byte-writable"
4057 );
4058 let mut out = Vec::new();
4059 let mut scratch = Vec::new();
4060 let wrote = super::write_ndjson_byte_tape_plan_row(&mut out, row, &plan, &mut scratch)
4061 .expect("byte numeric stream should write");
4062 assert!(matches!(wrote, super::BytePlanWrite::Done), "{query}");
4063 assert_eq!(std::str::from_utf8(&out).unwrap(), expected, "{query}");
4064 }
4065 }
4066
4067 #[test]
4068 #[cfg(feature = "simd-json")]
4069 fn direct_byte_tape_plan_collects_stream_maps() {
4070 let engine = crate::JetroEngine::new();
4071 let row = br#"{"attributes":[{"key":"k1","value":"v1"},{"key":"k2","value":"v2"}]}"#;
4072 for (query, expected) in [
4073 ("attributes.map(@.key)", r#"["k1","k2"]"#),
4074 (
4075 "attributes.map([@.key, @.value])",
4076 r#"[["k1","v1"],["k2","v2"]]"#,
4077 ),
4078 (
4079 "attributes.map({key: @.key, value: @.value})",
4080 r#"[{"key":"k1","value":"v1"},{"key":"k2","value":"v2"}]"#,
4081 ),
4082 ("attributes.map(@.key.upper())", r#"["K1","K2"]"#),
4083 (
4084 r#"attributes.filter(@.value.contains("2")).map(@.key)"#,
4085 r#"["k2"]"#,
4086 ),
4087 (
4088 r#"attributes.filter(@.value.contains("2")).map({key: @.key, value: @.value})"#,
4089 r#"[{"key":"k2","value":"v2"}]"#,
4090 ),
4091 (
4092 r#"attributes.filter(@.key != "k1").map([@.key, @.value])"#,
4093 r#"[["k2","v2"]]"#,
4094 ),
4095 ] {
4096 let plan = super::direct_tape_plan(&engine, query)
4097 .unwrap_or_else(|| panic!("{query} should be direct"));
4098 assert!(
4099 super::tape_plan_can_write_byte_row(&plan),
4100 "{query} should be byte-writable"
4101 );
4102 let mut out = Vec::new();
4103 let mut scratch = Vec::new();
4104 let wrote = super::write_ndjson_byte_tape_plan_row(&mut out, row, &plan, &mut scratch)
4105 .expect("byte stream should write");
4106 assert!(matches!(wrote, super::BytePlanWrite::Done), "{query}");
4107 assert_eq!(std::str::from_utf8(&out).unwrap(), expected, "{query}");
4108 }
4109 }
4110
4111 #[test]
4112 #[cfg(feature = "simd-json")]
4113 fn direct_byte_tape_plan_writes_static_projections() {
4114 let engine = crate::JetroEngine::new();
4115 let row = br#"{"id":7,"a":{"b":{"c":1}}}"#;
4116 for (query, expected) in [
4117 ("$.a.b.c", "1"),
4118 (r#"{test: $.a.b.c, b: $.a.b}"#, r#"{"test":1,"b":{"c":1}}"#),
4119 (r#"[$.a.b.c, $.id]"#, r#"[1,7]"#),
4120 ] {
4121 let plan = super::direct_tape_plan(&engine, query)
4122 .unwrap_or_else(|| panic!("{query} should be direct"));
4123 assert!(
4124 super::tape_plan_can_write_byte_row(&plan),
4125 "{query} should be byte-writable"
4126 );
4127 let mut out = Vec::new();
4128 let mut scratch = Vec::new();
4129 let wrote = super::write_ndjson_byte_tape_plan_row(&mut out, row, &plan, &mut scratch)
4130 .expect("byte projection should write");
4131 assert!(matches!(wrote, super::BytePlanWrite::Done), "{query}");
4132 assert_eq!(std::str::from_utf8(&out).unwrap(), expected, "{query}");
4133 }
4134 }
4135
4136 #[test]
4137 #[cfg(feature = "simd-json")]
4138 fn run_ndjson_uses_byte_paths_for_nested_object_items() {
4139 let engine = crate::JetroEngine::new();
4140 let rows = std::io::Cursor::new(
4141 br#"{"id":1}
4142{"id":2}
4143"#,
4144 );
4145 let mut out = Vec::new();
4146 engine
4147 .run_ndjson(rows, "$.id", &mut out)
4148 .expect("rooted byte path should run");
4149 assert_eq!(std::str::from_utf8(&out).unwrap(), "1\n2\n");
4150
4151 let rows = std::io::Cursor::new(
4152 br#"{"meta":{"id":1,"kind":"a"}}
4153{"meta":{"id":2,"kind":"b"}}
4154"#,
4155 );
4156
4157 let mut out = Vec::new();
4158 engine
4159 .run_ndjson(rows, "$.meta.id", &mut out)
4160 .expect("nested byte path should run");
4161 assert_eq!(std::str::from_utf8(&out).unwrap(), "1\n2\n");
4162
4163 let rows = std::io::Cursor::new(br#"{"meta":{"id":1,"kind":"a"}}"#);
4164 let mut out = Vec::new();
4165 engine
4166 .run_ndjson(rows, "$.meta.keys()", &mut out)
4167 .expect("nested byte object items should run");
4168 assert_eq!(std::str::from_utf8(&out).unwrap(), "[\"id\",\"kind\"]\n");
4169 }
4170
4171 #[test]
4172 #[cfg(feature = "simd-json")]
4173 fn run_ndjson_uses_byte_paths_for_nested_array_demands() {
4174 let engine = crate::JetroEngine::new();
4175 let rows = std::io::Cursor::new(
4176 br#"{"store":{"attributes":[{"value":"a"},{"value":"b"}],"after":1}}
4177{"store":{"attributes":[{"value":"c"},{"value":"d"}],"after":2}}
4178"#,
4179 );
4180
4181 let mut out = Vec::new();
4182 engine
4183 .run_ndjson(rows, "$.store.attributes.first().value", &mut out)
4184 .expect("nested byte array demand should run");
4185 assert_eq!(std::str::from_utf8(&out).unwrap(), "\"a\"\n\"c\"\n");
4186
4187 out.clear();
4188 let rows = std::io::Cursor::new(
4189 br#"{"store":{"attributes":[{"value":"a"},{"value":"b"}],"after":1}}
4190{"store":{"attributes":[{"value":"c"},{"value":"d"}],"after":2}}
4191"#,
4192 );
4193 engine
4194 .run_ndjson(rows, "$.store.attributes.last().value", &mut out)
4195 .expect("nested byte last demand should run from field prefix");
4196 assert_eq!(std::str::from_utf8(&out).unwrap(), "\"b\"\n\"d\"\n");
4197 }
4198
4199 #[test]
4200 #[cfg(feature = "simd-json")]
4201 fn run_ndjson_static_projection_survives_hint_activation() {
4202 let engine = crate::JetroEngine::new();
4203 let rows = std::io::Cursor::new(
4204 br#"{"id":1,"name":"a","active":true}
4205{"id":2,"name":"b","active":true}
4206{"id":3,"name":"c","active":true}
4207{"id":4,"name":"d","active":true}
4208{"id":5,"name":"e","active":true}
4209{"id":6,"name":"f","active":true}
4210{"id":7,"name":"g","active":true}
4211{"id":8,"name":"h","active":true}
4212{"id":9,"name":"i","active":true}
4213"#,
4214 );
4215 let mut out = Vec::new();
4216 engine
4217 .run_ndjson(rows, r#"{id: $.id, name: $.name}"#, &mut out)
4218 .expect("hinted static projection should run");
4219 assert_eq!(
4220 std::str::from_utf8(&out).unwrap(),
4221 "{\"id\":1,\"name\":\"a\"}\n\
4222{\"id\":2,\"name\":\"b\"}\n\
4223{\"id\":3,\"name\":\"c\"}\n\
4224{\"id\":4,\"name\":\"d\"}\n\
4225{\"id\":5,\"name\":\"e\"}\n\
4226{\"id\":6,\"name\":\"f\"}\n\
4227{\"id\":7,\"name\":\"g\"}\n\
4228{\"id\":8,\"name\":\"h\"}\n\
4229{\"id\":9,\"name\":\"i\"}\n"
4230 );
4231 }
4232
4233 #[test]
4234 #[cfg(feature = "simd-json")]
4235 fn run_ndjson_nested_projection_survives_hint_activation() {
4236 let engine = crate::JetroEngine::new();
4237 let rows = std::io::Cursor::new(
4238 br#"{"id":1,"profile":{"name":"a","score":10},"active":true}
4239{"id":2,"profile":{"name":"b","score":20},"active":true}
4240{"id":3,"profile":{"name":"c","score":30},"active":true}
4241{"id":4,"profile":{"name":"d","score":40},"active":true}
4242{"id":5,"profile":{"name":"e","score":50},"active":true}
4243{"id":6,"profile":{"name":"f","score":60},"active":true}
4244{"id":7,"profile":{"name":"g","score":70},"active":true}
4245{"id":8,"profile":{"name":"h","score":80},"active":true}
4246{"id":9,"profile":{"name":"i","score":90},"active":true}
4247"#,
4248 );
4249 let mut out = Vec::new();
4250 engine
4251 .run_ndjson(
4252 rows,
4253 r#"{id: $.id, name: $.profile.name, profile: $.profile}"#,
4254 &mut out,
4255 )
4256 .expect("hinted nested projection should run");
4257 assert_eq!(
4258 std::str::from_utf8(&out).unwrap(),
4259 "{\"id\":1,\"name\":\"a\",\"profile\":{\"name\":\"a\",\"score\":10}}\n\
4260{\"id\":2,\"name\":\"b\",\"profile\":{\"name\":\"b\",\"score\":20}}\n\
4261{\"id\":3,\"name\":\"c\",\"profile\":{\"name\":\"c\",\"score\":30}}\n\
4262{\"id\":4,\"name\":\"d\",\"profile\":{\"name\":\"d\",\"score\":40}}\n\
4263{\"id\":5,\"name\":\"e\",\"profile\":{\"name\":\"e\",\"score\":50}}\n\
4264{\"id\":6,\"name\":\"f\",\"profile\":{\"name\":\"f\",\"score\":60}}\n\
4265{\"id\":7,\"name\":\"g\",\"profile\":{\"name\":\"g\",\"score\":70}}\n\
4266{\"id\":8,\"name\":\"h\",\"profile\":{\"name\":\"h\",\"score\":80}}\n\
4267{\"id\":9,\"name\":\"i\",\"profile\":{\"name\":\"i\",\"score\":90}}\n"
4268 );
4269 }
4270
4271 #[test]
4272 #[cfg(feature = "simd-json")]
4273 fn run_ndjson_scalar_projection_survives_hint_activation() {
4274 let engine = crate::JetroEngine::new();
4275 let rows = std::io::Cursor::new(
4276 br#"{"id":1,"profile":{"name":"a"}}
4277{"id":2,"profile":{"name":"b"}}
4278{"id":3,"profile":{"name":"c"}}
4279{"id":4,"profile":{"name":"d"}}
4280{"id":5,"profile":{"name":"e"}}
4281{"id":6,"profile":{"name":"f"}}
4282{"id":7,"profile":{"name":"g"}}
4283{"id":8,"profile":{"name":"h"}}
4284{"id":9,"profile":{"name":"i"}}
4285"#,
4286 );
4287 let mut out = Vec::new();
4288 engine
4289 .run_ndjson(
4290 rows,
4291 r#"{id: $.id, name: $.profile.name.upper()}"#,
4292 &mut out,
4293 )
4294 .expect("hinted scalar projection should run");
4295 assert_eq!(
4296 std::str::from_utf8(&out).unwrap(),
4297 "{\"id\":1,\"name\":\"A\"}\n\
4298{\"id\":2,\"name\":\"B\"}\n\
4299{\"id\":3,\"name\":\"C\"}\n\
4300{\"id\":4,\"name\":\"D\"}\n\
4301{\"id\":5,\"name\":\"E\"}\n\
4302{\"id\":6,\"name\":\"F\"}\n\
4303{\"id\":7,\"name\":\"G\"}\n\
4304{\"id\":8,\"name\":\"H\"}\n\
4305{\"id\":9,\"name\":\"I\"}\n"
4306 );
4307 }
4308
4309 #[test]
4310 #[cfg(feature = "simd-json")]
4311 fn run_ndjson_stream_collect_survives_hint_activation() {
4312 let engine = crate::JetroEngine::new();
4313 let rows = std::io::Cursor::new(
4314 br#"{"id":1,"attributes":[{"key":"a","value":"x"},{"key":"b","value":"y"}]}
4315{"id":2,"attributes":[{"key":"c","value":"z"}]}
4316{"id":3,"attributes":[{"key":"d","value":"w"}]}
4317"#,
4318 );
4319 let mut out = Vec::new();
4320 engine
4321 .run_ndjson(rows, "$.attributes.map([@.key, @.value])", &mut out)
4322 .expect("hinted stream collect should run");
4323 assert_eq!(
4324 std::str::from_utf8(&out).unwrap(),
4325 "[[\"a\",\"x\"],[\"b\",\"y\"]]\n[[\"c\",\"z\"]]\n[[\"d\",\"w\"]]\n"
4326 );
4327 }
4328
4329 #[test]
4330 #[cfg(feature = "simd-json")]
4331 fn run_ndjson_stream_cache_rejects_reordered_item_prefixes() {
4332 let engine = crate::JetroEngine::new();
4333 let rows = std::io::Cursor::new(
4334 br#"{"attributes":[{"key":"k1","value":"a"}]}
4335{"attributes":[{"value":"k1","key":"actual"}]}
4336"#,
4337 );
4338 let mut out = Vec::new();
4339 engine
4340 .run_ndjson(rows, "$.attributes.map(@.key)", &mut out)
4341 .expect("stream cache should fall back on reordered item fields");
4342 assert_eq!(
4343 std::str::from_utf8(&out).unwrap(),
4344 "[\"k1\"]\n[\"actual\"]\n"
4345 );
4346 }
4347
4348 #[test]
4349 #[cfg(feature = "simd-json")]
4350 fn run_ndjson_stream_map_preserves_missing_field_nulls() {
4351 let engine = crate::JetroEngine::new();
4352 let rows = std::io::Cursor::new(
4353 br#"{"attributes":[{"key":"a","value":"x"},{"key":"b"}]}
4354{"attributes":[{"value":"z"}]}
4355"#,
4356 );
4357 let mut out = Vec::new();
4358 engine
4359 .run_ndjson(rows, "$.attributes.map([@.key, @.value])", &mut out)
4360 .expect("stream map should preserve nulls for missing fields");
4361 assert_eq!(
4362 std::str::from_utf8(&out).unwrap(),
4363 "[[\"a\",\"x\"],[\"b\",null]]\n[[null,\"z\"]]\n"
4364 );
4365 }
4366
4367 #[test]
4368 #[cfg(feature = "simd-json")]
4369 fn run_ndjson_stream_object_map_preserves_scalar_calls() {
4370 let engine = crate::JetroEngine::new();
4371 let rows = std::io::Cursor::new(
4372 br#"{"attributes":[{"key":"a","value":"x"},{"key":"b","value":"y"}]}
4373"#,
4374 );
4375 let mut out = Vec::new();
4376 engine
4377 .run_ndjson(
4378 rows,
4379 "$.attributes.map({k: @.key, v: @.value.upper()})",
4380 &mut out,
4381 )
4382 .expect("stream object map should preserve scalar calls");
4383 assert_eq!(
4384 std::str::from_utf8(&out).unwrap(),
4385 "[{\"k\":\"a\",\"v\":\"X\"},{\"k\":\"b\",\"v\":\"Y\"}]\n"
4386 );
4387 }
4388
4389 #[test]
4390 #[cfg(feature = "simd-json")]
4391 fn run_ndjson_stream_map_projects_nested_item_paths() {
4392 let engine = crate::JetroEngine::new();
4393 let rows = std::io::Cursor::new(
4394 br#"{"attributes":[{"key":"a","meta":{"code":"x"}},{"key":"b","meta":{"code":"y"}}]}
4395"#,
4396 );
4397 let mut out = Vec::new();
4398 engine
4399 .run_ndjson(
4400 rows,
4401 "$.attributes.map({k: @.key, code: @.meta.code.upper()})",
4402 &mut out,
4403 )
4404 .expect("stream map should project nested item paths");
4405 assert_eq!(
4406 std::str::from_utf8(&out).unwrap(),
4407 "[{\"k\":\"a\",\"code\":\"X\"},{\"k\":\"b\",\"code\":\"Y\"}]\n"
4408 );
4409 }
4410
4411 #[test]
4412 #[cfg(feature = "simd-json")]
4413 fn run_ndjson_stream_count_survives_hint_activation() {
4414 let engine = crate::JetroEngine::new();
4415 let rows = std::io::Cursor::new(
4416 br#"{"id":1,"attributes":[{"value":"x_3"},{"value":"y"}]}
4417{"id":2,"attributes":[{"value":"z_3"},{"value":"w_3"}]}
4418{"id":3,"attributes":[{"value":"n"}]}
4419"#,
4420 );
4421 let mut out = Vec::new();
4422 engine
4423 .run_ndjson(
4424 rows,
4425 r#"$.attributes.filter(@.value.contains("_3")).len()"#,
4426 &mut out,
4427 )
4428 .expect("hinted stream count should run");
4429 assert_eq!(std::str::from_utf8(&out).unwrap(), "1\n2\n0\n");
4430 }
4431
/// Counts attributes whose `value` contains `"_3"` when some items have no
/// `value` field at all; those items must simply not match, giving counts of
/// 1 and 0 for the two rows.
#[test]
#[cfg(feature = "simd-json")]
fn run_ndjson_filtered_count_ignores_missing_predicate_fields() {
    let query = r#"$.attributes.filter(@.value.contains("_3")).len()"#;
    let expected = "1\n0\n";
    let input = std::io::Cursor::new(
        br#"{"attributes":[{"value":"x_3"},{"key":"missing"},{"value":"y"}]}
{"attributes":[{"key":"missing"}]}
"#,
    );

    let jetro = crate::JetroEngine::new();
    let mut sink: Vec<u8> = Vec::new();
    let outcome = jetro.run_ndjson(input, query, &mut sink);
    outcome.expect("filtered count should ignore missing predicate fields");

    let rendered = std::str::from_utf8(&sink).unwrap();
    assert_eq!(rendered, expected);
}
4451
/// `filter(..).last()` must pick the last item that passes the filter, not
/// the physically last element of the array: in both rows the trailing
/// element has key `"drop"`, so the expected values come from the last
/// `"keep"` item instead.
#[test]
fn run_ndjson_filter_last_returns_last_matching_output() {
    let query = r#"$.attributes.filter(@.key == "keep").last().value"#;
    let expected = "\"first\"\n\"semantic-last\"\n";
    let input = std::io::Cursor::new(
        br#"{"attributes":[{"key":"keep","value":"first"},{"key":"drop","value":"physical-last"}]}
{"attributes":[{"key":"drop","value":"first"},{"key":"keep","value":"semantic-last"},{"key":"drop","value":"physical-last"}]}
"#,
    );

    let jetro = crate::JetroEngine::new();
    let mut sink: Vec<u8> = Vec::new();
    let outcome = jetro.run_ndjson(input, query, &mut sink);
    outcome.expect("filtered last should preserve semantic output order");

    let rendered = std::str::from_utf8(&sink).unwrap();
    assert_eq!(rendered, expected);
}
4475
/// `filter(..).map(..).first()` must emit the projection of the first item
/// that passes the filter. The third row has no matching item and is expected
/// to contribute no output line.
#[test]
fn run_ndjson_filter_map_first_stops_at_first_matching_output() {
    let query = r#"$.attributes.filter(@.value.contains("_3")).map({key: @.key, value: @.value}).first()"#;
    let expected = "{\"key\":\"a\",\"value\":\"x_3\"}\n{\"key\":\"b\",\"value\":\"y_3\"}\n";
    let input = std::io::Cursor::new(
        br#"{"attributes":[{"key":"a","value":"x_3"},{"key":"b","value":"later_3"}]}
{"attributes":[{"key":"a","value":"skip"},{"key":"b","value":"y_3"}]}
{"attributes":[{"key":"a","value":"skip"}]}
"#,
    );

    let jetro = crate::JetroEngine::new();
    let mut sink: Vec<u8> = Vec::new();
    let outcome = jetro.run_ndjson(input, query, &mut sink);
    outcome.expect("filtered first should use direct stream first");

    let rendered = std::str::from_utf8(&sink).unwrap();
    assert_eq!(rendered, expected);
}
4500
/// `map(@.value).first()` without a filter must emit the first item's value
/// for each row. The row with an empty `attributes` array is expected to
/// contribute no output line.
#[test]
fn run_ndjson_map_first_projects_first_item_without_filter() {
    let query = "$.attributes.map(@.value).first()";
    let expected = "\"first\"\n\"only\"\n";
    let input = std::io::Cursor::new(
        br#"{"attributes":[{"key":"a","value":"first"},{"key":"b","value":"later"}]}
{"attributes":[]}
{"attributes":[{"key":"c","value":"only"}]}
"#,
    );

    let jetro = crate::JetroEngine::new();
    let mut sink: Vec<u8> = Vec::new();
    let outcome = jetro.run_ndjson(input, query, &mut sink);
    outcome.expect("unfiltered first should use direct stream first");

    let rendered = std::str::from_utf8(&sink).unwrap();
    assert_eq!(rendered, expected);
}
4521
/// `map(@.value).last()` without a filter must emit the last item's value for
/// each row. The row with an empty `attributes` array is expected to
/// contribute no output line.
#[test]
fn run_ndjson_map_last_projects_last_item_without_filter() {
    let query = "$.attributes.map(@.value).last()";
    let expected = "\"last\"\n\"only\"\n";
    let input = std::io::Cursor::new(
        br#"{"attributes":[{"key":"a","value":"first"},{"key":"b","value":"last"}]}
{"attributes":[]}
{"attributes":[{"key":"c","value":"only"}]}
"#,
    );

    let jetro = crate::JetroEngine::new();
    let mut sink: Vec<u8> = Vec::new();
    let outcome = jetro.run_ndjson(input, query, &mut sink);
    outcome.expect("unfiltered last should use direct stream last");

    let rendered = std::str::from_utf8(&sink).unwrap();
    assert_eq!(rendered, expected);
}
4542
/// `filter(..).map(..).last()` must emit the projection of the last item that
/// passes the filter. The third row has no matching item and is expected to
/// contribute no output line.
#[test]
fn run_ndjson_filter_map_last_keeps_latest_matching_output() {
    let query = r#"$.attributes.filter(@.value.contains("_3")).map({key: @.key, value: @.value}).last()"#;
    let expected = "{\"key\":\"b\",\"value\":\"later_3\"}\n{\"key\":\"b\",\"value\":\"y_3\"}\n";
    let input = std::io::Cursor::new(
        br#"{"attributes":[{"key":"a","value":"x_3"},{"key":"b","value":"later_3"}]}
{"attributes":[{"key":"a","value":"skip"},{"key":"b","value":"y_3"}]}
{"attributes":[{"key":"a","value":"skip"}]}
"#,
    );

    let jetro = crate::JetroEngine::new();
    let mut sink: Vec<u8> = Vec::new();
    let outcome = jetro.run_ndjson(input, query, &mut sink);
    outcome.expect("filtered last should use direct stream last");

    let rendered = std::str::from_utf8(&sink).unwrap();
    assert_eq!(rendered, expected);
}
4567
/// Sums `@.weight` per row over three rows: integer weights, a mix of float
/// and integer weights, and a non-numeric weight. Expected sums are 3, 7.5
/// and 0 respectively.
///
/// NOTE(review): the name refers to "hint activation"; presumably the
/// multi-row input is what lets the hint machinery kick in — confirm against
/// the hint module.
#[test]
#[cfg(feature = "simd-json")]
fn run_ndjson_stream_numeric_survives_hint_activation() {
    let query = "$.attributes.map(@.weight).sum()";
    let expected = "3\n7.5\n0\n";
    let input = std::io::Cursor::new(
        br#"{"id":1,"attributes":[{"weight":1},{"weight":2}]}
{"id":2,"attributes":[{"weight":3.5},{"weight":4}]}
{"id":3,"attributes":[{"weight":"skip"}]}
"#,
    );

    let jetro = crate::JetroEngine::new();
    let mut sink: Vec<u8> = Vec::new();
    let outcome = jetro.run_ndjson(input, query, &mut sink);
    outcome.expect("hinted numeric stream should run");

    let rendered = std::str::from_utf8(&sink).unwrap();
    assert_eq!(rendered, expected);
}
4584
/// Filters attributes on `value` and then sums `weight` from the same items:
/// only the weights of items whose `value` contains `"_3"` are added
/// (1 + 2.5 = 3.5), and a row with no match sums to 0.
#[test]
#[cfg(feature = "simd-json")]
fn run_ndjson_filtered_stream_numeric_uses_shared_fields() {
    let query = r#"$.attributes.filter(@.value.contains("_3")).map(@.weight).sum()"#;
    let expected = "3.5\n0\n";
    let input = std::io::Cursor::new(
        br#"{"attributes":[{"value":"x_3","weight":1},{"value":"skip","weight":10},{"value":"y_3","weight":2.5}]}
{"attributes":[{"value":"skip","weight":4}]}
"#,
    );

    let jetro = crate::JetroEngine::new();
    let mut sink: Vec<u8> = Vec::new();
    let outcome = jetro.run_ndjson(input, query, &mut sink);
    outcome.expect("filtered numeric stream should use byte path");

    let rendered = std::str::from_utf8(&sink).unwrap();
    assert_eq!(rendered, expected);
}
4604
/// `sort_by(@.value).last().key` must return the `key` of the item with the
/// greatest string `value` in each row ("z" -> key "b", "b" -> key "x").
#[test]
#[cfg(feature = "simd-json")]
fn run_ndjson_stream_extreme_projects_selected_item_field() {
    let query = "$.attributes.sort_by(@.value).last().key";
    let expected = "\"b\"\n\"x\"\n";
    let input = std::io::Cursor::new(
        br#"{"attributes":[{"key":"a","value":"m"},{"key":"b","value":"z"},{"key":"c","value":"n"}]}
{"attributes":[{"key":"x","value":"b"},{"key":"y","value":"a"}]}
"#,
    );

    let jetro = crate::JetroEngine::new();
    let mut sink: Vec<u8> = Vec::new();
    let outcome = jetro.run_ndjson(input, query, &mut sink);
    outcome.expect("stream extreme should project selected item field");

    let rendered = std::str::from_utf8(&sink).unwrap();
    assert_eq!(rendered, expected);
}
4620
/// Same `sort_by(..).last().key` query, but one sort key contains an escaped
/// quote (`v\"1`). The item with value `v_9` is expected to sort last, so the
/// emitted key is "b".
#[test]
fn run_ndjson_stream_extreme_handles_escaped_string_keys() {
    let query = "$.attributes.sort_by(@.value).last().key";
    let expected = "\"b\"\n";
    let input = std::io::Cursor::new(
        br#"{"attributes":[{"key":"a","value":"v\"1"},{"key":"b","value":"v_9"}]}
"#,
    );

    let jetro = crate::JetroEngine::new();
    let mut sink: Vec<u8> = Vec::new();
    let outcome = jetro.run_ndjson(input, query, &mut sink);
    outcome.expect("escaped extrema keys should fall back safely");

    let rendered = std::str::from_utf8(&sink).unwrap();
    assert_eq!(rendered, expected);
}
4634
/// `sort_by(@.score).last().key` with numeric sort keys: ordering must be
/// numeric (10 beats 2; -1.5 beats -2), so the emitted keys are "b" and "y".
#[test]
fn run_ndjson_stream_extreme_handles_numeric_keys() {
    let query = "$.attributes.sort_by(@.score).last().key";
    let expected = "\"b\"\n\"y\"\n";
    let input = std::io::Cursor::new(
        br#"{"attributes":[{"key":"a","score":1},{"key":"b","score":10},{"key":"c","score":2}]}
{"attributes":[{"key":"x","score":-2},{"key":"y","score":-1.5}]}
"#,
    );

    let jetro = crate::JetroEngine::new();
    let mut sink: Vec<u8> = Vec::new();
    let outcome = jetro.run_ndjson(input, query, &mut sink);
    outcome.expect("numeric extrema keys should use direct stream extrema");

    let rendered = std::str::from_utf8(&sink).unwrap();
    assert_eq!(rendered, expected);
}
4649
/// Projects each row into an object mixing root scalars (`id`, `name`) with
/// values derived from the `attributes` array (its length, the first item's
/// `value`, and the list of all `value`s), and checks the emitted object per
/// row, including field order.
#[test]
#[cfg(feature = "simd-json")]
fn run_ndjson_nested_direct_projection_writes_without_fallback() {
    let query = r#"{id: $.id, name: $.name, count: $.attributes.len(), first: $.attributes.first().value, values: $.attributes.map(@.value)}"#;
    // The trailing backslash joins the two expected lines without inserting
    // any whitespace between them.
    let expected = "{\"id\":1,\"name\":\"ada\",\"count\":2,\"first\":\"x\",\"values\":[\"x\",\"y\"]}\n\
        {\"id\":2,\"name\":\"bob\",\"count\":1,\"first\":\"z\",\"values\":[\"z\"]}\n";
    let input = std::io::Cursor::new(
        br#"{"id":1,"name":"ada","attributes":[{"key":"a","value":"x"},{"key":"b","value":"y"}]}
{"id":2,"name":"bob","attributes":[{"key":"c","value":"z"}]}
"#,
    );

    let jetro = crate::JetroEngine::new();
    let mut sink: Vec<u8> = Vec::new();
    let outcome = jetro.run_ndjson(input, query, &mut sink);
    outcome.expect("nested direct projection should run");

    let rendered = std::str::from_utf8(&sink).unwrap();
    assert_eq!(rendered, expected);
}
4673}