1use super::RowError;
2use crate::util::is_truthy;
3use crate::{JetroEngine, JetroEngineError};
4use memchr::memrchr;
5use serde_json::Value;
6use std::collections::VecDeque;
7use std::fs::File;
8use std::io::{Read, Seek, SeekFrom, Write};
9use std::path::Path;
10
/// Streams the lines of an NDJSON file backwards, from the last line to the first.
///
/// The file is consumed in chunks starting at its end. Bytes that belong to a line
/// whose beginning has not been read yet are held in `carry` until the chunk
/// containing the preceding newline (or the start of the file) is reached.
pub struct NdjsonReverseFileDriver {
    /// Handle to the NDJSON source being read.
    file: File,
    /// Offset of the first byte already consumed; everything before it is unread.
    pos: u64,
    /// Bytes requested per backwards read.
    chunk_size: usize,
    /// Largest accepted line length in bytes.
    max_line_len: usize,
    /// Tail of a line whose start lies in the still-unread part of the file.
    carry: Vec<u8>,
    /// Complete lines found in the current chunk, queued last-in-file first.
    pending: VecDeque<Vec<u8>>,
    /// Becomes `true` once the text before the file's first newline has been consumed.
    finished_head: bool,
    /// How many rows have been handed out so far.
    reverse_line_no: u64,
}
27
28impl NdjsonReverseFileDriver {
29 pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, RowError> {
30 Self::with_options(path, super::ndjson::NdjsonOptions::default())
31 }
32
33 pub fn with_chunk_size<P: AsRef<Path>>(path: P, chunk_size: usize) -> Result<Self, RowError> {
34 Self::with_options(
35 path,
36 super::ndjson::NdjsonOptions::default().with_reverse_chunk_size(chunk_size),
37 )
38 }
39
40 pub fn with_options<P: AsRef<Path>>(
41 path: P,
42 options: super::ndjson::NdjsonOptions,
43 ) -> Result<Self, RowError> {
44 let mut file = File::open(path)?;
45 let pos = file.seek(SeekFrom::End(0))?;
46 Ok(Self {
47 file,
48 pos,
49 chunk_size: options.reverse_chunk_size.max(1),
50 max_line_len: options.max_line_len,
51 carry: Vec::new(),
52 pending: VecDeque::new(),
53 finished_head: false,
54 reverse_line_no: 0,
55 })
56 }
57
58 pub fn next_line(&mut self) -> Result<Option<Vec<u8>>, RowError> {
59 Ok(self.next_line_with_reverse_no()?.map(|(_, line)| line))
60 }
61
62 pub fn next_line_with_reverse_no(&mut self) -> Result<Option<(u64, Vec<u8>)>, RowError> {
63 loop {
64 if let Some(line) = self.pending.pop_front() {
65 self.reverse_line_no += 1;
66 return Ok(Some((self.reverse_line_no, line)));
67 }
68
69 if self.pos == 0 {
70 if self.finished_head || self.carry.is_empty() {
71 return Ok(None);
72 }
73 self.finished_head = true;
74 let mut line = std::mem::take(&mut self.carry);
75 trim_line_ending(&mut line);
76 self.check_line_len(line.len())?;
77 if line.iter().any(|b| !b.is_ascii_whitespace()) {
78 self.reverse_line_no += 1;
79 return Ok(Some((self.reverse_line_no, line)));
80 }
81 return Ok(None);
82 }
83
84 let read_len = self.chunk_size.min(self.pos as usize);
85 self.pos -= read_len as u64;
86 let mut chunk = vec![0u8; read_len];
87 self.file.seek(SeekFrom::Start(self.pos))?;
88 self.file.read_exact(&mut chunk)?;
89
90 let mut end = chunk.len();
91 while let Some(nl) = memrchr(b'\n', &chunk[..end]) {
92 let mut line = Vec::with_capacity(end - nl - 1 + self.carry.len());
93 line.extend_from_slice(&chunk[nl + 1..end]);
94 line.extend_from_slice(&self.carry);
95 self.carry.clear();
96 end = nl;
97 trim_line_ending(&mut line);
98 self.check_line_len(line.len())?;
99 if line.iter().any(|b| !b.is_ascii_whitespace()) {
100 self.pending.push_back(line);
101 }
102 }
103
104 if end > 0 {
105 let mut next = Vec::with_capacity(end + self.carry.len());
106 next.extend_from_slice(&chunk[..end]);
107 next.extend_from_slice(&self.carry);
108 self.check_line_len(next.len())?;
109 self.carry = next;
110 }
111 }
112 }
113
114 fn check_line_len(&self, len: usize) -> Result<(), RowError> {
115 if len > self.max_line_len {
116 return Err(RowError::LineTooLarge {
117 line_no: self.reverse_line_no + self.pending.len() as u64 + 1,
118 len,
119 max: self.max_line_len,
120 });
121 }
122 Ok(())
123 }
124}
125
126pub fn collect_ndjson_rev<P>(
127 engine: &JetroEngine,
128 path: P,
129 query: &str,
130) -> Result<Vec<Value>, JetroEngineError>
131where
132 P: AsRef<Path>,
133{
134 collect_ndjson_rev_with_options(engine, path, query, super::ndjson::NdjsonOptions::default())
135}
136
137pub fn collect_ndjson_rev_with_options<P>(
138 engine: &JetroEngine,
139 path: P,
140 query: &str,
141 options: super::ndjson::NdjsonOptions,
142) -> Result<Vec<Value>, JetroEngineError>
143where
144 P: AsRef<Path>,
145{
146 let mut values = Vec::new();
147 drive_rev(engine, path, query, options, |value| {
148 values.push(Value::from(value));
149 Ok(super::ndjson::NdjsonControl::Continue)
150 })?;
151 Ok(values)
152}
153
154pub fn for_each_ndjson_rev<P, F>(
155 engine: &JetroEngine,
156 path: P,
157 query: &str,
158 mut f: F,
159) -> Result<usize, JetroEngineError>
160where
161 P: AsRef<Path>,
162 F: FnMut(Value),
163{
164 for_each_ndjson_rev_with_options(
165 engine,
166 path,
167 query,
168 super::ndjson::NdjsonOptions::default(),
169 |value| {
170 f(value);
171 Ok(super::ndjson::NdjsonControl::Continue)
172 },
173 )
174}
175
176pub fn for_each_ndjson_rev_with_options<P, F>(
177 engine: &JetroEngine,
178 path: P,
179 query: &str,
180 options: super::ndjson::NdjsonOptions,
181 mut f: F,
182) -> Result<usize, JetroEngineError>
183where
184 P: AsRef<Path>,
185 F: FnMut(Value) -> Result<super::ndjson::NdjsonControl, JetroEngineError>,
186{
187 drive_rev(engine, path, query, options, |value| f(Value::from(value)))
188}
189
190pub fn collect_ndjson_rev_matches<P>(
191 engine: &JetroEngine,
192 path: P,
193 predicate: &str,
194 limit: usize,
195) -> Result<Vec<Value>, JetroEngineError>
196where
197 P: AsRef<Path>,
198{
199 collect_ndjson_rev_matches_with_options(
200 engine,
201 path,
202 predicate,
203 limit,
204 super::ndjson::NdjsonOptions::default(),
205 )
206}
207
208pub fn collect_ndjson_rev_matches_with_options<P>(
209 engine: &JetroEngine,
210 path: P,
211 predicate: &str,
212 limit: usize,
213 options: super::ndjson::NdjsonOptions,
214) -> Result<Vec<Value>, JetroEngineError>
215where
216 P: AsRef<Path>,
217{
218 let mut values = Vec::with_capacity(limit);
219 drive_rev_matches(engine, path, predicate, limit, options, |value| {
220 values.push(Value::from(value));
221 Ok(super::ndjson::NdjsonControl::Continue)
222 })?;
223 Ok(values)
224}
225
226pub fn run_ndjson_rev<P, W>(
227 engine: &JetroEngine,
228 path: P,
229 query: &str,
230 writer: W,
231) -> Result<usize, JetroEngineError>
232where
233 P: AsRef<Path>,
234 W: Write,
235{
236 run_ndjson_rev_with_options(
237 engine,
238 path,
239 query,
240 writer,
241 super::ndjson::NdjsonOptions::default(),
242 )
243}
244
245pub fn run_ndjson_rev_with_options<P, W>(
246 engine: &JetroEngine,
247 path: P,
248 query: &str,
249 writer: W,
250 options: super::ndjson::NdjsonOptions,
251) -> Result<usize, JetroEngineError>
252where
253 P: AsRef<Path>,
254 W: Write,
255{
256 #[cfg(feature = "simd-json")]
257 if let Some(plan) = super::ndjson::direct_tape_plan(engine, query) {
258 return drive_rev_writer_tape(engine, path, &plan, None, options, writer);
259 }
260
261 let mut writer = super::ndjson::ndjson_writer_with_options(writer, options);
262 let count = drive_rev(engine, path, query, options, |value| {
263 super::ndjson::write_val_line(&mut writer, &value)?;
264 Ok(super::ndjson::NdjsonControl::Continue)
265 })?;
266 writer.flush()?;
267 Ok(count)
268}
269
270pub fn run_ndjson_rev_limit<P, W>(
271 engine: &JetroEngine,
272 path: P,
273 query: &str,
274 limit: usize,
275 writer: W,
276) -> Result<usize, JetroEngineError>
277where
278 P: AsRef<Path>,
279 W: Write,
280{
281 run_ndjson_rev_limit_with_options(
282 engine,
283 path,
284 query,
285 limit,
286 writer,
287 super::ndjson::NdjsonOptions::default(),
288 )
289}
290
291pub fn run_ndjson_rev_limit_with_options<P, W>(
292 engine: &JetroEngine,
293 path: P,
294 query: &str,
295 limit: usize,
296 writer: W,
297 options: super::ndjson::NdjsonOptions,
298) -> Result<usize, JetroEngineError>
299where
300 P: AsRef<Path>,
301 W: Write,
302{
303 if limit == 0 {
304 return Ok(0);
305 }
306
307 #[cfg(feature = "simd-json")]
308 if let Some(plan) = super::ndjson::direct_tape_plan(engine, query) {
309 return drive_rev_writer_tape(engine, path, &plan, Some(limit), options, writer);
310 }
311
312 let mut writer = super::ndjson::ndjson_writer_with_options(writer, options);
313 let mut emitted = 0usize;
314 let count = drive_rev(engine, path, query, options, |value| {
315 super::ndjson::write_val_line(&mut writer, &value)?;
316 emitted += 1;
317 Ok(if emitted >= limit {
318 super::ndjson::NdjsonControl::Stop
319 } else {
320 super::ndjson::NdjsonControl::Continue
321 })
322 })?;
323 writer.flush()?;
324 Ok(count)
325}
326
327pub fn run_ndjson_rev_matches<P, W>(
328 engine: &JetroEngine,
329 path: P,
330 predicate: &str,
331 limit: usize,
332 writer: W,
333) -> Result<usize, JetroEngineError>
334where
335 P: AsRef<Path>,
336 W: Write,
337{
338 run_ndjson_rev_matches_with_options(
339 engine,
340 path,
341 predicate,
342 limit,
343 writer,
344 super::ndjson::NdjsonOptions::default(),
345 )
346}
347
348pub fn run_ndjson_rev_matches_with_options<P, W>(
349 engine: &JetroEngine,
350 path: P,
351 predicate: &str,
352 limit: usize,
353 writer: W,
354 options: super::ndjson::NdjsonOptions,
355) -> Result<usize, JetroEngineError>
356where
357 P: AsRef<Path>,
358 W: Write,
359{
360 drive_rev_matches_writer(engine, path, predicate, limit, options, writer)
361}
362
/// Writes the direct tape-plan projection of each row, from the last row to the
/// first, one per line. When `limit` is `Some(n)`, it stops after `n` rows;
/// `Some(0)` writes nothing.
///
/// Returns the number of rows written.
///
/// # Errors
///
/// Propagates file and write errors. A row that fails to parse is reported with
/// its reverse row number.
#[cfg(feature = "simd-json")]
fn drive_rev_writer_tape<P, W>(
    engine: &JetroEngine,
    path: P,
    plan: &super::ndjson::NdjsonDirectTapePlan,
    limit: Option<usize>,
    options: super::ndjson::NdjsonOptions,
    writer: W,
) -> Result<usize, JetroEngineError>
where
    P: AsRef<Path>,
    W: Write,
{
    let mut driver = NdjsonReverseFileDriver::with_options(path, options)?;
    let mut writer = super::ndjson::ndjson_writer_with_options(writer, options);
    let mut scratch =
        crate::data::tape::TapeScratch::with_capacity(options.initial_buffer_capacity);
    let mut runner = super::ndjson::NdjsonTapeWriterRunner::new(engine, plan);
    let mut count = 0usize;

    // Check the limit before pulling the next row. This way `Some(0)` emits
    // nothing, and no row is read past the limit.
    while !limit.is_some_and(|max| count >= max) {
        let Some((reverse_row_no, row)) = driver.next_line_with_reverse_no()? else {
            break;
        };
        scratch.parse_slice(&row).map_err(|message| {
            super::ndjson::row_parse_error(
                reverse_row_no,
                JetroEngineError::Eval(crate::EvalError(format!("Invalid JSON: {message}"))),
            )
        })?;
        runner.write_row(&scratch, &mut writer)?;
        writer.write_all(b"\n")?;
        count += 1;
    }

    writer.flush()?;
    Ok(count)
}
401
402fn drive_rev<P, F>(
403 engine: &JetroEngine,
404 path: P,
405 query: &str,
406 options: super::ndjson::NdjsonOptions,
407 mut emit: F,
408) -> Result<usize, JetroEngineError>
409where
410 P: AsRef<Path>,
411 F: FnMut(crate::data::value::Val) -> Result<super::ndjson::NdjsonControl, JetroEngineError>,
412{
413 let mut driver = NdjsonReverseFileDriver::with_options(path, options)?;
414 let mut executor = super::ndjson::NdjsonRowExecutor::new(engine, query);
415 let mut count = 0usize;
416
417 while let Some((reverse_row_no, row)) = driver.next_line_with_reverse_no()? {
418 let out = executor.eval_owned_row(reverse_row_no, row)?;
419 count += 1;
420 if matches!(emit(out)?, super::ndjson::NdjsonControl::Stop) {
421 break;
422 }
423 }
424
425 Ok(count)
426}
427
428fn drive_rev_matches<P, F>(
429 engine: &JetroEngine,
430 path: P,
431 predicate: &str,
432 limit: usize,
433 options: super::ndjson::NdjsonOptions,
434 mut emit: F,
435) -> Result<usize, JetroEngineError>
436where
437 P: AsRef<Path>,
438 F: FnMut(crate::data::value::Val) -> Result<super::ndjson::NdjsonControl, JetroEngineError>,
439{
440 if limit == 0 {
441 return Ok(0);
442 }
443
444 let mut driver = NdjsonReverseFileDriver::with_options(path, options)?;
445 let mut executor = super::ndjson::NdjsonRowExecutor::new(engine, predicate);
446 let mut emitted = 0usize;
447
448 while let Some((reverse_row_no, row)) = driver.next_line_with_reverse_no()? {
449 let document = executor.parse_owned_row(reverse_row_no, row)?;
450 let matched = executor.eval_document(reverse_row_no, &document)?;
451 if !is_truthy(&matched) {
452 continue;
453 }
454
455 let root = document
456 .root_val_with(executor.engine().keys())
457 .map_err(|err| super::ndjson::row_eval_error(reverse_row_no, err))?;
458 emitted += 1;
459 if matches!(emit(root)?, super::ndjson::NdjsonControl::Stop) || emitted >= limit {
460 break;
461 }
462 }
463
464 Ok(emitted)
465}
466
467fn drive_rev_matches_writer<P, W>(
468 engine: &JetroEngine,
469 path: P,
470 predicate: &str,
471 limit: usize,
472 options: super::ndjson::NdjsonOptions,
473 writer: W,
474) -> Result<usize, JetroEngineError>
475where
476 P: AsRef<Path>,
477 W: Write,
478{
479 if limit == 0 {
480 return Ok(0);
481 }
482
483 #[cfg(feature = "simd-json")]
484 if let Some(predicate) = super::ndjson::direct_tape_predicate(engine, predicate) {
485 return drive_rev_matches_writer_tape(engine, path, &predicate, limit, options, writer);
486 }
487
488 let mut driver = NdjsonReverseFileDriver::with_options(path, options)?;
489 let mut executor = super::ndjson::NdjsonRowExecutor::new(engine, predicate);
490 let mut writer = super::ndjson::ndjson_writer_with_options(writer, options);
491 let mut emitted = 0usize;
492
493 while let Some((reverse_row_no, row)) = driver.next_line_with_reverse_no()? {
494 let document = executor.parse_owned_row(reverse_row_no, row)?;
495 let matched = executor.eval_document(reverse_row_no, &document)?;
496 if !is_truthy(&matched) {
497 continue;
498 }
499
500 super::ndjson::write_document_line(
501 &mut writer,
502 &document,
503 reverse_row_no,
504 executor.engine(),
505 )?;
506 emitted += 1;
507 if emitted >= limit {
508 break;
509 }
510 }
511
512 writer.flush()?;
513 Ok(emitted)
514}
515
/// Tape-based variant of `drive_rev_matches_writer`. Each row is parsed into a
/// reusable tape and the predicate is evaluated against it. Matching rows are
/// copied to `writer` byte-for-byte (after line-ending trimming), each followed
/// by a newline. It stops once `limit` rows have been written.
///
/// A VM lock and environment are acquired only when the predicate needs them.
#[cfg(feature = "simd-json")]
fn drive_rev_matches_writer_tape<P, W>(
    engine: &JetroEngine,
    path: P,
    predicate: &super::ndjson::NdjsonDirectPredicate,
    limit: usize,
    options: super::ndjson::NdjsonOptions,
    writer: W,
) -> Result<usize, JetroEngineError>
where
    P: AsRef<Path>,
    W: Write,
{
    let mut driver = NdjsonReverseFileDriver::with_options(path, options)?;
    let mut writer = super::ndjson::ndjson_writer_with_options(writer, options);
    let mut scratch =
        crate::data::tape::TapeScratch::with_capacity(options.initial_buffer_capacity);
    let mut emitted = 0usize;
    let needs_vm = super::ndjson::predicate_needs_vm(predicate);
    let mut vm = needs_vm.then(|| engine.lock_vm());
    let env = needs_vm.then(|| crate::data::context::Env::new(crate::Val::Null));
    let mut predicate_path = super::ndjson::NdjsonPathCache::default();

    while let Some((row_no, line)) = driver.next_line_with_reverse_no()? {
        scratch.parse_slice(&line).map_err(|message| {
            super::ndjson::row_parse_error(
                row_no,
                JetroEngineError::Eval(crate::EvalError(format!("Invalid JSON: {message}"))),
            )
        })?;
        let keep = super::ndjson::eval_tape_predicate(
            &scratch,
            predicate,
            env.as_ref(),
            &mut vm,
            &mut predicate_path,
        )
        .map_err(JetroEngineError::Eval)?;
        if keep {
            writer.write_all(&line)?;
            writer.write_all(b"\n")?;
            emitted += 1;
            if emitted >= limit {
                break;
            }
        }
    }

    writer.flush()?;
    Ok(emitted)
}
568
/// Removes every trailing `\n` and `\r` byte from `buf`; interior bytes are kept.
fn trim_line_ending(buf: &mut Vec<u8>) {
    let content_len = buf
        .iter()
        .rposition(|&byte| byte != b'\n' && byte != b'\r')
        .map_or(0, |last| last + 1);
    buf.truncate(content_len);
}
574
#[cfg(test)]
mod tests {
    use super::{NdjsonReverseFileDriver, RowError};
    use crate::JetroEngine;
    use std::path::{Path, PathBuf};

