Skip to main content

jetro_core/io/
ndjson_rev.rs

1use super::ndjson_distinct::{
2    distinct_key_bytes, raw_distinct_key_bytes, AdaptiveDistinctKeys, DistinctFrontFilterKind,
3};
4use super::ndjson_frame::{frame_payload, FramePayload, NdjsonRowFrame};
5use super::RowError;
6use crate::util::is_truthy;
7use crate::{JetroEngine, JetroEngineError};
8use memchr::memrchr;
9use serde_json::Value;
10use std::borrow::Cow;
11use std::collections::VecDeque;
12use std::fs::File;
13use std::io::{Read, Seek, SeekFrom, Write};
14use std::path::Path;
15
16#[cfg(feature = "simd-json")]
17use super::ndjson_byte::{
18    raw_json_byte_path_value, tape_plan_can_write_byte_row, write_ndjson_byte_tape_plan_row,
19    BytePlanWrite, RawFieldValue,
20};
21
22/// Reverse NDJSON line reader over a seekable file.
23///
24/// The reader scans fixed-size chunks from EOF to BOF and returns owned line
25/// bytes in reverse physical order. It keeps only the current chunk and one
26/// cross-chunk carry buffer, so memory stays bounded by the longest row plus
27/// the configured chunk size.
28pub struct NdjsonReverseFileDriver {
29    file: File,
30    pos: u64,
31    chunk_size: usize,
32    max_line_len: usize,
33    row_frame: NdjsonRowFrame,
34    carry: Vec<u8>,
35    pending: VecDeque<Vec<u8>>,
36    finished_head: bool,
37    reverse_line_no: u64,
38}
39
40impl NdjsonReverseFileDriver {
41    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, RowError> {
42        Self::with_options(path, super::ndjson::NdjsonOptions::default())
43    }
44
45    pub fn with_chunk_size<P: AsRef<Path>>(path: P, chunk_size: usize) -> Result<Self, RowError> {
46        Self::with_options(
47            path,
48            super::ndjson::NdjsonOptions::default().with_reverse_chunk_size(chunk_size),
49        )
50    }
51
52    pub fn with_options<P: AsRef<Path>>(
53        path: P,
54        options: super::ndjson::NdjsonOptions,
55    ) -> Result<Self, RowError> {
56        let mut file = File::open(path)?;
57        let pos = file.seek(SeekFrom::End(0))?;
58        Ok(Self {
59            file,
60            pos,
61            chunk_size: options.reverse_chunk_size.max(1),
62            max_line_len: options.max_line_len,
63            row_frame: options.row_frame,
64            carry: Vec::new(),
65            pending: VecDeque::new(),
66            finished_head: false,
67            reverse_line_no: 0,
68        })
69    }
70
71    pub fn next_line(&mut self) -> Result<Option<Vec<u8>>, RowError> {
72        Ok(self.next_line_with_reverse_no()?.map(|(_, line)| line))
73    }
74
75    pub fn next_line_with_reverse_no(&mut self) -> Result<Option<(u64, Vec<u8>)>, RowError> {
76        loop {
77            if let Some(mut line) = self.pending.pop_front() {
78                self.reverse_line_no += 1;
79                if let Some(line) = self.frame_line(self.reverse_line_no, &mut line)? {
80                    return Ok(Some((self.reverse_line_no, line)));
81                }
82                continue;
83            }
84
85            if self.pos == 0 {
86                if self.finished_head || self.carry.is_empty() {
87                    return Ok(None);
88                }
89                self.finished_head = true;
90                let mut line = std::mem::take(&mut self.carry);
91                trim_line_ending(&mut line);
92                self.check_line_len(line.len())?;
93                if line.iter().any(|b| !b.is_ascii_whitespace()) {
94                    self.reverse_line_no += 1;
95                    if let Some(line) = self.frame_line(self.reverse_line_no, &mut line)? {
96                        return Ok(Some((self.reverse_line_no, line)));
97                    }
98                }
99                return Ok(None);
100            }
101
102            let read_len = self.chunk_size.min(self.pos as usize);
103            self.pos -= read_len as u64;
104            let mut chunk = vec![0u8; read_len];
105            self.file.seek(SeekFrom::Start(self.pos))?;
106            self.file.read_exact(&mut chunk)?;
107
108            let mut end = chunk.len();
109            while let Some(nl) = memrchr(b'\n', &chunk[..end]) {
110                let mut line = Vec::with_capacity(end - nl - 1 + self.carry.len());
111                line.extend_from_slice(&chunk[nl + 1..end]);
112                line.extend_from_slice(&self.carry);
113                self.carry.clear();
114                end = nl;
115                trim_line_ending(&mut line);
116                self.check_line_len(line.len())?;
117                if line.iter().any(|b| !b.is_ascii_whitespace()) {
118                    self.pending.push_back(line);
119                }
120            }
121
122            if end > 0 {
123                let mut next = Vec::with_capacity(end + self.carry.len());
124                next.extend_from_slice(&chunk[..end]);
125                next.extend_from_slice(&self.carry);
126                self.check_line_len(next.len())?;
127                self.carry = next;
128            }
129        }
130    }
131
132    fn frame_line(&self, line_no: u64, line: &mut Vec<u8>) -> Result<Option<Vec<u8>>, RowError> {
133        match frame_payload(self.row_frame, line_no, line)? {
134            FramePayload::Data(range) => {
135                if range.start > 0 || range.end < line.len() {
136                    line.copy_within(range.clone(), 0);
137                    line.truncate(range.end - range.start);
138                }
139                Ok(Some(std::mem::take(line)))
140            }
141            FramePayload::Skip => Ok(None),
142        }
143    }
144
145    fn check_line_len(&self, len: usize) -> Result<(), RowError> {
146        if len > self.max_line_len {
147            return Err(RowError::LineTooLarge {
148                line_no: self.reverse_line_no + self.pending.len() as u64 + 1,
149                len,
150                max: self.max_line_len,
151            });
152        }
153        Ok(())
154    }
155}
156
157pub fn collect_ndjson_rev<P>(
158    engine: &JetroEngine,
159    path: P,
160    query: &str,
161) -> Result<Vec<Value>, JetroEngineError>
162where
163    P: AsRef<Path>,
164{
165    collect_ndjson_rev_with_options(engine, path, query, super::ndjson::NdjsonOptions::default())
166}
167
168pub fn collect_ndjson_rev_with_options<P>(
169    engine: &JetroEngine,
170    path: P,
171    query: &str,
172    options: super::ndjson::NdjsonOptions,
173) -> Result<Vec<Value>, JetroEngineError>
174where
175    P: AsRef<Path>,
176{
177    let mut values = Vec::new();
178    drive_rev(engine, path, query, options, |value| {
179        values.push(Value::from(value));
180        Ok(super::ndjson::NdjsonControl::Continue)
181    })?;
182    Ok(values)
183}
184
185pub fn for_each_ndjson_rev<P, F>(
186    engine: &JetroEngine,
187    path: P,
188    query: &str,
189    mut f: F,
190) -> Result<usize, JetroEngineError>
191where
192    P: AsRef<Path>,
193    F: FnMut(Value),
194{
195    for_each_ndjson_rev_with_options(
196        engine,
197        path,
198        query,
199        super::ndjson::NdjsonOptions::default(),
200        |value| {
201            f(value);
202            Ok(super::ndjson::NdjsonControl::Continue)
203        },
204    )
205}
206
207pub fn for_each_ndjson_rev_with_options<P, F>(
208    engine: &JetroEngine,
209    path: P,
210    query: &str,
211    options: super::ndjson::NdjsonOptions,
212    mut f: F,
213) -> Result<usize, JetroEngineError>
214where
215    P: AsRef<Path>,
216    F: FnMut(Value) -> Result<super::ndjson::NdjsonControl, JetroEngineError>,
217{
218    drive_rev(engine, path, query, options, |value| f(Value::from(value)))
219}
220
221pub fn collect_ndjson_rev_matches<P>(
222    engine: &JetroEngine,
223    path: P,
224    predicate: &str,
225    limit: usize,
226) -> Result<Vec<Value>, JetroEngineError>
227where
228    P: AsRef<Path>,
229{
230    collect_ndjson_rev_matches_with_options(
231        engine,
232        path,
233        predicate,
234        limit,
235        super::ndjson::NdjsonOptions::default(),
236    )
237}
238
239pub fn collect_ndjson_rev_matches_with_options<P>(
240    engine: &JetroEngine,
241    path: P,
242    predicate: &str,
243    limit: usize,
244    options: super::ndjson::NdjsonOptions,
245) -> Result<Vec<Value>, JetroEngineError>
246where
247    P: AsRef<Path>,
248{
249    let mut values = Vec::with_capacity(limit);
250    drive_rev_matches(engine, path, predicate, limit, options, |value| {
251        values.push(Value::from(value));
252        Ok(super::ndjson::NdjsonControl::Continue)
253    })?;
254    Ok(values)
255}
256
257pub fn run_ndjson_rev<P, W>(
258    engine: &JetroEngine,
259    path: P,
260    query: &str,
261    writer: W,
262) -> Result<usize, JetroEngineError>
263where
264    P: AsRef<Path>,
265    W: Write,
266{
267    run_ndjson_rev_with_options(
268        engine,
269        path,
270        query,
271        writer,
272        super::ndjson::NdjsonOptions::default(),
273    )
274}
275
276pub fn run_ndjson_rev_with_options<P, W>(
277    engine: &JetroEngine,
278    path: P,
279    query: &str,
280    writer: W,
281    options: super::ndjson::NdjsonOptions,
282) -> Result<usize, JetroEngineError>
283where
284    P: AsRef<Path>,
285    W: Write,
286{
287    #[cfg(feature = "simd-json")]
288    if let Some(plan) = super::ndjson::direct_tape_plan(engine, query) {
289        return drive_rev_writer_tape(engine, path, &plan, None, options, writer);
290    }
291
292    let mut writer = super::ndjson::ndjson_writer_with_options(writer, options);
293    let mut emitted = 0usize;
294    drive_rev(engine, path, query, options, |value| {
295        if super::ndjson::write_val_line_with_options(&mut writer, &value, options)? {
296            emitted += 1;
297        }
298        Ok(super::ndjson::NdjsonControl::Continue)
299    })?;
300    writer.flush()?;
301    Ok(emitted)
302}
303
304pub fn run_ndjson_rev_limit<P, W>(
305    engine: &JetroEngine,
306    path: P,
307    query: &str,
308    limit: usize,
309    writer: W,
310) -> Result<usize, JetroEngineError>
311where
312    P: AsRef<Path>,
313    W: Write,
314{
315    run_ndjson_rev_limit_with_options(
316        engine,
317        path,
318        query,
319        limit,
320        writer,
321        super::ndjson::NdjsonOptions::default(),
322    )
323}
324
325pub fn run_ndjson_rev_limit_with_options<P, W>(
326    engine: &JetroEngine,
327    path: P,
328    query: &str,
329    limit: usize,
330    writer: W,
331    options: super::ndjson::NdjsonOptions,
332) -> Result<usize, JetroEngineError>
333where
334    P: AsRef<Path>,
335    W: Write,
336{
337    if limit == 0 {
338        return Ok(0);
339    }
340
341    #[cfg(feature = "simd-json")]
342    if let Some(plan) = super::ndjson::direct_tape_plan(engine, query) {
343        return drive_rev_writer_tape(engine, path, &plan, Some(limit), options, writer);
344    }
345
346    let mut writer = super::ndjson::ndjson_writer_with_options(writer, options);
347    let mut emitted = 0usize;
348    drive_rev(engine, path, query, options, |value| {
349        let wrote = super::ndjson::write_val_line_with_options(&mut writer, &value, options)?;
350        if wrote {
351            emitted += 1;
352        }
353        Ok(if wrote && emitted >= limit {
354            super::ndjson::NdjsonControl::Stop
355        } else {
356            super::ndjson::NdjsonControl::Continue
357        })
358    })?;
359    writer.flush()?;
360    Ok(emitted)
361}
362
363pub fn run_ndjson_rev_distinct_by<P, W>(
364    engine: &JetroEngine,
365    path: P,
366    key_query: &str,
367    query: &str,
368    limit: usize,
369    writer: W,
370) -> Result<usize, JetroEngineError>
371where
372    P: AsRef<Path>,
373    W: Write,
374{
375    run_ndjson_rev_distinct_by_with_options(
376        engine,
377        path,
378        key_query,
379        query,
380        limit,
381        writer,
382        super::ndjson::NdjsonOptions::default(),
383    )
384}
385
386pub fn run_ndjson_rev_distinct_by_with_options<P, W>(
387    engine: &JetroEngine,
388    path: P,
389    key_query: &str,
390    query: &str,
391    limit: usize,
392    writer: W,
393    options: super::ndjson::NdjsonOptions,
394) -> Result<usize, JetroEngineError>
395where
396    P: AsRef<Path>,
397    W: Write,
398{
399    run_ndjson_rev_distinct_by_with_stats_and_options(
400        engine, path, key_query, query, limit, writer, options,
401    )
402    .map(|stats| stats.emitted)
403}
404
405pub fn run_ndjson_rev_distinct_by_with_stats<P, W>(
406    engine: &JetroEngine,
407    path: P,
408    key_query: &str,
409    query: &str,
410    limit: usize,
411    writer: W,
412) -> Result<NdjsonRevDistinctStats, JetroEngineError>
413where
414    P: AsRef<Path>,
415    W: Write,
416{
417    run_ndjson_rev_distinct_by_with_stats_and_options(
418        engine,
419        path,
420        key_query,
421        query,
422        limit,
423        writer,
424        super::ndjson::NdjsonOptions::default(),
425    )
426}
427
428pub fn run_ndjson_rev_distinct_by_with_stats_and_options<P, W>(
429    engine: &JetroEngine,
430    path: P,
431    key_query: &str,
432    query: &str,
433    limit: usize,
434    writer: W,
435    options: super::ndjson::NdjsonOptions,
436) -> Result<NdjsonRevDistinctStats, JetroEngineError>
437where
438    P: AsRef<Path>,
439    W: Write,
440{
441    if limit == 0 {
442        return Ok(NdjsonRevDistinctStats::default());
443    }
444
445    #[cfg(feature = "simd-json")]
446    let direct_key_plan = super::ndjson::direct_tape_plan(engine, key_query);
447    #[cfg(feature = "simd-json")]
448    let direct_value_plan = super::ndjson::direct_tape_plan(engine, query)
449        .filter(|plan| tape_plan_can_write_byte_row(plan));
450
451    let mut key_plan = None;
452    let mut value_plan = None;
453    let mut vm = None;
454    let mut driver = NdjsonReverseFileDriver::with_options(path, options)?;
455    let mut writer = super::ndjson::ndjson_writer_with_options(writer, options);
456    #[cfg(feature = "simd-json")]
457    let mut byte_scratch = Vec::with_capacity(options.initial_buffer_capacity);
458    #[cfg(feature = "simd-json")]
459    let mut out = Vec::with_capacity(options.initial_buffer_capacity);
460    let mut seen = AdaptiveDistinctKeys::default();
461    let mut stats = NdjsonRevDistinctStats::default();
462
463    while let Some((reverse_row_no, row)) = driver.next_line_with_reverse_no()? {
464        stats.rows_scanned += 1;
465        let mut row = Some(row);
466        let mut document = None;
467
468        #[cfg(feature = "simd-json")]
469        let direct_key = direct_key_plan.as_ref().and_then(|plan| {
470            row.as_deref()
471                .and_then(|row| distinct_key_direct(row, plan))
472        });
473        #[cfg(not(feature = "simd-json"))]
474        let direct_key = None;
475
476        let inserted = if let Some(key) = direct_key {
477            stats.direct_key_rows += 1;
478            match key {
479                Cow::Borrowed(key) => seen.insert_slice(key),
480                Cow::Owned(key) => seen.insert(key),
481            }
482        } else {
483            stats.fallback_key_rows += 1;
484            let parsed = super::ndjson::parse_row(engine, reverse_row_no, row.take().unwrap())?;
485            let plan = key_plan.get_or_insert_with(|| {
486                engine.cached_plan(key_query, crate::plan::physical::PlanningContext::bytes())
487            });
488            let vm = vm.get_or_insert_with(|| engine.lock_vm());
489            let key = crate::exec::router::collect_plan_val_with_vm(&parsed, plan, vm)
490                .map_err(|err| super::ndjson::row_eval_error(reverse_row_no, err))?;
491            let key = distinct_key_bytes(&key)?;
492            document = Some(parsed);
493            seen.insert(key)
494        };
495        if !inserted {
496            stats.duplicate_rows += 1;
497            continue;
498        }
499
500        #[cfg(feature = "simd-json")]
501        if let (Some(plan), Some(row)) = (direct_value_plan.as_ref(), row.as_deref()) {
502            byte_scratch.clear();
503            out.clear();
504            match write_ndjson_byte_tape_plan_row(&mut out, row, plan, &mut byte_scratch)? {
505                BytePlanWrite::Done => {
506                    if super::ndjson::write_json_bytes_line_with_options(
507                        &mut writer,
508                        &out,
509                        options,
510                    )? {
511                        stats.direct_value_rows += 1;
512                        stats.emitted += 1;
513                    }
514                    if stats.emitted >= limit {
515                        break;
516                    }
517                    continue;
518                }
519                BytePlanWrite::Fallback => {}
520            }
521        }
522
523        let parsed = match document {
524            Some(document) => document,
525            None => super::ndjson::parse_row(engine, reverse_row_no, row.take().unwrap())?,
526        };
527        let plan = value_plan.get_or_insert_with(|| {
528            engine.cached_plan(query, crate::plan::physical::PlanningContext::bytes())
529        });
530        let vm = vm.get_or_insert_with(|| engine.lock_vm());
531        let value = crate::exec::router::collect_plan_val_with_vm(&parsed, plan, vm)
532            .map_err(|err| super::ndjson::row_eval_error(reverse_row_no, err))?;
533        if super::ndjson::write_val_line_with_options(&mut writer, &value, options)? {
534            stats.fallback_value_rows += 1;
535            stats.emitted += 1;
536        }
537        if stats.emitted >= limit {
538            break;
539        }
540    }
541
542    writer.flush()?;
543    stats.front_filter = seen.front_kind();
544    Ok(stats)
545}
546
547#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
548pub struct NdjsonRevDistinctStats {
549    pub rows_scanned: usize,
550    pub emitted: usize,
551    pub duplicate_rows: usize,
552    pub direct_key_rows: usize,
553    pub fallback_key_rows: usize,
554    pub direct_value_rows: usize,
555    pub fallback_value_rows: usize,
556    pub front_filter: DistinctFrontFilterKind,
557}
558
/// Extracts a distinct key straight from raw row bytes, without building a
/// document, when `plan` is a plain root path.
///
/// A path that is absent from the row yields the key `null`. Returns `None`
/// when the plan is not a root path or the raw value cannot be keyed from
/// bytes, telling the caller to fall back to full evaluation.
#[cfg(feature = "simd-json")]
fn distinct_key_direct<'a>(
    row: &'a [u8],
    plan: &super::ndjson::NdjsonDirectTapePlan,
) -> Option<Cow<'a, [u8]>> {
    const MISSING_KEY: &[u8] = b"null";

    match plan {
        super::ndjson::NdjsonDirectTapePlan::RootPath(steps) => {
            match raw_json_byte_path_value(row, steps) {
                RawFieldValue::Missing => Some(Cow::Borrowed(MISSING_KEY)),
                RawFieldValue::Found(value) => raw_distinct_key_bytes(value),
                RawFieldValue::Fallback => None,
            }
        }
        _ => None,
    }
}
575
576pub fn run_ndjson_rev_matches<P, W>(
577    engine: &JetroEngine,
578    path: P,
579    predicate: &str,
580    limit: usize,
581    writer: W,
582) -> Result<usize, JetroEngineError>
583where
584    P: AsRef<Path>,
585    W: Write,
586{
587    run_ndjson_rev_matches_with_options(
588        engine,
589        path,
590        predicate,
591        limit,
592        writer,
593        super::ndjson::NdjsonOptions::default(),
594    )
595}
596
597pub fn run_ndjson_rev_matches_with_options<P, W>(
598    engine: &JetroEngine,
599    path: P,
600    predicate: &str,
601    limit: usize,
602    writer: W,
603    options: super::ndjson::NdjsonOptions,
604) -> Result<usize, JetroEngineError>
605where
606    P: AsRef<Path>,
607    W: Write,
608{
609    drive_rev_matches_writer(engine, path, predicate, limit, options, writer)
610}
611
/// Tape-writer fast path for reverse query output.
///
/// Parses each row into a reusable tape, lets the runner render the query
/// result as JSON bytes, and writes it as one NDJSON line. With `Some(limit)`,
/// it stops after `limit` lines have been written. Returns the number of lines
/// written.
#[cfg(feature = "simd-json")]
fn drive_rev_writer_tape<P, W>(
    engine: &JetroEngine,
    path: P,
    plan: &super::ndjson::NdjsonDirectTapePlan,
    limit: Option<usize>,
    options: super::ndjson::NdjsonOptions,
    writer: W,
) -> Result<usize, JetroEngineError>
where
    P: AsRef<Path>,
    W: Write,
{
    let mut rows = NdjsonReverseFileDriver::with_options(path, options)?;
    let mut sink = super::ndjson::ndjson_writer_with_options(writer, options);
    let mut tape =
        crate::data::tape::TapeScratch::with_capacity(options.initial_buffer_capacity);
    let mut line_bytes = Vec::with_capacity(options.initial_buffer_capacity);
    let mut runner = super::ndjson::NdjsonTapeWriterRunner::new(engine, plan);
    let mut written = 0usize;

    while let Some((reverse_row_no, row)) = rows.next_line_with_reverse_no()? {
        line_bytes.clear();
        tape.parse_slice(&row).map_err(|message| {
            super::ndjson::row_parse_error(
                reverse_row_no,
                JetroEngineError::Eval(crate::EvalError(format!("Invalid JSON: {message}"))),
            )
        })?;
        runner.write_row(&tape, &mut line_bytes)?;
        let wrote =
            super::ndjson::write_json_bytes_line_with_options(&mut sink, &line_bytes, options)?;
        written += usize::from(wrote);
        if matches!(limit, Some(max) if written >= max) {
            break;
        }
    }

    sink.flush()?;
    Ok(written)
}
653
654fn drive_rev<P, F>(
655    engine: &JetroEngine,
656    path: P,
657    query: &str,
658    options: super::ndjson::NdjsonOptions,
659    mut emit: F,
660) -> Result<usize, JetroEngineError>
661where
662    P: AsRef<Path>,
663    F: FnMut(crate::data::value::Val) -> Result<super::ndjson::NdjsonControl, JetroEngineError>,
664{
665    let mut driver = NdjsonReverseFileDriver::with_options(path, options)?;
666    let mut executor = super::ndjson::NdjsonRowExecutor::new(engine, query);
667    let mut count = 0usize;
668
669    while let Some((reverse_row_no, row)) = driver.next_line_with_reverse_no()? {
670        let out = executor.eval_owned_row(reverse_row_no, row)?;
671        count += 1;
672        if matches!(emit(out)?, super::ndjson::NdjsonControl::Stop) {
673            break;
674        }
675    }
676
677    Ok(count)
678}
679
680fn drive_rev_matches<P, F>(
681    engine: &JetroEngine,
682    path: P,
683    predicate: &str,
684    limit: usize,
685    options: super::ndjson::NdjsonOptions,
686    mut emit: F,
687) -> Result<usize, JetroEngineError>
688where
689    P: AsRef<Path>,
690    F: FnMut(crate::data::value::Val) -> Result<super::ndjson::NdjsonControl, JetroEngineError>,
691{
692    if limit == 0 {
693        return Ok(0);
694    }
695
696    let mut driver = NdjsonReverseFileDriver::with_options(path, options)?;
697    let mut executor = super::ndjson::NdjsonRowExecutor::new(engine, predicate);
698    let mut emitted = 0usize;
699
700    while let Some((reverse_row_no, row)) = driver.next_line_with_reverse_no()? {
701        let document = executor.parse_owned_row(reverse_row_no, row)?;
702        let matched = executor.eval_document(reverse_row_no, &document)?;
703        if !is_truthy(&matched) {
704            continue;
705        }
706
707        let root = document
708            .root_val_with(executor.engine().keys())
709            .map_err(|err| super::ndjson::row_eval_error(reverse_row_no, err))?;
710        emitted += 1;
711        if matches!(emit(root)?, super::ndjson::NdjsonControl::Stop) || emitted >= limit {
712            break;
713        }
714    }
715
716    Ok(emitted)
717}
718
719fn drive_rev_matches_writer<P, W>(
720    engine: &JetroEngine,
721    path: P,
722    predicate: &str,
723    limit: usize,
724    options: super::ndjson::NdjsonOptions,
725    writer: W,
726) -> Result<usize, JetroEngineError>
727where
728    P: AsRef<Path>,
729    W: Write,
730{
731    if limit == 0 {
732        return Ok(0);
733    }
734
735    #[cfg(feature = "simd-json")]
736    if let Some(predicate) = super::ndjson::direct_tape_predicate(engine, predicate) {
737        return drive_rev_matches_writer_tape(engine, path, &predicate, limit, options, writer);
738    }
739
740    let mut driver = NdjsonReverseFileDriver::with_options(path, options)?;
741    let mut executor = super::ndjson::NdjsonRowExecutor::new(engine, predicate);
742    let mut writer = super::ndjson::ndjson_writer_with_options(writer, options);
743    let mut emitted = 0usize;
744
745    while let Some((reverse_row_no, row)) = driver.next_line_with_reverse_no()? {
746        let document = executor.parse_owned_row(reverse_row_no, row)?;
747        let matched = executor.eval_document(reverse_row_no, &document)?;
748        if !is_truthy(&matched) {
749            continue;
750        }
751
752        super::ndjson::write_document_line(
753            &mut writer,
754            &document,
755            reverse_row_no,
756            executor.engine(),
757        )?;
758        emitted += 1;
759        if emitted >= limit {
760            break;
761        }
762    }
763
764    writer.flush()?;
765    Ok(emitted)
766}
767
/// Tape fast path for reverse filtering: evaluates `predicate` on a parsed
/// tape of each row and copies matching rows' bytes verbatim, each followed
/// by a newline.
///
/// The engine VM and evaluation environment are only created when the
/// predicate needs them. Stops after `limit` rows have been written and
/// returns that count.
#[cfg(feature = "simd-json")]
fn drive_rev_matches_writer_tape<P, W>(
    engine: &JetroEngine,
    path: P,
    predicate: &super::ndjson::NdjsonDirectPredicate,
    limit: usize,
    options: super::ndjson::NdjsonOptions,
    writer: W,
) -> Result<usize, JetroEngineError>
where
    P: AsRef<Path>,
    W: Write,
{
    let mut rows = NdjsonReverseFileDriver::with_options(path, options)?;
    let mut sink = super::ndjson::ndjson_writer_with_options(writer, options);
    let mut tape =
        crate::data::tape::TapeScratch::with_capacity(options.initial_buffer_capacity);
    let mut emitted = 0usize;
    let needs_vm = super::ndjson::predicate_needs_vm(predicate);
    let mut vm = needs_vm.then(|| engine.lock_vm());
    let env = needs_vm.then(|| crate::data::context::Env::new(crate::Val::Null));
    let mut path_cache = super::ndjson::NdjsonPathCache::default();

    while let Some((reverse_row_no, row)) = rows.next_line_with_reverse_no()? {
        tape.parse_slice(&row).map_err(|message| {
            super::ndjson::row_parse_error(
                reverse_row_no,
                JetroEngineError::Eval(crate::EvalError(format!("Invalid JSON: {message}"))),
            )
        })?;
        let keep = super::ndjson::eval_tape_predicate(
            &tape,
            predicate,
            env.as_ref(),
            &mut vm,
            &mut path_cache,
        )
        .map_err(JetroEngineError::Eval)?;
        if keep {
            sink.write_all(&row)?;
            sink.write_all(b"\n")?;
            emitted += 1;
            if emitted >= limit {
                break;
            }
        }
    }

    sink.flush()?;
    Ok(emitted)
}
820
/// Drops every trailing `\r` and `\n` byte from `buf`, leaving all other
/// bytes (including interior line-ending bytes) untouched.
fn trim_line_ending(buf: &mut Vec<u8>) {
    let keep = buf
        .iter()
        .rposition(|&b| b != b'\n' && b != b'\r')
        .map_or(0, |last| last + 1);
    buf.truncate(keep);
}
826
#[cfg(test)]
mod tests {
    use super::NdjsonReverseFileDriver;
    use crate::JetroEngine;
    use std::path::{Path, PathBuf};

