1use super::ndjson_distinct::{
2 distinct_key_bytes, raw_distinct_key_bytes, AdaptiveDistinctKeys, DistinctFrontFilterKind,
3};
4use super::ndjson_frame::{frame_payload, FramePayload, NdjsonRowFrame};
5use super::RowError;
6use crate::util::is_truthy;
7use crate::{JetroEngine, JetroEngineError};
8use memchr::memrchr;
9use serde_json::Value;
10use std::borrow::Cow;
11use std::collections::VecDeque;
12use std::fs::File;
13use std::io::{Read, Seek, SeekFrom, Write};
14use std::path::Path;
15
16#[cfg(feature = "simd-json")]
17use super::ndjson_byte::{
18 raw_json_byte_path_value, tape_plan_can_write_byte_row, write_ndjson_byte_tape_plan_row,
19 BytePlanWrite, RawFieldValue,
20};
21
/// Line reader that walks an NDJSON file from its end towards its start.
///
/// The file is fetched in chunks, back to front. Complete lines found in a chunk are queued
/// in `pending` and handed out one at a time, so the last line of the file comes out first.
pub struct NdjsonReverseFileDriver {
    /// Open handle to the file being read.
    file: File,
    /// Byte offset where the next backwards read ends; bytes before it are still unread.
    pos: u64,
    /// Maximum number of bytes fetched per backwards read (clamped to at least 1 on creation).
    chunk_size: usize,
    /// Upper bound on the byte length of a single line; longer lines produce an error.
    max_line_len: usize,
    /// Framing mode handed to `frame_payload` for every line.
    row_frame: NdjsonRowFrame,
    /// Tail bytes of a line whose beginning lies in a chunk that has not been read yet.
    carry: Vec<u8>,
    /// Complete, non-blank lines split out of the current chunk, in the order they are returned.
    pending: VecDeque<Vec<u8>>,
    /// Set once the leftover bytes at the very start of the file have been handled.
    finished_head: bool,
    /// Number of non-blank lines handed to framing so far; the last line of the file is 1.
    reverse_line_no: u64,
}
39
40impl NdjsonReverseFileDriver {
41 pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, RowError> {
42 Self::with_options(path, super::ndjson::NdjsonOptions::default())
43 }
44
45 pub fn with_chunk_size<P: AsRef<Path>>(path: P, chunk_size: usize) -> Result<Self, RowError> {
46 Self::with_options(
47 path,
48 super::ndjson::NdjsonOptions::default().with_reverse_chunk_size(chunk_size),
49 )
50 }
51
52 pub fn with_options<P: AsRef<Path>>(
53 path: P,
54 options: super::ndjson::NdjsonOptions,
55 ) -> Result<Self, RowError> {
56 let mut file = File::open(path)?;
57 let pos = file.seek(SeekFrom::End(0))?;
58 Ok(Self {
59 file,
60 pos,
61 chunk_size: options.reverse_chunk_size.max(1),
62 max_line_len: options.max_line_len,
63 row_frame: options.row_frame,
64 carry: Vec::new(),
65 pending: VecDeque::new(),
66 finished_head: false,
67 reverse_line_no: 0,
68 })
69 }
70
71 pub fn next_line(&mut self) -> Result<Option<Vec<u8>>, RowError> {
72 Ok(self.next_line_with_reverse_no()?.map(|(_, line)| line))
73 }
74
75 pub fn next_line_with_reverse_no(&mut self) -> Result<Option<(u64, Vec<u8>)>, RowError> {
76 loop {
77 if let Some(mut line) = self.pending.pop_front() {
78 self.reverse_line_no += 1;
79 if let Some(line) = self.frame_line(self.reverse_line_no, &mut line)? {
80 return Ok(Some((self.reverse_line_no, line)));
81 }
82 continue;
83 }
84
85 if self.pos == 0 {
86 if self.finished_head || self.carry.is_empty() {
87 return Ok(None);
88 }
89 self.finished_head = true;
90 let mut line = std::mem::take(&mut self.carry);
91 trim_line_ending(&mut line);
92 self.check_line_len(line.len())?;
93 if line.iter().any(|b| !b.is_ascii_whitespace()) {
94 self.reverse_line_no += 1;
95 if let Some(line) = self.frame_line(self.reverse_line_no, &mut line)? {
96 return Ok(Some((self.reverse_line_no, line)));
97 }
98 }
99 return Ok(None);
100 }
101
102 let read_len = self.chunk_size.min(self.pos as usize);
103 self.pos -= read_len as u64;
104 let mut chunk = vec![0u8; read_len];
105 self.file.seek(SeekFrom::Start(self.pos))?;
106 self.file.read_exact(&mut chunk)?;
107
108 let mut end = chunk.len();
109 while let Some(nl) = memrchr(b'\n', &chunk[..end]) {
110 let mut line = Vec::with_capacity(end - nl - 1 + self.carry.len());
111 line.extend_from_slice(&chunk[nl + 1..end]);
112 line.extend_from_slice(&self.carry);
113 self.carry.clear();
114 end = nl;
115 trim_line_ending(&mut line);
116 self.check_line_len(line.len())?;
117 if line.iter().any(|b| !b.is_ascii_whitespace()) {
118 self.pending.push_back(line);
119 }
120 }
121
122 if end > 0 {
123 let mut next = Vec::with_capacity(end + self.carry.len());
124 next.extend_from_slice(&chunk[..end]);
125 next.extend_from_slice(&self.carry);
126 self.check_line_len(next.len())?;
127 self.carry = next;
128 }
129 }
130 }
131
132 fn frame_line(&self, line_no: u64, line: &mut Vec<u8>) -> Result<Option<Vec<u8>>, RowError> {
133 match frame_payload(self.row_frame, line_no, line)? {
134 FramePayload::Data(range) => {
135 if range.start > 0 || range.end < line.len() {
136 line.copy_within(range.clone(), 0);
137 line.truncate(range.end - range.start);
138 }
139 Ok(Some(std::mem::take(line)))
140 }
141 FramePayload::Skip => Ok(None),
142 }
143 }
144
145 fn check_line_len(&self, len: usize) -> Result<(), RowError> {
146 if len > self.max_line_len {
147 return Err(RowError::LineTooLarge {
148 line_no: self.reverse_line_no + self.pending.len() as u64 + 1,
149 len,
150 max: self.max_line_len,
151 });
152 }
153 Ok(())
154 }
155}
156
157pub fn collect_ndjson_rev<P>(
158 engine: &JetroEngine,
159 path: P,
160 query: &str,
161) -> Result<Vec<Value>, JetroEngineError>
162where
163 P: AsRef<Path>,
164{
165 collect_ndjson_rev_with_options(engine, path, query, super::ndjson::NdjsonOptions::default())
166}
167
168pub fn collect_ndjson_rev_with_options<P>(
169 engine: &JetroEngine,
170 path: P,
171 query: &str,
172 options: super::ndjson::NdjsonOptions,
173) -> Result<Vec<Value>, JetroEngineError>
174where
175 P: AsRef<Path>,
176{
177 let mut values = Vec::new();
178 drive_rev(engine, path, query, options, |value| {
179 values.push(Value::from(value));
180 Ok(super::ndjson::NdjsonControl::Continue)
181 })?;
182 Ok(values)
183}
184
185pub fn for_each_ndjson_rev<P, F>(
186 engine: &JetroEngine,
187 path: P,
188 query: &str,
189 mut f: F,
190) -> Result<usize, JetroEngineError>
191where
192 P: AsRef<Path>,
193 F: FnMut(Value),
194{
195 for_each_ndjson_rev_with_options(
196 engine,
197 path,
198 query,
199 super::ndjson::NdjsonOptions::default(),
200 |value| {
201 f(value);
202 Ok(super::ndjson::NdjsonControl::Continue)
203 },
204 )
205}
206
207pub fn for_each_ndjson_rev_with_options<P, F>(
208 engine: &JetroEngine,
209 path: P,
210 query: &str,
211 options: super::ndjson::NdjsonOptions,
212 mut f: F,
213) -> Result<usize, JetroEngineError>
214where
215 P: AsRef<Path>,
216 F: FnMut(Value) -> Result<super::ndjson::NdjsonControl, JetroEngineError>,
217{
218 drive_rev(engine, path, query, options, |value| f(Value::from(value)))
219}
220
221pub fn collect_ndjson_rev_matches<P>(
222 engine: &JetroEngine,
223 path: P,
224 predicate: &str,
225 limit: usize,
226) -> Result<Vec<Value>, JetroEngineError>
227where
228 P: AsRef<Path>,
229{
230 collect_ndjson_rev_matches_with_options(
231 engine,
232 path,
233 predicate,
234 limit,
235 super::ndjson::NdjsonOptions::default(),
236 )
237}
238
239pub fn collect_ndjson_rev_matches_with_options<P>(
240 engine: &JetroEngine,
241 path: P,
242 predicate: &str,
243 limit: usize,
244 options: super::ndjson::NdjsonOptions,
245) -> Result<Vec<Value>, JetroEngineError>
246where
247 P: AsRef<Path>,
248{
249 let mut values = Vec::with_capacity(limit);
250 drive_rev_matches(engine, path, predicate, limit, options, |value| {
251 values.push(Value::from(value));
252 Ok(super::ndjson::NdjsonControl::Continue)
253 })?;
254 Ok(values)
255}
256
257pub fn run_ndjson_rev<P, W>(
258 engine: &JetroEngine,
259 path: P,
260 query: &str,
261 writer: W,
262) -> Result<usize, JetroEngineError>
263where
264 P: AsRef<Path>,
265 W: Write,
266{
267 run_ndjson_rev_with_options(
268 engine,
269 path,
270 query,
271 writer,
272 super::ndjson::NdjsonOptions::default(),
273 )
274}
275
276pub fn run_ndjson_rev_with_options<P, W>(
277 engine: &JetroEngine,
278 path: P,
279 query: &str,
280 writer: W,
281 options: super::ndjson::NdjsonOptions,
282) -> Result<usize, JetroEngineError>
283where
284 P: AsRef<Path>,
285 W: Write,
286{
287 #[cfg(feature = "simd-json")]
288 if let Some(plan) = super::ndjson::direct_tape_plan(engine, query) {
289 return drive_rev_writer_tape(engine, path, &plan, None, options, writer);
290 }
291
292 let mut writer = super::ndjson::ndjson_writer_with_options(writer, options);
293 let mut emitted = 0usize;
294 drive_rev(engine, path, query, options, |value| {
295 if super::ndjson::write_val_line_with_options(&mut writer, &value, options)? {
296 emitted += 1;
297 }
298 Ok(super::ndjson::NdjsonControl::Continue)
299 })?;
300 writer.flush()?;
301 Ok(emitted)
302}
303
304pub fn run_ndjson_rev_limit<P, W>(
305 engine: &JetroEngine,
306 path: P,
307 query: &str,
308 limit: usize,
309 writer: W,
310) -> Result<usize, JetroEngineError>
311where
312 P: AsRef<Path>,
313 W: Write,
314{
315 run_ndjson_rev_limit_with_options(
316 engine,
317 path,
318 query,
319 limit,
320 writer,
321 super::ndjson::NdjsonOptions::default(),
322 )
323}
324
325pub fn run_ndjson_rev_limit_with_options<P, W>(
326 engine: &JetroEngine,
327 path: P,
328 query: &str,
329 limit: usize,
330 writer: W,
331 options: super::ndjson::NdjsonOptions,
332) -> Result<usize, JetroEngineError>
333where
334 P: AsRef<Path>,
335 W: Write,
336{
337 if limit == 0 {
338 return Ok(0);
339 }
340
341 #[cfg(feature = "simd-json")]
342 if let Some(plan) = super::ndjson::direct_tape_plan(engine, query) {
343 return drive_rev_writer_tape(engine, path, &plan, Some(limit), options, writer);
344 }
345
346 let mut writer = super::ndjson::ndjson_writer_with_options(writer, options);
347 let mut emitted = 0usize;
348 drive_rev(engine, path, query, options, |value| {
349 let wrote = super::ndjson::write_val_line_with_options(&mut writer, &value, options)?;
350 if wrote {
351 emitted += 1;
352 }
353 Ok(if wrote && emitted >= limit {
354 super::ndjson::NdjsonControl::Stop
355 } else {
356 super::ndjson::NdjsonControl::Continue
357 })
358 })?;
359 writer.flush()?;
360 Ok(emitted)
361}
362
363pub fn run_ndjson_rev_distinct_by<P, W>(
364 engine: &JetroEngine,
365 path: P,
366 key_query: &str,
367 query: &str,
368 limit: usize,
369 writer: W,
370) -> Result<usize, JetroEngineError>
371where
372 P: AsRef<Path>,
373 W: Write,
374{
375 run_ndjson_rev_distinct_by_with_options(
376 engine,
377 path,
378 key_query,
379 query,
380 limit,
381 writer,
382 super::ndjson::NdjsonOptions::default(),
383 )
384}
385
386pub fn run_ndjson_rev_distinct_by_with_options<P, W>(
387 engine: &JetroEngine,
388 path: P,
389 key_query: &str,
390 query: &str,
391 limit: usize,
392 writer: W,
393 options: super::ndjson::NdjsonOptions,
394) -> Result<usize, JetroEngineError>
395where
396 P: AsRef<Path>,
397 W: Write,
398{
399 run_ndjson_rev_distinct_by_with_stats_and_options(
400 engine, path, key_query, query, limit, writer, options,
401 )
402 .map(|stats| stats.emitted)
403}
404
405pub fn run_ndjson_rev_distinct_by_with_stats<P, W>(
406 engine: &JetroEngine,
407 path: P,
408 key_query: &str,
409 query: &str,
410 limit: usize,
411 writer: W,
412) -> Result<NdjsonRevDistinctStats, JetroEngineError>
413where
414 P: AsRef<Path>,
415 W: Write,
416{
417 run_ndjson_rev_distinct_by_with_stats_and_options(
418 engine,
419 path,
420 key_query,
421 query,
422 limit,
423 writer,
424 super::ndjson::NdjsonOptions::default(),
425 )
426}
427
428pub fn run_ndjson_rev_distinct_by_with_stats_and_options<P, W>(
429 engine: &JetroEngine,
430 path: P,
431 key_query: &str,
432 query: &str,
433 limit: usize,
434 writer: W,
435 options: super::ndjson::NdjsonOptions,
436) -> Result<NdjsonRevDistinctStats, JetroEngineError>
437where
438 P: AsRef<Path>,
439 W: Write,
440{
441 if limit == 0 {
442 return Ok(NdjsonRevDistinctStats::default());
443 }
444
445 #[cfg(feature = "simd-json")]
446 let direct_key_plan = super::ndjson::direct_tape_plan(engine, key_query);
447 #[cfg(feature = "simd-json")]
448 let direct_value_plan = super::ndjson::direct_tape_plan(engine, query)
449 .filter(|plan| tape_plan_can_write_byte_row(plan));
450
451 let mut key_plan = None;
452 let mut value_plan = None;
453 let mut vm = None;
454 let mut driver = NdjsonReverseFileDriver::with_options(path, options)?;
455 let mut writer = super::ndjson::ndjson_writer_with_options(writer, options);
456 #[cfg(feature = "simd-json")]
457 let mut byte_scratch = Vec::with_capacity(options.initial_buffer_capacity);
458 #[cfg(feature = "simd-json")]
459 let mut out = Vec::with_capacity(options.initial_buffer_capacity);
460 let mut seen = AdaptiveDistinctKeys::default();
461 let mut stats = NdjsonRevDistinctStats::default();
462
463 while let Some((reverse_row_no, row)) = driver.next_line_with_reverse_no()? {
464 stats.rows_scanned += 1;
465 let mut row = Some(row);
466 let mut document = None;
467
468 #[cfg(feature = "simd-json")]
469 let direct_key = direct_key_plan.as_ref().and_then(|plan| {
470 row.as_deref()
471 .and_then(|row| distinct_key_direct(row, plan))
472 });
473 #[cfg(not(feature = "simd-json"))]
474 let direct_key = None;
475
476 let inserted = if let Some(key) = direct_key {
477 stats.direct_key_rows += 1;
478 match key {
479 Cow::Borrowed(key) => seen.insert_slice(key),
480 Cow::Owned(key) => seen.insert(key),
481 }
482 } else {
483 stats.fallback_key_rows += 1;
484 let parsed = super::ndjson::parse_row(engine, reverse_row_no, row.take().unwrap())?;
485 let plan = key_plan.get_or_insert_with(|| {
486 engine.cached_plan(key_query, crate::plan::physical::PlanningContext::bytes())
487 });
488 let vm = vm.get_or_insert_with(|| engine.lock_vm());
489 let key = crate::exec::router::collect_plan_val_with_vm(&parsed, plan, vm)
490 .map_err(|err| super::ndjson::row_eval_error(reverse_row_no, err))?;
491 let key = distinct_key_bytes(&key)?;
492 document = Some(parsed);
493 seen.insert(key)
494 };
495 if !inserted {
496 stats.duplicate_rows += 1;
497 continue;
498 }
499
500 #[cfg(feature = "simd-json")]
501 if let (Some(plan), Some(row)) = (direct_value_plan.as_ref(), row.as_deref()) {
502 byte_scratch.clear();
503 out.clear();
504 match write_ndjson_byte_tape_plan_row(&mut out, row, plan, &mut byte_scratch)? {
505 BytePlanWrite::Done => {
506 if super::ndjson::write_json_bytes_line_with_options(
507 &mut writer,
508 &out,
509 options,
510 )? {
511 stats.direct_value_rows += 1;
512 stats.emitted += 1;
513 }
514 if stats.emitted >= limit {
515 break;
516 }
517 continue;
518 }
519 BytePlanWrite::Fallback => {}
520 }
521 }
522
523 let parsed = match document {
524 Some(document) => document,
525 None => super::ndjson::parse_row(engine, reverse_row_no, row.take().unwrap())?,
526 };
527 let plan = value_plan.get_or_insert_with(|| {
528 engine.cached_plan(query, crate::plan::physical::PlanningContext::bytes())
529 });
530 let vm = vm.get_or_insert_with(|| engine.lock_vm());
531 let value = crate::exec::router::collect_plan_val_with_vm(&parsed, plan, vm)
532 .map_err(|err| super::ndjson::row_eval_error(reverse_row_no, err))?;
533 if super::ndjson::write_val_line_with_options(&mut writer, &value, options)? {
534 stats.fallback_value_rows += 1;
535 stats.emitted += 1;
536 }
537 if stats.emitted >= limit {
538 break;
539 }
540 }
541
542 writer.flush()?;
543 stats.front_filter = seen.front_kind();
544 Ok(stats)
545}
546
547#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
548pub struct NdjsonRevDistinctStats {
549 pub rows_scanned: usize,
550 pub emitted: usize,
551 pub duplicate_rows: usize,
552 pub direct_key_rows: usize,
553 pub fallback_key_rows: usize,
554 pub direct_value_rows: usize,
555 pub fallback_value_rows: usize,
556 pub front_filter: DistinctFrontFilterKind,
557}
558
/// Tries to derive the distinct key for `row` straight from its raw bytes.
///
/// Only `RootPath` plans are supported; any other plan returns `None`. A missing path maps
/// to the literal key `null`. `None` means the caller must fall back to parsing the row.
#[cfg(feature = "simd-json")]
fn distinct_key_direct<'a>(
    row: &'a [u8],
    plan: &super::ndjson::NdjsonDirectTapePlan,
) -> Option<Cow<'a, [u8]>> {
    const NULL_KEY: &[u8] = b"null";

    match plan {
        super::ndjson::NdjsonDirectTapePlan::RootPath(steps) => {
            match raw_json_byte_path_value(row, steps) {
                RawFieldValue::Found(raw_value) => raw_distinct_key_bytes(raw_value),
                RawFieldValue::Missing => Some(Cow::Borrowed(NULL_KEY)),
                RawFieldValue::Fallback => None,
            }
        }
        _ => None,
    }
}
575
576pub fn run_ndjson_rev_matches<P, W>(
577 engine: &JetroEngine,
578 path: P,
579 predicate: &str,
580 limit: usize,
581 writer: W,
582) -> Result<usize, JetroEngineError>
583where
584 P: AsRef<Path>,
585 W: Write,
586{
587 run_ndjson_rev_matches_with_options(
588 engine,
589 path,
590 predicate,
591 limit,
592 writer,
593 super::ndjson::NdjsonOptions::default(),
594 )
595}
596
597pub fn run_ndjson_rev_matches_with_options<P, W>(
598 engine: &JetroEngine,
599 path: P,
600 predicate: &str,
601 limit: usize,
602 writer: W,
603 options: super::ndjson::NdjsonOptions,
604) -> Result<usize, JetroEngineError>
605where
606 P: AsRef<Path>,
607 W: Write,
608{
609 drive_rev_matches_writer(engine, path, predicate, limit, options, writer)
610}
611
/// Reverse scan that parses each row into a reusable tape, runs the direct tape `plan` on it
/// and writes the rendered bytes as one output line.
///
/// Stops once `limit` lines have been written when a limit is given. Returns the number of
/// lines written.
///
/// # Errors
/// Returns a row parse error for invalid JSON and propagates reader, plan and writer errors.
#[cfg(feature = "simd-json")]
fn drive_rev_writer_tape<P, W>(
    engine: &JetroEngine,
    path: P,
    plan: &super::ndjson::NdjsonDirectTapePlan,
    limit: Option<usize>,
    options: super::ndjson::NdjsonOptions,
    writer: W,
) -> Result<usize, JetroEngineError>
where
    P: AsRef<Path>,
    W: Write,
{
    let mut reader = NdjsonReverseFileDriver::with_options(path, options)?;
    let mut sink = super::ndjson::ndjson_writer_with_options(writer, options);
    let mut tape = crate::data::tape::TapeScratch::with_capacity(options.initial_buffer_capacity);
    let mut rendered = Vec::with_capacity(options.initial_buffer_capacity);
    let mut runner = super::ndjson::NdjsonTapeWriterRunner::new(engine, plan);
    let mut written = 0usize;

    while let Some((reverse_row_no, row)) = reader.next_line_with_reverse_no()? {
        // The output buffer is reused across rows.
        rendered.clear();
        tape.parse_slice(&row).map_err(|message| {
            let cause =
                JetroEngineError::Eval(crate::EvalError(format!("Invalid JSON: {message}")));
            super::ndjson::row_parse_error(reverse_row_no, cause)
        })?;
        runner.write_row(&tape, &mut rendered)?;
        if super::ndjson::write_json_bytes_line_with_options(&mut sink, &rendered, options)? {
            written += 1;
        }
        if matches!(limit, Some(cap) if written >= cap) {
            break;
        }
    }

    sink.flush()?;
    Ok(written)
}
653
654fn drive_rev<P, F>(
655 engine: &JetroEngine,
656 path: P,
657 query: &str,
658 options: super::ndjson::NdjsonOptions,
659 mut emit: F,
660) -> Result<usize, JetroEngineError>
661where
662 P: AsRef<Path>,
663 F: FnMut(crate::data::value::Val) -> Result<super::ndjson::NdjsonControl, JetroEngineError>,
664{
665 let mut driver = NdjsonReverseFileDriver::with_options(path, options)?;
666 let mut executor = super::ndjson::NdjsonRowExecutor::new(engine, query);
667 let mut count = 0usize;
668
669 while let Some((reverse_row_no, row)) = driver.next_line_with_reverse_no()? {
670 let out = executor.eval_owned_row(reverse_row_no, row)?;
671 count += 1;
672 if matches!(emit(out)?, super::ndjson::NdjsonControl::Stop) {
673 break;
674 }
675 }
676
677 Ok(count)
678}
679
680fn drive_rev_matches<P, F>(
681 engine: &JetroEngine,
682 path: P,
683 predicate: &str,
684 limit: usize,
685 options: super::ndjson::NdjsonOptions,
686 mut emit: F,
687) -> Result<usize, JetroEngineError>
688where
689 P: AsRef<Path>,
690 F: FnMut(crate::data::value::Val) -> Result<super::ndjson::NdjsonControl, JetroEngineError>,
691{
692 if limit == 0 {
693 return Ok(0);
694 }
695
696 let mut driver = NdjsonReverseFileDriver::with_options(path, options)?;
697 let mut executor = super::ndjson::NdjsonRowExecutor::new(engine, predicate);
698 let mut emitted = 0usize;
699
700 while let Some((reverse_row_no, row)) = driver.next_line_with_reverse_no()? {
701 let document = executor.parse_owned_row(reverse_row_no, row)?;
702 let matched = executor.eval_document(reverse_row_no, &document)?;
703 if !is_truthy(&matched) {
704 continue;
705 }
706
707 let root = document
708 .root_val_with(executor.engine().keys())
709 .map_err(|err| super::ndjson::row_eval_error(reverse_row_no, err))?;
710 emitted += 1;
711 if matches!(emit(root)?, super::ndjson::NdjsonControl::Stop) || emitted >= limit {
712 break;
713 }
714 }
715
716 Ok(emitted)
717}
718
719fn drive_rev_matches_writer<P, W>(
720 engine: &JetroEngine,
721 path: P,
722 predicate: &str,
723 limit: usize,
724 options: super::ndjson::NdjsonOptions,
725 writer: W,
726) -> Result<usize, JetroEngineError>
727where
728 P: AsRef<Path>,
729 W: Write,
730{
731 if limit == 0 {
732 return Ok(0);
733 }
734
735 #[cfg(feature = "simd-json")]
736 if let Some(predicate) = super::ndjson::direct_tape_predicate(engine, predicate) {
737 return drive_rev_matches_writer_tape(engine, path, &predicate, limit, options, writer);
738 }
739
740 let mut driver = NdjsonReverseFileDriver::with_options(path, options)?;
741 let mut executor = super::ndjson::NdjsonRowExecutor::new(engine, predicate);
742 let mut writer = super::ndjson::ndjson_writer_with_options(writer, options);
743 let mut emitted = 0usize;
744
745 while let Some((reverse_row_no, row)) = driver.next_line_with_reverse_no()? {
746 let document = executor.parse_owned_row(reverse_row_no, row)?;
747 let matched = executor.eval_document(reverse_row_no, &document)?;
748 if !is_truthy(&matched) {
749 continue;
750 }
751
752 super::ndjson::write_document_line(
753 &mut writer,
754 &document,
755 reverse_row_no,
756 executor.engine(),
757 )?;
758 emitted += 1;
759 if emitted >= limit {
760 break;
761 }
762 }
763
764 writer.flush()?;
765 Ok(emitted)
766}
767
/// Tape-based reverse match scan: parses each row into a reusable tape, evaluates the direct
/// `predicate` on it and, for matching rows, writes the original row bytes followed by a
/// newline.
///
/// A VM handle and environment are created only when the predicate needs them. Stops after
/// `limit` rows have been written and returns that count.
///
/// # Errors
/// Returns a row parse error for invalid JSON and propagates reader, predicate and writer
/// errors.
#[cfg(feature = "simd-json")]
fn drive_rev_matches_writer_tape<P, W>(
    engine: &JetroEngine,
    path: P,
    predicate: &super::ndjson::NdjsonDirectPredicate,
    limit: usize,
    options: super::ndjson::NdjsonOptions,
    writer: W,
) -> Result<usize, JetroEngineError>
where
    P: AsRef<Path>,
    W: Write,
{
    let mut reader = NdjsonReverseFileDriver::with_options(path, options)?;
    let mut sink = super::ndjson::ndjson_writer_with_options(writer, options);
    let mut tape = crate::data::tape::TapeScratch::with_capacity(options.initial_buffer_capacity);
    let mut emitted = 0usize;
    let needs_vm = super::ndjson::predicate_needs_vm(predicate);
    let mut vm = if needs_vm {
        Some(engine.lock_vm())
    } else {
        None
    };
    let env = if needs_vm {
        Some(crate::data::context::Env::new(crate::Val::Null))
    } else {
        None
    };
    let mut path_cache = super::ndjson::NdjsonPathCache::default();

    while let Some((reverse_row_no, row)) = reader.next_line_with_reverse_no()? {
        tape.parse_slice(&row).map_err(|message| {
            let cause =
                JetroEngineError::Eval(crate::EvalError(format!("Invalid JSON: {message}")));
            super::ndjson::row_parse_error(reverse_row_no, cause)
        })?;
        let is_match = super::ndjson::eval_tape_predicate(
            &tape,
            predicate,
            env.as_ref(),
            &mut vm,
            &mut path_cache,
        )
        .map_err(JetroEngineError::Eval)?;
        if !is_match {
            continue;
        }
        // Matching rows are echoed verbatim rather than re-serialized.
        sink.write_all(&row)?;
        sink.write_all(b"\n")?;
        emitted += 1;
        if emitted >= limit {
            break;
        }
    }

    sink.flush()?;
    Ok(emitted)
}
820
/// Removes every trailing `\n` and `\r` byte from `buf`, leaving other bytes untouched.
fn trim_line_ending(buf: &mut Vec<u8>) {
    // Keep everything up to and including the last byte that is not a line terminator.
    let keep = buf
        .iter()
        .rposition(|&byte| byte != b'\n' && byte != b'\r')
        .map_or(0, |last| last + 1);
    buf.truncate(keep);
}
826
#[cfg(test)]
mod tests {
    use super::NdjsonReverseFileDriver;
    use crate::JetroEngine;
    use std::path::PathBuf;