    /// NDJSON fixture in the system temp directory. It is removed on drop, so the
    /// file is cleaned up even when an assertion panics mid-test.
    struct TempNdjson(PathBuf);

    impl TempNdjson {
        fn new(name: &str, contents: &[u8]) -> Self {
            let mut path = std::env::temp_dir();
            path.push(format!("{}-{}.ndjson", name, std::process::id()));
            std::fs::write(&path, contents).unwrap();
            Self(path)
        }

        fn path(&self) -> &Path {
            &self.0
        }
    }

    impl Drop for TempNdjson {
        fn drop(&mut self) {
            let _ = std::fs::remove_file(&self.0);
        }
    }

    #[test]
    fn reverse_driver_reads_rows_from_tail() {
        let file = TempNdjson::new(
            "jetro-ndjson-rev-basic",
            b"{\"n\":1}\n{\"n\":2}\n{\"n\":3}\n",
        );
        let mut driver = NdjsonReverseFileDriver::with_chunk_size(file.path(), 8).unwrap();

        assert_eq!(driver.next_line().unwrap().unwrap(), br#"{"n":3}"#);
        assert_eq!(driver.next_line().unwrap().unwrap(), br#"{"n":2}"#);
        assert_eq!(driver.next_line().unwrap().unwrap(), br#"{"n":1}"#);
        assert!(driver.next_line().unwrap().is_none());
    }

    #[test]
    fn reverse_driver_handles_missing_final_newline_and_blank_lines() {
        let file = TempNdjson::new("jetro-ndjson-rev-edge", b"\n{\"n\":1}\r\n\n{\"n\":2}");
        let mut driver = NdjsonReverseFileDriver::with_chunk_size(file.path(), 5).unwrap();

        assert_eq!(driver.next_line().unwrap().unwrap(), br#"{"n":2}"#);
        assert_eq!(driver.next_line().unwrap().unwrap(), br#"{"n":1}"#);
        assert!(driver.next_line().unwrap().is_none());
    }

    #[test]
    fn reverse_driver_handles_single_byte_chunks() {
        let file = TempNdjson::new("jetro-ndjson-rev-tiny-chunks", b"{\"n\":1}\r\n{\"n\":2}\n");
        let mut driver = NdjsonReverseFileDriver::with_chunk_size(file.path(), 1).unwrap();

        assert_eq!(driver.next_line().unwrap().unwrap(), br#"{"n":2}"#);
        assert_eq!(driver.next_line().unwrap().unwrap(), br#"{"n":1}"#);
        assert!(driver.next_line().unwrap().is_none());
    }

    #[test]
    fn reverse_driver_reports_reverse_row_numbers() {
        let file = TempNdjson::new("jetro-ndjson-rev-row-no", b"{\"n\":1}\n{\"n\":2}\n");
        let mut driver = NdjsonReverseFileDriver::with_chunk_size(file.path(), 3).unwrap();

        assert_eq!(
            driver.next_line_with_reverse_no().unwrap().unwrap(),
            (1, br#"{"n":2}"#.to_vec())
        );
        assert_eq!(
            driver.next_line_with_reverse_no().unwrap().unwrap(),
            (2, br#"{"n":1}"#.to_vec())
        );
        assert!(driver.next_line_with_reverse_no().unwrap().is_none());
    }

    #[test]
    fn reverse_driver_rejects_lines_over_max_len() {
        let file = TempNdjson::new("jetro-ndjson-rev-too-large", b"{\"n\":1}\n");
        let mut options = super::super::ndjson::NdjsonOptions::default();
        options.max_line_len = 4;
        let mut driver = NdjsonReverseFileDriver::with_options(file.path(), options).unwrap();

        assert!(matches!(
            driver.next_line(),
            Err(RowError::LineTooLarge { .. })
        ));
    }

    #[test]
    fn reverse_query_uses_direct_writer_shapes() {
        let file = TempNdjson::new(
            "jetro-ndjson-rev-direct",
            b"{\"name\":\"ada\",\"attrs\":[{\"key\":\"a\",\"value\":1}]}\n{\"name\":\"bob\",\"attrs\":[{\"key\":\"b\",\"value\":2}]}\n",
        );
        let engine = JetroEngine::new();
        let mut out = Vec::new();

        super::run_ndjson_rev(&engine, file.path(), "attrs.map([@.key, @.value])", &mut out)
            .unwrap();

        assert_eq!(
            String::from_utf8(out).unwrap(),
            "[[\"b\",2]]\n[[\"a\",1]]\n"
        );
    }
}