Skip to main content

jetro_core/io/
ndjson_rev.rs

1use super::RowError;
2use crate::util::is_truthy;
3use crate::{JetroEngine, JetroEngineError};
4use memchr::memrchr;
5use serde_json::Value;
6use std::collections::VecDeque;
7use std::fs::File;
8use std::io::{Read, Seek, SeekFrom, Write};
9use std::path::Path;
10
/// Reverse NDJSON line reader over a seekable file.
///
/// The reader scans fixed-size chunks from EOF to BOF and returns owned line
/// bytes in reverse physical order. It keeps only the current chunk and one
/// cross-chunk carry buffer, so memory stays bounded by the longest row plus
/// the configured chunk size.
///
/// `Debug` is derived so the driver can be embedded in other `Debug` types
/// and inspected in logs and test failures.
#[derive(Debug)]
pub struct NdjsonReverseFileDriver {
    /// Open handle to the file being scanned.
    file: File,
    /// Byte offset marking the start of the region already consumed; bytes in
    /// `[0, pos)` have not been read yet.
    pos: u64,
    /// Maximum number of bytes fetched per backwards read (at least 1).
    chunk_size: usize,
    /// Upper bound on the length of a single line; longer lines are an error.
    max_line_len: usize,
    /// Tail of a line whose beginning lies in a chunk that is not read yet.
    carry: Vec<u8>,
    /// Complete lines extracted from the current chunk, closest-to-EOF first.
    pending: VecDeque<Vec<u8>>,
    /// Set once the line that starts at offset 0 has been handled.
    finished_head: bool,
    /// Number of lines handed out so far (1-based number of the last one).
    reverse_line_no: u64,
}
27
28impl NdjsonReverseFileDriver {
29    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, RowError> {
30        Self::with_options(path, super::ndjson::NdjsonOptions::default())
31    }
32
33    pub fn with_chunk_size<P: AsRef<Path>>(path: P, chunk_size: usize) -> Result<Self, RowError> {
34        Self::with_options(
35            path,
36            super::ndjson::NdjsonOptions::default().with_reverse_chunk_size(chunk_size),
37        )
38    }
39
40    pub fn with_options<P: AsRef<Path>>(
41        path: P,
42        options: super::ndjson::NdjsonOptions,
43    ) -> Result<Self, RowError> {
44        let mut file = File::open(path)?;
45        let pos = file.seek(SeekFrom::End(0))?;
46        Ok(Self {
47            file,
48            pos,
49            chunk_size: options.reverse_chunk_size.max(1),
50            max_line_len: options.max_line_len,
51            carry: Vec::new(),
52            pending: VecDeque::new(),
53            finished_head: false,
54            reverse_line_no: 0,
55        })
56    }
57
58    pub fn next_line(&mut self) -> Result<Option<Vec<u8>>, RowError> {
59        Ok(self.next_line_with_reverse_no()?.map(|(_, line)| line))
60    }
61
62    pub fn next_line_with_reverse_no(&mut self) -> Result<Option<(u64, Vec<u8>)>, RowError> {
63        loop {
64            if let Some(line) = self.pending.pop_front() {
65                self.reverse_line_no += 1;
66                return Ok(Some((self.reverse_line_no, line)));
67            }
68
69            if self.pos == 0 {
70                if self.finished_head || self.carry.is_empty() {
71                    return Ok(None);
72                }
73                self.finished_head = true;
74                let mut line = std::mem::take(&mut self.carry);
75                trim_line_ending(&mut line);
76                self.check_line_len(line.len())?;
77                if line.iter().any(|b| !b.is_ascii_whitespace()) {
78                    self.reverse_line_no += 1;
79                    return Ok(Some((self.reverse_line_no, line)));
80                }
81                return Ok(None);
82            }
83
84            let read_len = self.chunk_size.min(self.pos as usize);
85            self.pos -= read_len as u64;
86            let mut chunk = vec![0u8; read_len];
87            self.file.seek(SeekFrom::Start(self.pos))?;
88            self.file.read_exact(&mut chunk)?;
89
90            let mut end = chunk.len();
91            while let Some(nl) = memrchr(b'\n', &chunk[..end]) {
92                let mut line = Vec::with_capacity(end - nl - 1 + self.carry.len());
93                line.extend_from_slice(&chunk[nl + 1..end]);
94                line.extend_from_slice(&self.carry);
95                self.carry.clear();
96                end = nl;
97                trim_line_ending(&mut line);
98                self.check_line_len(line.len())?;
99                if line.iter().any(|b| !b.is_ascii_whitespace()) {
100                    self.pending.push_back(line);
101                }
102            }
103
104            if end > 0 {
105                let mut next = Vec::with_capacity(end + self.carry.len());
106                next.extend_from_slice(&chunk[..end]);
107                next.extend_from_slice(&self.carry);
108                self.check_line_len(next.len())?;
109                self.carry = next;
110            }
111        }
112    }
113
114    fn check_line_len(&self, len: usize) -> Result<(), RowError> {
115        if len > self.max_line_len {
116            return Err(RowError::LineTooLarge {
117                line_no: self.reverse_line_no + self.pending.len() as u64 + 1,
118                len,
119                max: self.max_line_len,
120            });
121        }
122        Ok(())
123    }
124}
125
126pub fn collect_ndjson_rev<P>(
127    engine: &JetroEngine,
128    path: P,
129    query: &str,
130) -> Result<Vec<Value>, JetroEngineError>
131where
132    P: AsRef<Path>,
133{
134    collect_ndjson_rev_with_options(engine, path, query, super::ndjson::NdjsonOptions::default())
135}
136
137pub fn collect_ndjson_rev_with_options<P>(
138    engine: &JetroEngine,
139    path: P,
140    query: &str,
141    options: super::ndjson::NdjsonOptions,
142) -> Result<Vec<Value>, JetroEngineError>
143where
144    P: AsRef<Path>,
145{
146    let mut values = Vec::new();
147    drive_rev(engine, path, query, options, |value| {
148        values.push(Value::from(value));
149        Ok(super::ndjson::NdjsonControl::Continue)
150    })?;
151    Ok(values)
152}
153
154pub fn for_each_ndjson_rev<P, F>(
155    engine: &JetroEngine,
156    path: P,
157    query: &str,
158    mut f: F,
159) -> Result<usize, JetroEngineError>
160where
161    P: AsRef<Path>,
162    F: FnMut(Value),
163{
164    for_each_ndjson_rev_with_options(
165        engine,
166        path,
167        query,
168        super::ndjson::NdjsonOptions::default(),
169        |value| {
170            f(value);
171            Ok(super::ndjson::NdjsonControl::Continue)
172        },
173    )
174}
175
176pub fn for_each_ndjson_rev_with_options<P, F>(
177    engine: &JetroEngine,
178    path: P,
179    query: &str,
180    options: super::ndjson::NdjsonOptions,
181    mut f: F,
182) -> Result<usize, JetroEngineError>
183where
184    P: AsRef<Path>,
185    F: FnMut(Value) -> Result<super::ndjson::NdjsonControl, JetroEngineError>,
186{
187    drive_rev(engine, path, query, options, |value| f(Value::from(value)))
188}
189
190pub fn collect_ndjson_rev_matches<P>(
191    engine: &JetroEngine,
192    path: P,
193    predicate: &str,
194    limit: usize,
195) -> Result<Vec<Value>, JetroEngineError>
196where
197    P: AsRef<Path>,
198{
199    collect_ndjson_rev_matches_with_options(
200        engine,
201        path,
202        predicate,
203        limit,
204        super::ndjson::NdjsonOptions::default(),
205    )
206}
207
208pub fn collect_ndjson_rev_matches_with_options<P>(
209    engine: &JetroEngine,
210    path: P,
211    predicate: &str,
212    limit: usize,
213    options: super::ndjson::NdjsonOptions,
214) -> Result<Vec<Value>, JetroEngineError>
215where
216    P: AsRef<Path>,
217{
218    let mut values = Vec::with_capacity(limit);
219    drive_rev_matches(engine, path, predicate, limit, options, |value| {
220        values.push(Value::from(value));
221        Ok(super::ndjson::NdjsonControl::Continue)
222    })?;
223    Ok(values)
224}
225
226pub fn run_ndjson_rev<P, W>(
227    engine: &JetroEngine,
228    path: P,
229    query: &str,
230    writer: W,
231) -> Result<usize, JetroEngineError>
232where
233    P: AsRef<Path>,
234    W: Write,
235{
236    run_ndjson_rev_with_options(
237        engine,
238        path,
239        query,
240        writer,
241        super::ndjson::NdjsonOptions::default(),
242    )
243}
244
245pub fn run_ndjson_rev_with_options<P, W>(
246    engine: &JetroEngine,
247    path: P,
248    query: &str,
249    writer: W,
250    options: super::ndjson::NdjsonOptions,
251) -> Result<usize, JetroEngineError>
252where
253    P: AsRef<Path>,
254    W: Write,
255{
256    let mut writer = super::ndjson::ndjson_writer_with_options(writer, options);
257    let count = drive_rev(engine, path, query, options, |value| {
258        super::ndjson::write_val_line(&mut writer, &value)?;
259        Ok(super::ndjson::NdjsonControl::Continue)
260    })?;
261    writer.flush()?;
262    Ok(count)
263}
264
265pub fn run_ndjson_rev_limit<P, W>(
266    engine: &JetroEngine,
267    path: P,
268    query: &str,
269    limit: usize,
270    writer: W,
271) -> Result<usize, JetroEngineError>
272where
273    P: AsRef<Path>,
274    W: Write,
275{
276    run_ndjson_rev_limit_with_options(
277        engine,
278        path,
279        query,
280        limit,
281        writer,
282        super::ndjson::NdjsonOptions::default(),
283    )
284}
285
286pub fn run_ndjson_rev_limit_with_options<P, W>(
287    engine: &JetroEngine,
288    path: P,
289    query: &str,
290    limit: usize,
291    writer: W,
292    options: super::ndjson::NdjsonOptions,
293) -> Result<usize, JetroEngineError>
294where
295    P: AsRef<Path>,
296    W: Write,
297{
298    if limit == 0 {
299        return Ok(0);
300    }
301
302    let mut writer = super::ndjson::ndjson_writer_with_options(writer, options);
303    let mut emitted = 0usize;
304    let count = drive_rev(engine, path, query, options, |value| {
305        super::ndjson::write_val_line(&mut writer, &value)?;
306        emitted += 1;
307        Ok(if emitted >= limit {
308            super::ndjson::NdjsonControl::Stop
309        } else {
310            super::ndjson::NdjsonControl::Continue
311        })
312    })?;
313    writer.flush()?;
314    Ok(count)
315}
316
317pub fn run_ndjson_rev_matches<P, W>(
318    engine: &JetroEngine,
319    path: P,
320    predicate: &str,
321    limit: usize,
322    writer: W,
323) -> Result<usize, JetroEngineError>
324where
325    P: AsRef<Path>,
326    W: Write,
327{
328    run_ndjson_rev_matches_with_options(
329        engine,
330        path,
331        predicate,
332        limit,
333        writer,
334        super::ndjson::NdjsonOptions::default(),
335    )
336}
337
338pub fn run_ndjson_rev_matches_with_options<P, W>(
339    engine: &JetroEngine,
340    path: P,
341    predicate: &str,
342    limit: usize,
343    writer: W,
344    options: super::ndjson::NdjsonOptions,
345) -> Result<usize, JetroEngineError>
346where
347    P: AsRef<Path>,
348    W: Write,
349{
350    drive_rev_matches_writer(engine, path, predicate, limit, options, writer)
351}
352
353fn drive_rev<P, F>(
354    engine: &JetroEngine,
355    path: P,
356    query: &str,
357    options: super::ndjson::NdjsonOptions,
358    mut emit: F,
359) -> Result<usize, JetroEngineError>
360where
361    P: AsRef<Path>,
362    F: FnMut(crate::data::value::Val) -> Result<super::ndjson::NdjsonControl, JetroEngineError>,
363{
364    let mut driver = NdjsonReverseFileDriver::with_options(path, options)?;
365    let mut executor = super::ndjson::NdjsonRowExecutor::new(engine, query);
366    let mut count = 0usize;
367
368    while let Some((reverse_row_no, row)) = driver.next_line_with_reverse_no()? {
369        let out = executor.eval_owned_row(reverse_row_no, row)?;
370        count += 1;
371        if matches!(emit(out)?, super::ndjson::NdjsonControl::Stop) {
372            break;
373        }
374    }
375
376    Ok(count)
377}
378
379fn drive_rev_matches<P, F>(
380    engine: &JetroEngine,
381    path: P,
382    predicate: &str,
383    limit: usize,
384    options: super::ndjson::NdjsonOptions,
385    mut emit: F,
386) -> Result<usize, JetroEngineError>
387where
388    P: AsRef<Path>,
389    F: FnMut(crate::data::value::Val) -> Result<super::ndjson::NdjsonControl, JetroEngineError>,
390{
391    if limit == 0 {
392        return Ok(0);
393    }
394
395    let mut driver = NdjsonReverseFileDriver::with_options(path, options)?;
396    let mut executor = super::ndjson::NdjsonRowExecutor::new(engine, predicate);
397    let mut emitted = 0usize;
398
399    while let Some((reverse_row_no, row)) = driver.next_line_with_reverse_no()? {
400        let document = executor.parse_owned_row(reverse_row_no, row)?;
401        let matched = executor.eval_document(reverse_row_no, &document)?;
402        if !is_truthy(&matched) {
403            continue;
404        }
405
406        let root = document
407            .root_val_with(executor.engine().keys())
408            .map_err(|err| super::ndjson::row_eval_error(reverse_row_no, err))?;
409        emitted += 1;
410        if matches!(emit(root)?, super::ndjson::NdjsonControl::Stop) || emitted >= limit {
411            break;
412        }
413    }
414
415    Ok(emitted)
416}
417
418fn drive_rev_matches_writer<P, W>(
419    engine: &JetroEngine,
420    path: P,
421    predicate: &str,
422    limit: usize,
423    options: super::ndjson::NdjsonOptions,
424    writer: W,
425) -> Result<usize, JetroEngineError>
426where
427    P: AsRef<Path>,
428    W: Write,
429{
430    if limit == 0 {
431        return Ok(0);
432    }
433
434    let mut driver = NdjsonReverseFileDriver::with_options(path, options)?;
435    let mut executor = super::ndjson::NdjsonRowExecutor::new(engine, predicate);
436    let mut writer = super::ndjson::ndjson_writer_with_options(writer, options);
437    let mut emitted = 0usize;
438
439    while let Some((reverse_row_no, row)) = driver.next_line_with_reverse_no()? {
440        let document = executor.parse_owned_row(reverse_row_no, row)?;
441        let matched = executor.eval_document(reverse_row_no, &document)?;
442        if !is_truthy(&matched) {
443            continue;
444        }
445
446        super::ndjson::write_document_line(
447            &mut writer,
448            &document,
449            reverse_row_no,
450            executor.engine(),
451        )?;
452        emitted += 1;
453        if emitted >= limit {
454            break;
455        }
456    }
457
458    writer.flush()?;
459    Ok(emitted)
460}
461
/// Removes every trailing `\n` and `\r` byte from `buf` in place.
///
/// Bytes before the last non-line-ending byte are left untouched; a buffer
/// made only of line-ending bytes becomes empty.
fn trim_line_ending(buf: &mut Vec<u8>) {
    // Length to keep: one past the right-most byte that is not CR or LF.
    let keep = buf
        .iter()
        .rposition(|&byte| byte != b'\n' && byte != b'\r')
        .map_or(0, |idx| idx + 1);
    buf.truncate(keep);
}
467
#[cfg(test)]
mod tests {
    use super::NdjsonReverseFileDriver;
    use std::path::PathBuf;

