Skip to main content

jetro_core/io/
ndjson_rev.rs

1use super::RowError;
2use crate::util::is_truthy;
3use crate::{JetroEngine, JetroEngineError};
4use memchr::memrchr;
5use serde_json::Value;
6use std::collections::VecDeque;
7use std::fs::File;
8use std::io::{Read, Seek, SeekFrom, Write};
9use std::path::Path;
10
/// Reverse NDJSON line reader over a seekable file.
///
/// The reader scans fixed-size chunks from EOF to BOF and returns owned line
/// bytes in reverse physical order. It keeps only the current chunk and one
/// cross-chunk carry buffer, so memory stays bounded by the longest row plus
/// the configured chunk size.
#[derive(Debug)]
pub struct NdjsonReverseFileDriver {
    /// File being scanned; every chunk read seeks explicitly before reading.
    file: File,
    /// Offset of the first byte that has already been read; bytes before it
    /// are still unread. Starts at the file length and reaches 0 at BOF.
    pos: u64,
    /// Maximum number of bytes fetched per read (clamped to at least 1).
    chunk_size: usize,
    /// Longest line, in bytes, accepted before a `LineTooLarge` error is raised.
    max_line_len: usize,
    /// Leading fragment of a line whose start lies in a chunk not yet read.
    carry: Vec<u8>,
    /// Complete, non-blank lines split out of the latest chunk, tail-most first.
    pending: VecDeque<Vec<u8>>,
    /// Set once the fragment at the very start of the file has been handled.
    finished_head: bool,
    /// Number of lines handed out so far; the last line of the file is 1.
    reverse_line_no: u64,
}
27
28impl NdjsonReverseFileDriver {
29    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, RowError> {
30        Self::with_options(path, super::ndjson::NdjsonOptions::default())
31    }
32
33    pub fn with_chunk_size<P: AsRef<Path>>(path: P, chunk_size: usize) -> Result<Self, RowError> {
34        Self::with_options(
35            path,
36            super::ndjson::NdjsonOptions::default().with_reverse_chunk_size(chunk_size),
37        )
38    }
39
40    pub fn with_options<P: AsRef<Path>>(
41        path: P,
42        options: super::ndjson::NdjsonOptions,
43    ) -> Result<Self, RowError> {
44        let mut file = File::open(path)?;
45        let pos = file.seek(SeekFrom::End(0))?;
46        Ok(Self {
47            file,
48            pos,
49            chunk_size: options.reverse_chunk_size.max(1),
50            max_line_len: options.max_line_len,
51            carry: Vec::new(),
52            pending: VecDeque::new(),
53            finished_head: false,
54            reverse_line_no: 0,
55        })
56    }
57
58    pub fn next_line(&mut self) -> Result<Option<Vec<u8>>, RowError> {
59        Ok(self.next_line_with_reverse_no()?.map(|(_, line)| line))
60    }
61
62    pub fn next_line_with_reverse_no(&mut self) -> Result<Option<(u64, Vec<u8>)>, RowError> {
63        loop {
64            if let Some(line) = self.pending.pop_front() {
65                self.reverse_line_no += 1;
66                return Ok(Some((self.reverse_line_no, line)));
67            }
68
69            if self.pos == 0 {
70                if self.finished_head || self.carry.is_empty() {
71                    return Ok(None);
72                }
73                self.finished_head = true;
74                let mut line = std::mem::take(&mut self.carry);
75                trim_line_ending(&mut line);
76                self.check_line_len(line.len())?;
77                if line.iter().any(|b| !b.is_ascii_whitespace()) {
78                    self.reverse_line_no += 1;
79                    return Ok(Some((self.reverse_line_no, line)));
80                }
81                return Ok(None);
82            }
83
84            let read_len = self.chunk_size.min(self.pos as usize);
85            self.pos -= read_len as u64;
86            let mut chunk = vec![0u8; read_len];
87            self.file.seek(SeekFrom::Start(self.pos))?;
88            self.file.read_exact(&mut chunk)?;
89
90            let mut end = chunk.len();
91            while let Some(nl) = memrchr(b'\n', &chunk[..end]) {
92                let mut line = Vec::with_capacity(end - nl - 1 + self.carry.len());
93                line.extend_from_slice(&chunk[nl + 1..end]);
94                line.extend_from_slice(&self.carry);
95                self.carry.clear();
96                end = nl;
97                trim_line_ending(&mut line);
98                self.check_line_len(line.len())?;
99                if line.iter().any(|b| !b.is_ascii_whitespace()) {
100                    self.pending.push_back(line);
101                }
102            }
103
104            if end > 0 {
105                let mut next = Vec::with_capacity(end + self.carry.len());
106                next.extend_from_slice(&chunk[..end]);
107                next.extend_from_slice(&self.carry);
108                self.check_line_len(next.len())?;
109                self.carry = next;
110            }
111        }
112    }
113
114    fn check_line_len(&self, len: usize) -> Result<(), RowError> {
115        if len > self.max_line_len {
116            return Err(RowError::LineTooLarge {
117                line_no: self.reverse_line_no + self.pending.len() as u64 + 1,
118                len,
119                max: self.max_line_len,
120            });
121        }
122        Ok(())
123    }
124}
125
126pub fn collect_ndjson_rev<P>(
127    engine: &JetroEngine,
128    path: P,
129    query: &str,
130) -> Result<Vec<Value>, JetroEngineError>
131where
132    P: AsRef<Path>,
133{
134    collect_ndjson_rev_with_options(engine, path, query, super::ndjson::NdjsonOptions::default())
135}
136
137pub fn collect_ndjson_rev_with_options<P>(
138    engine: &JetroEngine,
139    path: P,
140    query: &str,
141    options: super::ndjson::NdjsonOptions,
142) -> Result<Vec<Value>, JetroEngineError>
143where
144    P: AsRef<Path>,
145{
146    let mut values = Vec::new();
147    drive_rev(engine, path, query, options, |value| {
148        values.push(Value::from(value));
149        Ok(super::ndjson::NdjsonControl::Continue)
150    })?;
151    Ok(values)
152}
153
154pub fn for_each_ndjson_rev<P, F>(
155    engine: &JetroEngine,
156    path: P,
157    query: &str,
158    mut f: F,
159) -> Result<usize, JetroEngineError>
160where
161    P: AsRef<Path>,
162    F: FnMut(Value),
163{
164    for_each_ndjson_rev_with_options(
165        engine,
166        path,
167        query,
168        super::ndjson::NdjsonOptions::default(),
169        |value| {
170            f(value);
171            Ok(super::ndjson::NdjsonControl::Continue)
172        },
173    )
174}
175
176pub fn for_each_ndjson_rev_with_options<P, F>(
177    engine: &JetroEngine,
178    path: P,
179    query: &str,
180    options: super::ndjson::NdjsonOptions,
181    mut f: F,
182) -> Result<usize, JetroEngineError>
183where
184    P: AsRef<Path>,
185    F: FnMut(Value) -> Result<super::ndjson::NdjsonControl, JetroEngineError>,
186{
187    drive_rev(engine, path, query, options, |value| f(Value::from(value)))
188}
189
190pub fn collect_ndjson_rev_matches<P>(
191    engine: &JetroEngine,
192    path: P,
193    predicate: &str,
194    limit: usize,
195) -> Result<Vec<Value>, JetroEngineError>
196where
197    P: AsRef<Path>,
198{
199    collect_ndjson_rev_matches_with_options(
200        engine,
201        path,
202        predicate,
203        limit,
204        super::ndjson::NdjsonOptions::default(),
205    )
206}
207
208pub fn collect_ndjson_rev_matches_with_options<P>(
209    engine: &JetroEngine,
210    path: P,
211    predicate: &str,
212    limit: usize,
213    options: super::ndjson::NdjsonOptions,
214) -> Result<Vec<Value>, JetroEngineError>
215where
216    P: AsRef<Path>,
217{
218    let mut values = Vec::with_capacity(limit);
219    drive_rev_matches(engine, path, predicate, limit, options, |value| {
220        values.push(Value::from(value));
221        Ok(super::ndjson::NdjsonControl::Continue)
222    })?;
223    Ok(values)
224}
225
226pub fn run_ndjson_rev<P, W>(
227    engine: &JetroEngine,
228    path: P,
229    query: &str,
230    writer: W,
231) -> Result<usize, JetroEngineError>
232where
233    P: AsRef<Path>,
234    W: Write,
235{
236    run_ndjson_rev_with_options(
237        engine,
238        path,
239        query,
240        writer,
241        super::ndjson::NdjsonOptions::default(),
242    )
243}
244
245pub fn run_ndjson_rev_with_options<P, W>(
246    engine: &JetroEngine,
247    path: P,
248    query: &str,
249    writer: W,
250    options: super::ndjson::NdjsonOptions,
251) -> Result<usize, JetroEngineError>
252where
253    P: AsRef<Path>,
254    W: Write,
255{
256    #[cfg(feature = "simd-json")]
257    if let Some(plan) = super::ndjson::direct_tape_plan(engine, query) {
258        return drive_rev_writer_tape(engine, path, &plan, None, options, writer);
259    }
260
261    let mut writer = super::ndjson::ndjson_writer_with_options(writer, options);
262    let count = drive_rev(engine, path, query, options, |value| {
263        super::ndjson::write_val_line(&mut writer, &value)?;
264        Ok(super::ndjson::NdjsonControl::Continue)
265    })?;
266    writer.flush()?;
267    Ok(count)
268}
269
270pub fn run_ndjson_rev_limit<P, W>(
271    engine: &JetroEngine,
272    path: P,
273    query: &str,
274    limit: usize,
275    writer: W,
276) -> Result<usize, JetroEngineError>
277where
278    P: AsRef<Path>,
279    W: Write,
280{
281    run_ndjson_rev_limit_with_options(
282        engine,
283        path,
284        query,
285        limit,
286        writer,
287        super::ndjson::NdjsonOptions::default(),
288    )
289}
290
291pub fn run_ndjson_rev_limit_with_options<P, W>(
292    engine: &JetroEngine,
293    path: P,
294    query: &str,
295    limit: usize,
296    writer: W,
297    options: super::ndjson::NdjsonOptions,
298) -> Result<usize, JetroEngineError>
299where
300    P: AsRef<Path>,
301    W: Write,
302{
303    if limit == 0 {
304        return Ok(0);
305    }
306
307    #[cfg(feature = "simd-json")]
308    if let Some(plan) = super::ndjson::direct_tape_plan(engine, query) {
309        return drive_rev_writer_tape(engine, path, &plan, Some(limit), options, writer);
310    }
311
312    let mut writer = super::ndjson::ndjson_writer_with_options(writer, options);
313    let mut emitted = 0usize;
314    let count = drive_rev(engine, path, query, options, |value| {
315        super::ndjson::write_val_line(&mut writer, &value)?;
316        emitted += 1;
317        Ok(if emitted >= limit {
318            super::ndjson::NdjsonControl::Stop
319        } else {
320            super::ndjson::NdjsonControl::Continue
321        })
322    })?;
323    writer.flush()?;
324    Ok(count)
325}
326
327pub fn run_ndjson_rev_matches<P, W>(
328    engine: &JetroEngine,
329    path: P,
330    predicate: &str,
331    limit: usize,
332    writer: W,
333) -> Result<usize, JetroEngineError>
334where
335    P: AsRef<Path>,
336    W: Write,
337{
338    run_ndjson_rev_matches_with_options(
339        engine,
340        path,
341        predicate,
342        limit,
343        writer,
344        super::ndjson::NdjsonOptions::default(),
345    )
346}
347
348pub fn run_ndjson_rev_matches_with_options<P, W>(
349    engine: &JetroEngine,
350    path: P,
351    predicate: &str,
352    limit: usize,
353    writer: W,
354    options: super::ndjson::NdjsonOptions,
355) -> Result<usize, JetroEngineError>
356where
357    P: AsRef<Path>,
358    W: Write,
359{
360    drive_rev_matches_writer(engine, path, predicate, limit, options, writer)
361}
362
/// Reverse-scans the file at `path`, parses each row into a reusable tape
/// scratch buffer and lets an `NdjsonTapeWriterRunner` built from `plan`
/// write the row's output, followed by a newline, to `writer`.
///
/// When `limit` is `Some(n)`, the scan stops after the row that brings the
/// number of written rows to `n` or more. Returns the number of rows written.
/// Rows that fail to parse abort the scan with a row parse error carrying the
/// reverse row number.
#[cfg(feature = "simd-json")]
fn drive_rev_writer_tape<P: AsRef<Path>, W: Write>(
    engine: &JetroEngine,
    path: P,
    plan: &super::ndjson::NdjsonDirectTapePlan,
    limit: Option<usize>,
    options: super::ndjson::NdjsonOptions,
    writer: W,
) -> Result<usize, JetroEngineError> {
    use super::ndjson;

    let mut driver = NdjsonReverseFileDriver::with_options(path, options)?;
    let mut sink = ndjson::ndjson_writer_with_options(writer, options);
    let capacity = options.initial_buffer_capacity;
    let mut scratch = crate::data::tape::TapeScratch::with_capacity(capacity);
    let mut runner = ndjson::NdjsonTapeWriterRunner::new(engine, plan);
    let mut rows_written = 0usize;

    loop {
        let Some((row_no, bytes)) = driver.next_line_with_reverse_no()? else {
            break;
        };

        scratch.parse_slice(&bytes).map_err(|message| {
            let cause = JetroEngineError::Eval(crate::EvalError(format!("Invalid JSON: {message}")));
            ndjson::row_parse_error(row_no, cause)
        })?;
        runner.write_row(&scratch, &mut sink)?;
        sink.write_all(b"\n")?;
        rows_written += 1;

        if matches!(limit, Some(max) if rows_written >= max) {
            break;
        }
    }

    sink.flush()?;
    Ok(rows_written)
}
401
402fn drive_rev<P, F>(
403    engine: &JetroEngine,
404    path: P,
405    query: &str,
406    options: super::ndjson::NdjsonOptions,
407    mut emit: F,
408) -> Result<usize, JetroEngineError>
409where
410    P: AsRef<Path>,
411    F: FnMut(crate::data::value::Val) -> Result<super::ndjson::NdjsonControl, JetroEngineError>,
412{
413    let mut driver = NdjsonReverseFileDriver::with_options(path, options)?;
414    let mut executor = super::ndjson::NdjsonRowExecutor::new(engine, query);
415    let mut count = 0usize;
416
417    while let Some((reverse_row_no, row)) = driver.next_line_with_reverse_no()? {
418        let out = executor.eval_owned_row(reverse_row_no, row)?;
419        count += 1;
420        if matches!(emit(out)?, super::ndjson::NdjsonControl::Stop) {
421            break;
422        }
423    }
424
425    Ok(count)
426}
427
428fn drive_rev_matches<P, F>(
429    engine: &JetroEngine,
430    path: P,
431    predicate: &str,
432    limit: usize,
433    options: super::ndjson::NdjsonOptions,
434    mut emit: F,
435) -> Result<usize, JetroEngineError>
436where
437    P: AsRef<Path>,
438    F: FnMut(crate::data::value::Val) -> Result<super::ndjson::NdjsonControl, JetroEngineError>,
439{
440    if limit == 0 {
441        return Ok(0);
442    }
443
444    let mut driver = NdjsonReverseFileDriver::with_options(path, options)?;
445    let mut executor = super::ndjson::NdjsonRowExecutor::new(engine, predicate);
446    let mut emitted = 0usize;
447
448    while let Some((reverse_row_no, row)) = driver.next_line_with_reverse_no()? {
449        let document = executor.parse_owned_row(reverse_row_no, row)?;
450        let matched = executor.eval_document(reverse_row_no, &document)?;
451        if !is_truthy(&matched) {
452            continue;
453        }
454
455        let root = document
456            .root_val_with(executor.engine().keys())
457            .map_err(|err| super::ndjson::row_eval_error(reverse_row_no, err))?;
458        emitted += 1;
459        if matches!(emit(root)?, super::ndjson::NdjsonControl::Stop) || emitted >= limit {
460            break;
461        }
462    }
463
464    Ok(emitted)
465}
466
467fn drive_rev_matches_writer<P, W>(
468    engine: &JetroEngine,
469    path: P,
470    predicate: &str,
471    limit: usize,
472    options: super::ndjson::NdjsonOptions,
473    writer: W,
474) -> Result<usize, JetroEngineError>
475where
476    P: AsRef<Path>,
477    W: Write,
478{
479    if limit == 0 {
480        return Ok(0);
481    }
482
483    #[cfg(feature = "simd-json")]
484    if let Some(predicate) = super::ndjson::direct_tape_predicate(engine, predicate) {
485        return drive_rev_matches_writer_tape(engine, path, &predicate, limit, options, writer);
486    }
487
488    let mut driver = NdjsonReverseFileDriver::with_options(path, options)?;
489    let mut executor = super::ndjson::NdjsonRowExecutor::new(engine, predicate);
490    let mut writer = super::ndjson::ndjson_writer_with_options(writer, options);
491    let mut emitted = 0usize;
492
493    while let Some((reverse_row_no, row)) = driver.next_line_with_reverse_no()? {
494        let document = executor.parse_owned_row(reverse_row_no, row)?;
495        let matched = executor.eval_document(reverse_row_no, &document)?;
496        if !is_truthy(&matched) {
497            continue;
498        }
499
500        super::ndjson::write_document_line(
501            &mut writer,
502            &document,
503            reverse_row_no,
504            executor.engine(),
505        )?;
506        emitted += 1;
507        if emitted >= limit {
508            break;
509        }
510    }
511
512    writer.flush()?;
513    Ok(emitted)
514}
515
/// Tape-based variant of the reverse match writer: each row is parsed into a
/// reusable tape scratch buffer, tested with `eval_tape_predicate`, and, when
/// it matches, its original bytes are written to `writer` followed by a
/// newline.
///
/// A VM lock and an evaluation environment are only created when
/// `predicate_needs_vm` reports that the predicate requires them. The scan
/// stops once `limit` rows have been written; the number written is returned.
#[cfg(feature = "simd-json")]
fn drive_rev_matches_writer_tape<P: AsRef<Path>, W: Write>(
    engine: &JetroEngine,
    path: P,
    predicate: &super::ndjson::NdjsonDirectPredicate,
    limit: usize,
    options: super::ndjson::NdjsonOptions,
    writer: W,
) -> Result<usize, JetroEngineError> {
    use super::ndjson;

    let mut driver = NdjsonReverseFileDriver::with_options(path, options)?;
    let mut sink = ndjson::ndjson_writer_with_options(writer, options);
    let capacity = options.initial_buffer_capacity;
    let mut scratch = crate::data::tape::TapeScratch::with_capacity(capacity);
    let mut matches_written = 0usize;
    let needs_vm = ndjson::predicate_needs_vm(predicate);
    let mut vm = needs_vm.then(|| engine.lock_vm());
    let env = needs_vm.then(|| crate::data::context::Env::new(crate::Val::Null));
    let mut predicate_path = ndjson::NdjsonPathCache::default();

    loop {
        let Some((row_no, bytes)) = driver.next_line_with_reverse_no()? else {
            break;
        };

        scratch.parse_slice(&bytes).map_err(|message| {
            let cause = JetroEngineError::Eval(crate::EvalError(format!("Invalid JSON: {message}")));
            ndjson::row_parse_error(row_no, cause)
        })?;

        let is_match = ndjson::eval_tape_predicate(
            &scratch,
            predicate,
            env.as_ref(),
            &mut vm,
            &mut predicate_path,
        )
        .map_err(JetroEngineError::Eval)?;

        if is_match {
            // Matching rows are echoed verbatim rather than re-serialised.
            sink.write_all(&bytes)?;
            sink.write_all(b"\n")?;
            matches_written += 1;
            if matches_written >= limit {
                break;
            }
        }
    }

    sink.flush()?;
    Ok(matches_written)
}
568
/// Strips every trailing `\n` and `\r` byte from `buf` in place.
///
/// Bytes other than line terminators, including other whitespace, are left
/// untouched; a buffer consisting solely of terminators becomes empty.
fn trim_line_ending(buf: &mut Vec<u8>) {
    // Length to keep is one past the last byte that is not a line terminator.
    let keep = buf
        .iter()
        .rposition(|byte| !matches!(byte, b'\n' | b'\r'))
        .map_or(0, |last| last + 1);
    buf.truncate(keep);
}
574
#[cfg(test)]
mod tests {
    use super::NdjsonReverseFileDriver;
    use crate::JetroEngine;
    use std::path::PathBuf;

