1use super::ndjson_byte::{
2 raw_json_byte_path_value, tape_plan_can_write_byte_row, write_ndjson_byte_tape_plan_row,
3 BytePlanWrite, RawFieldValue,
4};
5use super::ndjson_distinct::{
6 distinct_key_bytes, raw_distinct_key_bytes, AdaptiveDistinctKeys, DistinctFrontFilterKind,
7};
8use super::ndjson_frame::{frame_payload, FramePayload, NdjsonRowFrame};
9use super::ndjson_route::{
10 NdjsonExecutionReport, NdjsonExecutionStats, NdjsonRouteExplain, NdjsonRouteKind,
11 NdjsonSourceCaps,
12};
13use super::RowError;
14use crate::util::is_truthy;
15use crate::{JetroEngine, JetroEngineError};
16use memchr::memrchr;
17use serde_json::Value;
18use std::borrow::Cow;
19use std::collections::VecDeque;
20use std::fs::File;
21use std::io::{Read, Seek, SeekFrom, Write};
22use std::path::Path;
23
24pub struct NdjsonReverseFileDriver {
31 file: File,
32 pos: u64,
33 chunk_size: usize,
34 max_line_len: usize,
35 row_frame: NdjsonRowFrame,
36 carry: Vec<u8>,
37 pending: VecDeque<Vec<u8>>,
38 finished_head: bool,
39 reverse_line_no: u64,
40}
41
42impl NdjsonReverseFileDriver {
43 pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, RowError> {
44 Self::with_options(path, super::ndjson::NdjsonOptions::default())
45 }
46
47 pub fn with_chunk_size<P: AsRef<Path>>(path: P, chunk_size: usize) -> Result<Self, RowError> {
48 Self::with_options(
49 path,
50 super::ndjson::NdjsonOptions::default().with_reverse_chunk_size(chunk_size),
51 )
52 }
53
54 pub fn with_options<P: AsRef<Path>>(
55 path: P,
56 options: super::ndjson::NdjsonOptions,
57 ) -> Result<Self, RowError> {
58 let mut file = File::open(path)?;
59 let pos = file.seek(SeekFrom::End(0))?;
60 Ok(Self {
61 file,
62 pos,
63 chunk_size: options.reverse_chunk_size.max(1),
64 max_line_len: options.max_line_len,
65 row_frame: options.row_frame,
66 carry: Vec::new(),
67 pending: VecDeque::new(),
68 finished_head: false,
69 reverse_line_no: 0,
70 })
71 }
72
73 pub fn next_line(&mut self) -> Result<Option<Vec<u8>>, RowError> {
74 Ok(self.next_line_with_reverse_no()?.map(|(_, line)| line))
75 }
76
77 pub fn next_line_with_reverse_no(&mut self) -> Result<Option<(u64, Vec<u8>)>, RowError> {
78 loop {
79 if let Some(mut line) = self.pending.pop_front() {
80 self.reverse_line_no += 1;
81 if let Some(line) = self.frame_line(self.reverse_line_no, &mut line)? {
82 return Ok(Some((self.reverse_line_no, line)));
83 }
84 continue;
85 }
86
87 if self.pos == 0 {
88 if self.finished_head || self.carry.is_empty() {
89 return Ok(None);
90 }
91 self.finished_head = true;
92 let mut line = std::mem::take(&mut self.carry);
93 trim_line_ending(&mut line);
94 self.check_line_len(line.len())?;
95 if line.iter().any(|b| !b.is_ascii_whitespace()) {
96 self.reverse_line_no += 1;
97 if let Some(line) = self.frame_line(self.reverse_line_no, &mut line)? {
98 return Ok(Some((self.reverse_line_no, line)));
99 }
100 }
101 return Ok(None);
102 }
103
104 let read_len = self.chunk_size.min(self.pos as usize);
105 self.pos -= read_len as u64;
106 let mut chunk = vec![0u8; read_len];
107 self.file.seek(SeekFrom::Start(self.pos))?;
108 self.file.read_exact(&mut chunk)?;
109
110 let mut end = chunk.len();
111 while let Some(nl) = memrchr(b'\n', &chunk[..end]) {
112 let mut line = Vec::with_capacity(end - nl - 1 + self.carry.len());
113 line.extend_from_slice(&chunk[nl + 1..end]);
114 line.extend_from_slice(&self.carry);
115 self.carry.clear();
116 end = nl;
117 trim_line_ending(&mut line);
118 self.check_line_len(line.len())?;
119 if line.iter().any(|b| !b.is_ascii_whitespace()) {
120 self.pending.push_back(line);
121 }
122 }
123
124 if end > 0 {
125 let mut next = Vec::with_capacity(end + self.carry.len());
126 next.extend_from_slice(&chunk[..end]);
127 next.extend_from_slice(&self.carry);
128 self.check_line_len(next.len())?;
129 self.carry = next;
130 }
131 }
132 }
133
134 fn frame_line(&self, line_no: u64, line: &mut Vec<u8>) -> Result<Option<Vec<u8>>, RowError> {
135 match frame_payload(self.row_frame, line_no, line)? {
136 FramePayload::Data(range) => {
137 if range.start > 0 || range.end < line.len() {
138 line.copy_within(range.clone(), 0);
139 line.truncate(range.end - range.start);
140 }
141 Ok(Some(std::mem::take(line)))
142 }
143 FramePayload::Skip => Ok(None),
144 }
145 }
146
147 fn check_line_len(&self, len: usize) -> Result<(), RowError> {
148 if len > self.max_line_len {
149 return Err(RowError::LineTooLarge {
150 line_no: self.reverse_line_no + self.pending.len() as u64 + 1,
151 len,
152 max: self.max_line_len,
153 });
154 }
155 Ok(())
156 }
157}
158
159pub fn collect_ndjson_rev<P>(
160 engine: &JetroEngine,
161 path: P,
162 query: &str,
163) -> Result<Vec<Value>, JetroEngineError>
164where
165 P: AsRef<Path>,
166{
167 collect_ndjson_rev_with_options(engine, path, query, super::ndjson::NdjsonOptions::default())
168}
169
170pub fn collect_ndjson_rev_with_options<P>(
171 engine: &JetroEngine,
172 path: P,
173 query: &str,
174 options: super::ndjson::NdjsonOptions,
175) -> Result<Vec<Value>, JetroEngineError>
176where
177 P: AsRef<Path>,
178{
179 let mut values = Vec::new();
180 drive_rev(engine, path, query, options, |value| {
181 values.push(Value::from(value));
182 Ok(super::ndjson::NdjsonControl::Continue)
183 })?;
184 Ok(values)
185}
186
187pub fn for_each_ndjson_rev<P, F>(
188 engine: &JetroEngine,
189 path: P,
190 query: &str,
191 mut f: F,
192) -> Result<usize, JetroEngineError>
193where
194 P: AsRef<Path>,
195 F: FnMut(Value),
196{
197 for_each_ndjson_rev_with_options(
198 engine,
199 path,
200 query,
201 super::ndjson::NdjsonOptions::default(),
202 |value| {
203 f(value);
204 Ok(super::ndjson::NdjsonControl::Continue)
205 },
206 )
207}
208
209pub fn for_each_ndjson_rev_with_options<P, F>(
210 engine: &JetroEngine,
211 path: P,
212 query: &str,
213 options: super::ndjson::NdjsonOptions,
214 mut f: F,
215) -> Result<usize, JetroEngineError>
216where
217 P: AsRef<Path>,
218 F: FnMut(Value) -> Result<super::ndjson::NdjsonControl, JetroEngineError>,
219{
220 drive_rev(engine, path, query, options, |value| f(Value::from(value)))
221}
222
223pub fn collect_ndjson_rev_matches<P>(
224 engine: &JetroEngine,
225 path: P,
226 predicate: &str,
227 limit: usize,
228) -> Result<Vec<Value>, JetroEngineError>
229where
230 P: AsRef<Path>,
231{
232 collect_ndjson_rev_matches_with_options(
233 engine,
234 path,
235 predicate,
236 limit,
237 super::ndjson::NdjsonOptions::default(),
238 )
239}
240
241pub fn collect_ndjson_rev_matches_with_options<P>(
242 engine: &JetroEngine,
243 path: P,
244 predicate: &str,
245 limit: usize,
246 options: super::ndjson::NdjsonOptions,
247) -> Result<Vec<Value>, JetroEngineError>
248where
249 P: AsRef<Path>,
250{
251 let mut values = Vec::with_capacity(limit);
252 drive_rev_matches(engine, path, predicate, limit, options, |value| {
253 values.push(Value::from(value));
254 Ok(super::ndjson::NdjsonControl::Continue)
255 })?;
256 Ok(values)
257}
258
259pub fn run_ndjson_rev<P, W>(
260 engine: &JetroEngine,
261 path: P,
262 query: &str,
263 writer: W,
264) -> Result<usize, JetroEngineError>
265where
266 P: AsRef<Path>,
267 W: Write,
268{
269 run_ndjson_rev_with_options(
270 engine,
271 path,
272 query,
273 writer,
274 super::ndjson::NdjsonOptions::default(),
275 )
276}
277
278pub fn run_ndjson_rev_with_options<P, W>(
279 engine: &JetroEngine,
280 path: P,
281 query: &str,
282 writer: W,
283 options: super::ndjson::NdjsonOptions,
284) -> Result<usize, JetroEngineError>
285where
286 P: AsRef<Path>,
287 W: Write,
288{
289 if let Some(plan) = super::ndjson::direct_tape_plan(engine, query) {
290 return drive_rev_writer_tape(engine, path, &plan, None, options, writer);
291 }
292
293 let mut writer = super::ndjson::ndjson_writer_with_options(writer, options);
294 let mut emitted = 0usize;
295 drive_rev(engine, path, query, options, |value| {
296 if super::ndjson::write_val_line_with_options(&mut writer, &value, options)? {
297 emitted += 1;
298 }
299 Ok(super::ndjson::NdjsonControl::Continue)
300 })?;
301 writer.flush()?;
302 Ok(emitted)
303}
304
305pub fn run_ndjson_rev_limit<P, W>(
306 engine: &JetroEngine,
307 path: P,
308 query: &str,
309 limit: usize,
310 writer: W,
311) -> Result<usize, JetroEngineError>
312where
313 P: AsRef<Path>,
314 W: Write,
315{
316 run_ndjson_rev_limit_with_options(
317 engine,
318 path,
319 query,
320 limit,
321 writer,
322 super::ndjson::NdjsonOptions::default(),
323 )
324}
325
326pub fn run_ndjson_rev_limit_with_options<P, W>(
327 engine: &JetroEngine,
328 path: P,
329 query: &str,
330 limit: usize,
331 writer: W,
332 options: super::ndjson::NdjsonOptions,
333) -> Result<usize, JetroEngineError>
334where
335 P: AsRef<Path>,
336 W: Write,
337{
338 if limit == 0 {
339 return Ok(0);
340 }
341 if let Some(plan) = super::ndjson::direct_tape_plan(engine, query) {
342 return drive_rev_writer_tape(engine, path, &plan, Some(limit), options, writer);
343 }
344
345 let mut writer = super::ndjson::ndjson_writer_with_options(writer, options);
346 let mut emitted = 0usize;
347 drive_rev(engine, path, query, options, |value| {
348 let wrote = super::ndjson::write_val_line_with_options(&mut writer, &value, options)?;
349 if wrote {
350 emitted += 1;
351 }
352 Ok(if wrote && emitted >= limit {
353 super::ndjson::NdjsonControl::Stop
354 } else {
355 super::ndjson::NdjsonControl::Continue
356 })
357 })?;
358 writer.flush()?;
359 Ok(emitted)
360}
361
362pub fn run_ndjson_rev_distinct_by<P, W>(
363 engine: &JetroEngine,
364 path: P,
365 key_query: &str,
366 query: &str,
367 limit: usize,
368 writer: W,
369) -> Result<usize, JetroEngineError>
370where
371 P: AsRef<Path>,
372 W: Write,
373{
374 run_ndjson_rev_distinct_by_with_options(
375 engine,
376 path,
377 key_query,
378 query,
379 limit,
380 writer,
381 super::ndjson::NdjsonOptions::default(),
382 )
383}
384
385pub fn run_ndjson_rev_distinct_by_with_options<P, W>(
386 engine: &JetroEngine,
387 path: P,
388 key_query: &str,
389 query: &str,
390 limit: usize,
391 writer: W,
392 options: super::ndjson::NdjsonOptions,
393) -> Result<usize, JetroEngineError>
394where
395 P: AsRef<Path>,
396 W: Write,
397{
398 run_ndjson_rev_distinct_by_with_stats_and_options(
399 engine, path, key_query, query, limit, writer, options,
400 )
401 .map(|stats| stats.emitted)
402}
403
404pub fn run_ndjson_rev_distinct_by_with_stats<P, W>(
405 engine: &JetroEngine,
406 path: P,
407 key_query: &str,
408 query: &str,
409 limit: usize,
410 writer: W,
411) -> Result<NdjsonRevDistinctStats, JetroEngineError>
412where
413 P: AsRef<Path>,
414 W: Write,
415{
416 run_ndjson_rev_distinct_by_with_stats_and_options(
417 engine,
418 path,
419 key_query,
420 query,
421 limit,
422 writer,
423 super::ndjson::NdjsonOptions::default(),
424 )
425}
426
427pub fn run_ndjson_rev_distinct_by_with_stats_and_options<P, W>(
428 engine: &JetroEngine,
429 path: P,
430 key_query: &str,
431 query: &str,
432 limit: usize,
433 writer: W,
434 options: super::ndjson::NdjsonOptions,
435) -> Result<NdjsonRevDistinctStats, JetroEngineError>
436where
437 P: AsRef<Path>,
438 W: Write,
439{
440 if limit == 0 {
441 return Ok(NdjsonRevDistinctStats::default());
442 }
443 let direct_key_plan = super::ndjson::direct_tape_plan(engine, key_query);
444 let direct_value_plan = super::ndjson::direct_tape_plan(engine, query)
445 .filter(|plan| tape_plan_can_write_byte_row(plan));
446
447 let mut key_plan = None;
448 let mut value_plan = None;
449 let mut vm = None;
450 let mut driver = NdjsonReverseFileDriver::with_options(path, options)?;
451 let mut writer = super::ndjson::ndjson_writer_with_options(writer, options);
452 let mut byte_scratch = Vec::with_capacity(options.initial_buffer_capacity);
453 let mut out = Vec::with_capacity(options.initial_buffer_capacity);
454 let mut seen = AdaptiveDistinctKeys::default();
455 let mut stats = NdjsonRevDistinctStats::default();
456
457 while let Some((reverse_row_no, row)) = driver.next_line_with_reverse_no()? {
458 stats.rows_scanned += 1;
459 let mut row = Some(row);
460 let mut document = None;
461 let direct_key = direct_key_plan.as_ref().and_then(|plan| {
462 row.as_deref()
463 .and_then(|row| distinct_key_direct(row, plan))
464 });
465
466 let inserted = if let Some(key) = direct_key {
467 stats.direct_key_rows += 1;
468 match key {
469 Cow::Borrowed(key) => seen.insert_slice(key),
470 Cow::Owned(key) => seen.insert(key),
471 }
472 } else {
473 stats.fallback_key_rows += 1;
474 let parsed = super::ndjson::parse_row(engine, reverse_row_no, row.take().unwrap())?;
475 let plan = key_plan.get_or_insert_with(|| {
476 engine.cached_plan(key_query, crate::plan::physical::PlanningContext::bytes())
477 });
478 let vm = vm.get_or_insert_with(|| engine.lock_vm());
479 let key = crate::exec::router::collect_plan_val_with_vm(&parsed, plan, vm)
480 .map_err(|err| super::ndjson::row_eval_error(reverse_row_no, err))?;
481 let key = distinct_key_bytes(&key)?;
482 document = Some(parsed);
483 seen.insert(key)
484 };
485 if !inserted {
486 stats.duplicate_rows += 1;
487 continue;
488 }
489 if let (Some(plan), Some(row)) = (direct_value_plan.as_ref(), row.as_deref()) {
490 byte_scratch.clear();
491 out.clear();
492 match write_ndjson_byte_tape_plan_row(&mut out, row, plan, &mut byte_scratch)? {
493 BytePlanWrite::Done => {
494 if super::ndjson::write_json_bytes_line_with_options(
495 &mut writer,
496 &out,
497 options,
498 )? {
499 stats.direct_value_rows += 1;
500 stats.emitted += 1;
501 }
502 if stats.emitted >= limit {
503 break;
504 }
505 continue;
506 }
507 BytePlanWrite::Fallback => {}
508 }
509 }
510
511 let parsed = match document {
512 Some(document) => document,
513 None => super::ndjson::parse_row(engine, reverse_row_no, row.take().unwrap())?,
514 };
515 let plan = value_plan.get_or_insert_with(|| {
516 engine.cached_plan(query, crate::plan::physical::PlanningContext::bytes())
517 });
518 let vm = vm.get_or_insert_with(|| engine.lock_vm());
519 let value = crate::exec::router::collect_plan_val_with_vm(&parsed, plan, vm)
520 .map_err(|err| super::ndjson::row_eval_error(reverse_row_no, err))?;
521 if super::ndjson::write_val_line_with_options(&mut writer, &value, options)? {
522 stats.fallback_value_rows += 1;
523 stats.emitted += 1;
524 }
525 if stats.emitted >= limit {
526 break;
527 }
528 }
529
530 writer.flush()?;
531 stats.front_filter = seen.front_kind();
532 Ok(stats)
533}
534
535pub fn run_ndjson_rev_distinct_by_with_report<P, W>(
536 engine: &JetroEngine,
537 path: P,
538 key_query: &str,
539 query: &str,
540 limit: usize,
541 writer: W,
542) -> Result<NdjsonExecutionReport, JetroEngineError>
543where
544 P: AsRef<Path>,
545 W: Write,
546{
547 run_ndjson_rev_distinct_by_with_report_and_options(
548 engine,
549 path,
550 key_query,
551 query,
552 limit,
553 writer,
554 super::ndjson::NdjsonOptions::default(),
555 )
556}
557
558pub fn run_ndjson_rev_distinct_by_with_report_and_options<P, W>(
559 engine: &JetroEngine,
560 path: P,
561 key_query: &str,
562 query: &str,
563 limit: usize,
564 writer: W,
565 options: super::ndjson::NdjsonOptions,
566) -> Result<NdjsonExecutionReport, JetroEngineError>
567where
568 P: AsRef<Path>,
569 W: Write,
570{
571 let stats = run_ndjson_rev_distinct_by_with_stats_and_options(
572 engine, path, key_query, query, limit, writer, options,
573 )?;
574 Ok(NdjsonExecutionReport::new(
575 NdjsonRouteExplain {
576 kind: NdjsonRouteKind::RowLocal,
577 source: NdjsonSourceCaps::file(options),
578 writer_path: super::ndjson::ndjson_writer_path_kind(engine, query),
579 rows_plan: None,
580 fallback_reason: None,
581 },
582 NdjsonExecutionStats {
583 rows_scanned: stats.rows_scanned,
584 rows_emitted: stats.emitted,
585 duplicate_rows: stats.duplicate_rows,
586 direct_key_rows: stats.direct_key_rows,
587 fallback_key_rows: stats.fallback_key_rows,
588 direct_project_rows: stats.direct_value_rows,
589 fallback_project_rows: stats.fallback_value_rows,
590 ..NdjsonExecutionStats::default()
591 },
592 ))
593}
594
595#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
596pub struct NdjsonRevDistinctStats {
597 pub rows_scanned: usize,
598 pub emitted: usize,
599 pub duplicate_rows: usize,
600 pub direct_key_rows: usize,
601 pub fallback_key_rows: usize,
602 pub direct_value_rows: usize,
603 pub fallback_value_rows: usize,
604 pub front_filter: DistinctFrontFilterKind,
605}
606fn distinct_key_direct<'a>(
607 row: &'a [u8],
608 plan: &super::ndjson::NdjsonDirectTapePlan,
609) -> Option<Cow<'a, [u8]>> {
610 const NULL_KEY: &[u8] = b"null";
611
612 let super::ndjson::NdjsonDirectTapePlan::RootPath(steps) = plan else {
613 return None;
614 };
615 match raw_json_byte_path_value(row, steps) {
616 RawFieldValue::Found(value) => raw_distinct_key_bytes(value),
617 RawFieldValue::Missing => Some(Cow::Borrowed(NULL_KEY)),
618 RawFieldValue::Fallback => None,
619 }
620}
621
622pub fn run_ndjson_rev_matches<P, W>(
623 engine: &JetroEngine,
624 path: P,
625 predicate: &str,
626 limit: usize,
627 writer: W,
628) -> Result<usize, JetroEngineError>
629where
630 P: AsRef<Path>,
631 W: Write,
632{
633 run_ndjson_rev_matches_with_options(
634 engine,
635 path,
636 predicate,
637 limit,
638 writer,
639 super::ndjson::NdjsonOptions::default(),
640 )
641}
642
643pub fn run_ndjson_rev_matches_with_options<P, W>(
644 engine: &JetroEngine,
645 path: P,
646 predicate: &str,
647 limit: usize,
648 writer: W,
649 options: super::ndjson::NdjsonOptions,
650) -> Result<usize, JetroEngineError>
651where
652 P: AsRef<Path>,
653 W: Write,
654{
655 drive_rev_matches_writer(engine, path, predicate, limit, options, writer)
656}
657
658pub fn run_ndjson_rev_matches_with_report<P, W>(
659 engine: &JetroEngine,
660 path: P,
661 predicate: &str,
662 limit: usize,
663 writer: W,
664) -> Result<NdjsonExecutionReport, JetroEngineError>
665where
666 P: AsRef<Path>,
667 W: Write,
668{
669 run_ndjson_rev_matches_with_report_and_options(
670 engine,
671 path,
672 predicate,
673 limit,
674 writer,
675 super::ndjson::NdjsonOptions::default(),
676 )
677}
678
679pub fn run_ndjson_rev_matches_with_report_and_options<P, W>(
680 engine: &JetroEngine,
681 path: P,
682 predicate: &str,
683 limit: usize,
684 writer: W,
685 options: super::ndjson::NdjsonOptions,
686) -> Result<NdjsonExecutionReport, JetroEngineError>
687where
688 P: AsRef<Path>,
689 W: Write,
690{
691 let (_, stats) =
692 drive_rev_matches_writer_with_stats(engine, path, predicate, limit, options, writer)?;
693 Ok(NdjsonExecutionReport::new(
694 NdjsonRouteExplain::matches(NdjsonSourceCaps::file(options)),
695 stats,
696 ))
697}
698fn drive_rev_writer_tape<P, W>(
699 engine: &JetroEngine,
700 path: P,
701 plan: &super::ndjson::NdjsonDirectTapePlan,
702 limit: Option<usize>,
703 options: super::ndjson::NdjsonOptions,
704 writer: W,
705) -> Result<usize, JetroEngineError>
706where
707 P: AsRef<Path>,
708 W: Write,
709{
710 let mut driver = NdjsonReverseFileDriver::with_options(path, options)?;
711 let mut writer = super::ndjson::ndjson_writer_with_options(writer, options);
712 let mut scratch =
713 crate::data::tape::TapeScratch::with_capacity(options.initial_buffer_capacity);
714 let mut out = Vec::with_capacity(options.initial_buffer_capacity);
715 let mut runner = super::ndjson::NdjsonTapeWriterRunner::new(engine, plan);
716 let mut count = 0usize;
717
718 while let Some((reverse_row_no, row)) = driver.next_line_with_reverse_no()? {
719 out.clear();
720 scratch.parse_slice(&row).map_err(|message| {
721 super::ndjson::row_parse_error(
722 reverse_row_no,
723 JetroEngineError::Eval(crate::EvalError(format!("Invalid JSON: {message}"))),
724 )
725 })?;
726 runner.write_row(&scratch, &mut out)?;
727 if super::ndjson::write_json_bytes_line_with_options(&mut writer, &out, options)? {
728 count += 1;
729 }
730 if limit.is_some_and(|limit| count >= limit) {
731 break;
732 }
733 }
734
735 writer.flush()?;
736 Ok(count)
737}
738
739fn drive_rev<P, F>(
740 engine: &JetroEngine,
741 path: P,
742 query: &str,
743 options: super::ndjson::NdjsonOptions,
744 mut emit: F,
745) -> Result<usize, JetroEngineError>
746where
747 P: AsRef<Path>,
748 F: FnMut(crate::data::value::Val) -> Result<super::ndjson::NdjsonControl, JetroEngineError>,
749{
750 let mut driver = NdjsonReverseFileDriver::with_options(path, options)?;
751 let mut executor = super::ndjson::NdjsonRowExecutor::new(engine, query);
752 let mut count = 0usize;
753
754 while let Some((reverse_row_no, row)) = driver.next_line_with_reverse_no()? {
755 let out = executor.eval_owned_row(reverse_row_no, row)?;
756 count += 1;
757 if matches!(emit(out)?, super::ndjson::NdjsonControl::Stop) {
758 break;
759 }
760 }
761
762 Ok(count)
763}
764
765fn drive_rev_matches<P, F>(
766 engine: &JetroEngine,
767 path: P,
768 predicate: &str,
769 limit: usize,
770 options: super::ndjson::NdjsonOptions,
771 mut emit: F,
772) -> Result<usize, JetroEngineError>
773where
774 P: AsRef<Path>,
775 F: FnMut(crate::data::value::Val) -> Result<super::ndjson::NdjsonControl, JetroEngineError>,
776{
777 if limit == 0 {
778 return Ok(0);
779 }
780
781 let mut driver = NdjsonReverseFileDriver::with_options(path, options)?;
782 let mut executor = super::ndjson::NdjsonRowExecutor::new(engine, predicate);
783 let mut emitted = 0usize;
784
785 while let Some((reverse_row_no, row)) = driver.next_line_with_reverse_no()? {
786 let document = executor.parse_owned_row(reverse_row_no, row)?;
787 let matched = executor.eval_document(reverse_row_no, &document)?;
788 if !is_truthy(&matched) {
789 continue;
790 }
791
792 let root = document
793 .root_val_with(executor.engine().keys())
794 .map_err(|err| super::ndjson::row_eval_error(reverse_row_no, err))?;
795 emitted += 1;
796 if matches!(emit(root)?, super::ndjson::NdjsonControl::Stop) || emitted >= limit {
797 break;
798 }
799 }
800
801 Ok(emitted)
802}
803
804fn drive_rev_matches_writer<P, W>(
805 engine: &JetroEngine,
806 path: P,
807 predicate: &str,
808 limit: usize,
809 options: super::ndjson::NdjsonOptions,
810 writer: W,
811) -> Result<usize, JetroEngineError>
812where
813 P: AsRef<Path>,
814 W: Write,
815{
816 Ok(drive_rev_matches_writer_with_stats(engine, path, predicate, limit, options, writer)?.0)
817}
818
819fn drive_rev_matches_writer_with_stats<P, W>(
820 engine: &JetroEngine,
821 path: P,
822 predicate: &str,
823 limit: usize,
824 options: super::ndjson::NdjsonOptions,
825 writer: W,
826) -> Result<(usize, NdjsonExecutionStats), JetroEngineError>
827where
828 P: AsRef<Path>,
829 W: Write,
830{
831 if limit == 0 {
832 return Ok((0, NdjsonExecutionStats::default()));
833 }
834 if let Some(predicate) = super::ndjson::direct_tape_predicate(engine, predicate) {
835 return drive_rev_matches_writer_tape_with_stats(
836 engine, path, &predicate, limit, options, writer,
837 );
838 }
839
840 let mut driver = NdjsonReverseFileDriver::with_options(path, options)?;
841 let mut executor = super::ndjson::NdjsonRowExecutor::new(engine, predicate);
842 let mut writer = super::ndjson::ndjson_writer_with_options(writer, options);
843 let mut emitted = 0usize;
844 let mut stats = NdjsonExecutionStats::default();
845
846 while let Some((reverse_row_no, row)) = driver.next_line_with_reverse_no()? {
847 stats.rows_scanned += 1;
848 let document = executor.parse_owned_row(reverse_row_no, row)?;
849 let matched = executor.eval_document(reverse_row_no, &document)?;
850 stats.fallback_filter_rows += 1;
851 if !is_truthy(&matched) {
852 stats.rows_filtered += 1;
853 continue;
854 }
855
856 super::ndjson::write_document_line(
857 &mut writer,
858 &document,
859 reverse_row_no,
860 executor.engine(),
861 )?;
862 emitted += 1;
863 stats.rows_emitted += 1;
864 if emitted >= limit {
865 break;
866 }
867 }
868
869 writer.flush()?;
870 Ok((emitted, stats))
871}
872fn drive_rev_matches_writer_tape_with_stats<P, W>(
873 engine: &JetroEngine,
874 path: P,
875 predicate: &super::ndjson::NdjsonDirectPredicate,
876 limit: usize,
877 options: super::ndjson::NdjsonOptions,
878 writer: W,
879) -> Result<(usize, NdjsonExecutionStats), JetroEngineError>
880where
881 P: AsRef<Path>,
882 W: Write,
883{
884 let mut driver = NdjsonReverseFileDriver::with_options(path, options)?;
885 let mut writer = super::ndjson::ndjson_writer_with_options(writer, options);
886 let mut scratch =
887 crate::data::tape::TapeScratch::with_capacity(options.initial_buffer_capacity);
888 let mut emitted = 0usize;
889 let needs_vm = super::ndjson::predicate_needs_vm(predicate);
890 let mut vm = needs_vm.then(|| engine.lock_vm());
891 let env = needs_vm.then(|| crate::data::context::Env::new(crate::Val::Null));
892 let mut predicate_path = super::ndjson::NdjsonPathCache::default();
893 let mut stats = NdjsonExecutionStats::default();
894
895 while let Some((reverse_row_no, row)) = driver.next_line_with_reverse_no()? {
896 stats.rows_scanned += 1;
897 scratch.parse_slice(&row).map_err(|message| {
898 super::ndjson::row_parse_error(
899 reverse_row_no,
900 JetroEngineError::Eval(crate::EvalError(format!("Invalid JSON: {message}"))),
901 )
902 })?;
903 if !super::ndjson::eval_tape_predicate(
904 &scratch,
905 predicate,
906 env.as_ref(),
907 &mut vm,
908 &mut predicate_path,
909 )
910 .map_err(JetroEngineError::Eval)?
911 {
912 stats.fallback_filter_rows += 1;
913 stats.rows_filtered += 1;
914 continue;
915 }
916 stats.fallback_filter_rows += 1;
917 writer.write_all(&row)?;
918 writer.write_all(b"\n")?;
919 emitted += 1;
920 stats.rows_emitted += 1;
921 if emitted >= limit {
922 break;
923 }
924 }
925
926 writer.flush()?;
927 Ok((emitted, stats))
928}
929
/// Removes every trailing `\r` and `\n` byte from `buf`. Interior CR/LF bytes are kept.
fn trim_line_ending(buf: &mut Vec<u8>) {
    let keep = buf
        .iter()
        .rposition(|&byte| byte != b'\n' && byte != b'\r')
        .map_or(0, |last_content| last_content + 1);
    buf.truncate(keep);
}
935
#[cfg(test)]
mod tests {
    use super::NdjsonReverseFileDriver;
    use crate::JetroEngine;
    use std::path::{Path, PathBuf};