    /// Builds a scratch file path in the system temp directory that is unique
    /// per test name and process id.
    fn temp_path(name: &str) -> PathBuf {
        let file_name = format!("{}-{}.ndjson", name, std::process::id());
        std::env::temp_dir().join(file_name)
    }

    /// Reads lines until the driver reports exhaustion.
    fn drain_lines(driver: &mut NdjsonReverseFileDriver) -> Vec<Vec<u8>> {
        let mut rows = Vec::new();
        while let Some(row) = driver.next_line().unwrap() {
            rows.push(row);
        }
        rows
    }

    /// Three newline-terminated rows come back last-to-first with a chunk
    /// size that forces rows to straddle chunk boundaries.
    #[test]
    fn reverse_driver_reads_rows_from_tail() {
        let path = temp_path("jetro-ndjson-rev-basic");
        std::fs::write(&path, b"{\"n\":1}\n{\"n\":2}\n{\"n\":3}\n").unwrap();
        let mut driver = NdjsonReverseFileDriver::with_chunk_size(&path, 8).unwrap();

        let expected = vec![
            br#"{"n":3}"#.to_vec(),
            br#"{"n":2}"#.to_vec(),
            br#"{"n":1}"#.to_vec(),
        ];
        assert_eq!(drain_lines(&mut driver), expected);

        let _ = std::fs::remove_file(path);
    }