    /// Temporary NDJSON fixture that is deleted on drop, so a failing
    /// assertion does not leave files behind in the temp directory.
    struct TempNdjson(PathBuf);

    impl TempNdjson {
        fn new(name: &str, contents: &[u8]) -> Self {
            let mut path = std::env::temp_dir();
            path.push(format!("{}-{}.ndjson", name, std::process::id()));
            std::fs::write(&path, contents).unwrap();
            Self(path)
        }

        fn path(&self) -> &Path {
            &self.0
        }
    }

    impl Drop for TempNdjson {
        fn drop(&mut self) {
            let _ = std::fs::remove_file(&self.0);
        }
    }

    #[test]
    fn reverse_driver_reads_rows_from_tail() {
        let file = TempNdjson::new(
            "jetro-ndjson-rev-basic",
            b"{\"n\":1}\n{\"n\":2}\n{\"n\":3}\n",
        );
        let mut driver = NdjsonReverseFileDriver::with_chunk_size(file.path(), 8).unwrap();

        assert_eq!(driver.next_line().unwrap().unwrap(), br#"{"n":3}"#);
        assert_eq!(driver.next_line().unwrap().unwrap(), br#"{"n":2}"#);
        assert_eq!(driver.next_line().unwrap().unwrap(), br#"{"n":1}"#);
        assert!(driver.next_line().unwrap().is_none());
    }

    #[test]
    fn reverse_driver_handles_missing_final_newline_and_blank_lines() {
        let file = TempNdjson::new("jetro-ndjson-rev-edge", b"\n{\"n\":1}\r\n\n{\"n\":2}");
        let mut driver = NdjsonReverseFileDriver::with_chunk_size(file.path(), 5).unwrap();

        assert_eq!(driver.next_line().unwrap().unwrap(), br#"{"n":2}"#);
        assert_eq!(driver.next_line().unwrap().unwrap(), br#"{"n":1}"#);
        assert!(driver.next_line().unwrap().is_none());
    }

    #[test]
    fn reverse_driver_joins_crlf_rows_split_across_tiny_chunks() {
        // One-byte chunks force every row (and its `\r`) through the carry buffer.
        let file = TempNdjson::new(
            "jetro-ndjson-rev-crlf-tiny",
            b"{\"a\":1}\r\n{\"b\":22}\r\n",
        );
        let mut driver = NdjsonReverseFileDriver::with_chunk_size(file.path(), 1).unwrap();

        assert_eq!(driver.next_line().unwrap().unwrap(), br#"{"b":22}"#);
        assert_eq!(driver.next_line().unwrap().unwrap(), br#"{"a":1}"#);
        assert!(driver.next_line().unwrap().is_none());
    }

    #[test]
    fn reverse_driver_reports_reverse_row_numbers() {
        let file = TempNdjson::new("jetro-ndjson-rev-row-no", b"{\"n\":1}\n{\"n\":2}\n");
        let mut driver = NdjsonReverseFileDriver::with_chunk_size(file.path(), 3).unwrap();

        assert_eq!(
            driver.next_line_with_reverse_no().unwrap().unwrap(),
            (1, br#"{"n":2}"#.to_vec())
        );
        assert_eq!(
            driver.next_line_with_reverse_no().unwrap().unwrap(),
            (2, br#"{"n":1}"#.to_vec())
        );
        assert!(driver.next_line_with_reverse_no().unwrap().is_none());
    }

    #[test]
    fn reverse_query_uses_direct_writer_shapes() {
        let file = TempNdjson::new(
            "jetro-ndjson-rev-direct",
            b"{\"name\":\"ada\",\"attrs\":[{\"key\":\"a\",\"value\":1}]}\n{\"name\":\"bob\",\"attrs\":[{\"key\":\"b\",\"value\":2}]}\n",
        );
        let engine = JetroEngine::new();
        let mut out = Vec::new();

        super::run_ndjson_rev(&engine, file.path(), "attrs.map([@.key, @.value])", &mut out)
            .unwrap();

        assert_eq!(
            String::from_utf8(out).unwrap(),
            "[[\"b\",2]]\n[[\"a\",1]]\n"
        );
    }

    /// Raw keys are canonicalized: escapes are decoded and object whitespace
    /// is removed. `1.0` is declined (`None`), so callers fall back to the
    /// evaluated key.
    #[cfg(feature = "simd-json")]
    #[test]
    fn direct_distinct_key_classifier_canonicalizes_escapes() {
        assert_eq!(
            super::raw_distinct_key_bytes(br#""plain""#).as_deref(),
            Some(br#""plain""#.as_slice())
        );
        assert_eq!(
            super::raw_distinct_key_bytes(br#""a\u0062""#).as_deref(),
            Some(br#""ab""#.as_slice())
        );
        assert_eq!(
            super::raw_distinct_key_bytes(br#"{"k":"v"}"#).as_deref(),
            Some(br#"{"k":"v"}"#.as_slice())
        );
        assert_eq!(
            super::raw_distinct_key_bytes(b"123").as_deref(),
            Some(b"123".as_slice())
        );
        assert_eq!(
            super::raw_distinct_key_bytes(br#"{"a" : 1,"b":"x\u0079"}"#).as_deref(),
            Some(br#"{"a":1,"b":"xy"}"#.as_slice())
        );
        assert_eq!(super::raw_distinct_key_bytes(b"1.0"), None);
    }
}