    /// Builds a scratch file path in the system temp directory whose name
    /// combines `name` with the current process id.
    fn temp_path(name: &str) -> PathBuf {
        let file_name = format!("{}-{}.ndjson", name, std::process::id());
        std::env::temp_dir().join(file_name)
    }

    /// Writes `contents` to the scratch file for `name` and returns its path.
    fn write_fixture(name: &str, contents: &[u8]) -> PathBuf {
        let path = temp_path(name);
        std::fs::write(&path, contents).unwrap();
        path
    }

    /// Rows come back last-to-first even when the chunk size is barely larger
    /// than a single row.
    #[test]
    fn reverse_driver_reads_rows_from_tail() {
        let path = write_fixture(
            "jetro-ndjson-rev-basic",
            b"{\"n\":1}\n{\"n\":2}\n{\"n\":3}\n",
        );
        let mut driver = NdjsonReverseFileDriver::with_chunk_size(&path, 8).unwrap();

        for expected in [br#"{"n":3}"#, br#"{"n":2}"#, br#"{"n":1}"#] {
            assert_eq!(driver.next_line().unwrap().unwrap(), expected);
        }
        assert!(driver.next_line().unwrap().is_none());

        let _ = std::fs::remove_file(path);
    }

    /// A file with a leading blank line, a CRLF terminator, an interior blank
    /// line and no final newline still yields exactly the two data rows.
    #[test]
    fn reverse_driver_handles_missing_final_newline_and_blank_lines() {
        let path = write_fixture("jetro-ndjson-rev-edge", b"\n{\"n\":1}\r\n\n{\"n\":2}");
        let mut driver = NdjsonReverseFileDriver::with_chunk_size(&path, 5).unwrap();

        for expected in [br#"{"n":2}"#, br#"{"n":1}"#] {
            assert_eq!(driver.next_line().unwrap().unwrap(), expected);
        }
        assert!(driver.next_line().unwrap().is_none());

        let _ = std::fs::remove_file(path);
    }