    /// Rows come back last-to-first when the chunk size is smaller than the file.
    #[test]
    fn reverse_driver_reads_rows_from_tail() {
        let path = write_fixture(
            "jetro-ndjson-rev-basic",
            b"{\"n\":1}\n{\"n\":2}\n{\"n\":3}\n",
        );
        let mut driver = NdjsonReverseFileDriver::with_chunk_size(&path, 8).unwrap();

        assert_eq!(driver.next_line().unwrap().unwrap(), br#"{"n":3}"#);
        assert_eq!(driver.next_line().unwrap().unwrap(), br#"{"n":2}"#);
        assert_eq!(driver.next_line().unwrap().unwrap(), br#"{"n":1}"#);
        assert!(driver.next_line().unwrap().is_none());

        let _ = std::fs::remove_file(path);
    }

    /// Blank lines, CRLF endings and a missing trailing newline are all tolerated.
    #[test]
    fn reverse_driver_handles_missing_final_newline_and_blank_lines() {
        let path = write_fixture("jetro-ndjson-rev-edge", b"\n{\"n\":1}\r\n\n{\"n\":2}");
        let mut driver = NdjsonReverseFileDriver::with_chunk_size(&path, 5).unwrap();

        assert_eq!(driver.next_line().unwrap().unwrap(), br#"{"n":2}"#);
        assert_eq!(driver.next_line().unwrap().unwrap(), br#"{"n":1}"#);
        assert!(driver.next_line().unwrap().is_none());

        let _ = std::fs::remove_file(path);
    }

    /// Reverse row numbers start at 1 for the last row of the file.
    #[test]
    fn reverse_driver_reports_reverse_row_numbers() {
        let path = write_fixture("jetro-ndjson-rev-row-no", b"{\"n\":1}\n{\"n\":2}\n");
        let mut driver = NdjsonReverseFileDriver::with_chunk_size(&path, 3).unwrap();

        let first = driver.next_line_with_reverse_no().unwrap().unwrap();
        assert_eq!(first, (1, br#"{"n":2}"#.to_vec()));
        let second = driver.next_line_with_reverse_no().unwrap().unwrap();
        assert_eq!(second, (2, br#"{"n":1}"#.to_vec()));
        assert!(driver.next_line_with_reverse_no().unwrap().is_none());

        let _ = std::fs::remove_file(path);
    }

    /// A mapped query over reversed rows produces one output line per row, last row first.
    #[test]
    fn reverse_query_uses_direct_writer_shapes() {
        let path = write_fixture(
            "jetro-ndjson-rev-direct",
            b"{\"name\":\"ada\",\"attrs\":[{\"key\":\"a\",\"value\":1}]}\n{\"name\":\"bob\",\"attrs\":[{\"key\":\"b\",\"value\":2}]}\n",
        );
        let engine = JetroEngine::new();
        let mut out = Vec::new();

        super::run_ndjson_rev(&engine, &path, "attrs.map([@.key, @.value])", &mut out).unwrap();

        let rendered = String::from_utf8(out).unwrap();
        assert_eq!(rendered, "[[\"b\",2]]\n[[\"a\",1]]\n");
        let _ = std::fs::remove_file(path);
    }

    /// Raw key bytes are normalized (escapes decoded, insignificant whitespace dropped);
    /// the float literal `1.0` yields no direct key.
    #[cfg(feature = "simd-json")]
    #[test]
    fn direct_distinct_key_classifier_rejects_escaped_strings() {
        assert_eq!(
            super::raw_distinct_key_bytes(br#""plain""#).as_deref(),
            Some(br#""plain""#.as_slice())
        );
        assert_eq!(
            super::raw_distinct_key_bytes(br#""a\u0062""#).as_deref(),
            Some(br#""ab""#.as_slice())
        );
        assert_eq!(
            super::raw_distinct_key_bytes(br#"{"k":"v"}"#).as_deref(),
            Some(br#"{"k":"v"}"#.as_slice())
        );
        assert_eq!(
            super::raw_distinct_key_bytes(b"123").as_deref(),
            Some(b"123".as_slice())
        );
        assert_eq!(
            super::raw_distinct_key_bytes(br#"{"a" : 1,"b":"x\u0079"}"#).as_deref(),
            Some(br#"{"a":1,"b":"xy"}"#.as_slice())
        );
        assert_eq!(super::raw_distinct_key_bytes(b"1.0"), None);
    }

    /// Creates a per-process temp file holding `contents` and returns its path.
    fn write_fixture(name: &str, contents: &[u8]) -> PathBuf {
        let path = temp_path(name);
        std::fs::write(&path, contents).unwrap();
        path
    }

    /// Builds a temp-dir path that is unique per test name and process id.
    fn temp_path(name: &str) -> PathBuf {
        let file_name = format!("{}-{}.ndjson", name, std::process::id());
        std::env::temp_dir().join(file_name)
    }
}