    /// Blank lines, a CRLF ending and a missing final newline are tolerated.
    #[test]
    fn reverse_driver_handles_missing_final_newline_and_blank_lines() {
        let path = temp_path("jetro-ndjson-rev-edge");
        std::fs::write(&path, b"\n{\"n\":1}\r\n\n{\"n\":2}").unwrap();
        let mut driver = NdjsonReverseFileDriver::with_chunk_size(&path, 5).unwrap();

        let expected = vec![br#"{"n":2}"#.to_vec(), br#"{"n":1}"#.to_vec()];
        assert_eq!(drain_lines(&mut driver), expected);

        let _ = std::fs::remove_file(path);
    }

    /// Row numbers start at 1 for the last physical row and count upwards.
    #[test]
    fn reverse_driver_reports_reverse_row_numbers() {
        let path = temp_path("jetro-ndjson-rev-row-no");
        std::fs::write(&path, b"{\"n\":1}\n{\"n\":2}\n").unwrap();
        let mut driver = NdjsonReverseFileDriver::with_chunk_size(&path, 3).unwrap();

        let first = driver.next_line_with_reverse_no().unwrap().unwrap();
        assert_eq!(first, (1, br#"{"n":2}"#.to_vec()));

        let second = driver.next_line_with_reverse_no().unwrap().unwrap();
        assert_eq!(second, (2, br#"{"n":1}"#.to_vec()));

        let exhausted = driver.next_line_with_reverse_no().unwrap();
        assert!(exhausted.is_none());

        let _ = std::fs::remove_file(path);
    }
}