    /// Reverse row numbers start at 1 for the last row of the file.
    #[test]
    fn reverse_driver_reports_reverse_row_numbers() {
        let path = write_fixture("jetro-ndjson-rev-row-no", b"{\"n\":1}\n{\"n\":2}\n");
        let mut driver = NdjsonReverseFileDriver::with_chunk_size(&path, 3).unwrap();

        let first = driver.next_line_with_reverse_no().unwrap().unwrap();
        assert_eq!(first, (1, br#"{"n":2}"#.to_vec()));

        let second = driver.next_line_with_reverse_no().unwrap().unwrap();
        assert_eq!(second, (2, br#"{"n":1}"#.to_vec()));

        assert!(driver.next_line_with_reverse_no().unwrap().is_none());

        let _ = std::fs::remove_file(path);
    }

    /// End-to-end check of `run_ndjson_rev`: output lines appear in reverse
    /// row order for a mapping query.
    #[test]
    fn reverse_query_uses_direct_writer_shapes() {
        let path = write_fixture(
            "jetro-ndjson-rev-direct",
            b"{\"name\":\"ada\",\"attrs\":[{\"key\":\"a\",\"value\":1}]}\n{\"name\":\"bob\",\"attrs\":[{\"key\":\"b\",\"value\":2}]}\n",
        );
        let engine = JetroEngine::new();
        let mut out = Vec::new();

        super::run_ndjson_rev(&engine, &path, "attrs.map([@.key, @.value])", &mut out).unwrap();

        let rendered = String::from_utf8(out).unwrap();
        assert_eq!(rendered, "[[\"b\",2]]\n[[\"a\",1]]\n");

        let _ = std::fs::remove_file(path);
    }
}