    /// Temporary NDJSON fixture that is deleted when dropped.
    ///
    /// Cleanup still happens when an assertion panics. Declaring the fixture before the
    /// driver also closes the driver's file handle first, so deletion succeeds on
    /// platforms that refuse to remove open files.
    struct TempFile(PathBuf);

    impl TempFile {
        fn new(name: &str, contents: &[u8]) -> Self {
            let file = TempFile(temp_path(name));
            std::fs::write(&file.0, contents).unwrap();
            file
        }

        fn path(&self) -> &Path {
            &self.0
        }
    }

    impl Drop for TempFile {
        fn drop(&mut self) {
            let _ = std::fs::remove_file(&self.0);
        }
    }

    #[test]
    fn reverse_driver_reads_rows_from_tail() {
        let fixture = TempFile::new(
            "jetro-ndjson-rev-basic",
            b"{\"n\":1}\n{\"n\":2}\n{\"n\":3}\n",
        );
        let mut driver = NdjsonReverseFileDriver::with_chunk_size(fixture.path(), 8).unwrap();

        assert_eq!(driver.next_line().unwrap().unwrap(), br#"{"n":3}"#);
        assert_eq!(driver.next_line().unwrap().unwrap(), br#"{"n":2}"#);
        assert_eq!(driver.next_line().unwrap().unwrap(), br#"{"n":1}"#);
        assert!(driver.next_line().unwrap().is_none());
    }

    #[test]
    fn reverse_driver_handles_missing_final_newline_and_blank_lines() {
        let fixture = TempFile::new("jetro-ndjson-rev-edge", b"\n{\"n\":1}\r\n\n{\"n\":2}");
        let mut driver = NdjsonReverseFileDriver::with_chunk_size(fixture.path(), 5).unwrap();

        assert_eq!(driver.next_line().unwrap().unwrap(), br#"{"n":2}"#);
        assert_eq!(driver.next_line().unwrap().unwrap(), br#"{"n":1}"#);
        assert!(driver.next_line().unwrap().is_none());
    }

    #[test]
    fn reverse_driver_reports_reverse_row_numbers() {
        let fixture = TempFile::new("jetro-ndjson-rev-row-no", b"{\"n\":1}\n{\"n\":2}\n");
        let mut driver = NdjsonReverseFileDriver::with_chunk_size(fixture.path(), 3).unwrap();

        assert_eq!(
            driver.next_line_with_reverse_no().unwrap().unwrap(),
            (1, br#"{"n":2}"#.to_vec())
        );
        assert_eq!(
            driver.next_line_with_reverse_no().unwrap().unwrap(),
            (2, br#"{"n":1}"#.to_vec())
        );
        assert!(driver.next_line_with_reverse_no().unwrap().is_none());
    }

    #[test]
    fn reverse_query_uses_direct_writer_shapes() {
        let fixture = TempFile::new(
            "jetro-ndjson-rev-direct",
            b"{\"name\":\"ada\",\"attrs\":[{\"key\":\"a\",\"value\":1}]}\n{\"name\":\"bob\",\"attrs\":[{\"key\":\"b\",\"value\":2}]}\n",
        );
        let engine = JetroEngine::new();
        let mut out = Vec::new();

        super::run_ndjson_rev(&engine, fixture.path(), "attrs.map([@.key, @.value])", &mut out)
            .unwrap();

        assert_eq!(
            String::from_utf8(out).unwrap(),
            "[[\"b\",2]]\n[[\"a\",1]]\n"
        );
    }

    /// Escaped strings are accepted and decoded, and objects are minified. Only the
    /// non-canonical number `1.0` is declined, which sends it to the fallback path.
    #[test]
    fn direct_distinct_key_classifier_normalizes_values() {
        assert_eq!(
            super::raw_distinct_key_bytes(br#""plain""#).as_deref(),
            Some(br#""plain""#.as_slice())
        );
        assert_eq!(
            super::raw_distinct_key_bytes(br#""a\u0062""#).as_deref(),
            Some(br#""ab""#.as_slice())
        );
        assert_eq!(
            super::raw_distinct_key_bytes(br#"{"k":"v"}"#).as_deref(),
            Some(br#"{"k":"v"}"#.as_slice())
        );
        assert_eq!(
            super::raw_distinct_key_bytes(b"123").as_deref(),
            Some(b"123".as_slice())
        );
        assert_eq!(
            super::raw_distinct_key_bytes(br#"{"a" : 1,"b":"x\u0079"}"#).as_deref(),
            Some(br#"{"a":1,"b":"xy"}"#.as_slice())
        );
        assert_eq!(super::raw_distinct_key_bytes(b"1.0"), None);
    }

    /// Per-process temp path so concurrent test runs do not collide.
    fn temp_path(name: &str) -> PathBuf {
        let mut path = std::env::temp_dir();
        path.push(format!("{}-{}.ndjson", name, std::process::id()));
        path
    }
}