Skip to main content

jetro_core/io/
ndjson_rev.rs

1use super::ndjson_byte::{
2    raw_json_byte_path_value, tape_plan_can_write_byte_row, write_ndjson_byte_tape_plan_row,
3    BytePlanWrite, RawFieldValue,
4};
5use super::ndjson_distinct::{
6    distinct_key_bytes, raw_distinct_key_bytes, AdaptiveDistinctKeys, DistinctFrontFilterKind,
7};
8use super::ndjson_frame::{frame_payload, FramePayload, NdjsonRowFrame};
9use super::ndjson_route::{
10    NdjsonExecutionReport, NdjsonExecutionStats, NdjsonRouteExplain, NdjsonRouteKind,
11    NdjsonSourceCaps,
12};
13use super::RowError;
14use crate::util::is_truthy;
15use crate::{JetroEngine, JetroEngineError};
16use memchr::memrchr;
17use serde_json::Value;
18use std::borrow::Cow;
19use std::collections::VecDeque;
20use std::fs::File;
21use std::io::{Read, Seek, SeekFrom, Write};
22use std::path::Path;
23
24/// Reverse NDJSON line reader over a seekable file.
25///
26/// The reader scans fixed-size chunks from EOF to BOF and returns owned line
27/// bytes in reverse physical order. It keeps only the current chunk and one
28/// cross-chunk carry buffer, so memory stays bounded by the longest row plus
29/// the configured chunk size.
30pub struct NdjsonReverseFileDriver {
31    file: File,
32    pos: u64,
33    chunk_size: usize,
34    max_line_len: usize,
35    row_frame: NdjsonRowFrame,
36    carry: Vec<u8>,
37    pending: VecDeque<Vec<u8>>,
38    finished_head: bool,
39    reverse_line_no: u64,
40}
41
42impl NdjsonReverseFileDriver {
43    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, RowError> {
44        Self::with_options(path, super::ndjson::NdjsonOptions::default())
45    }
46
47    pub fn with_chunk_size<P: AsRef<Path>>(path: P, chunk_size: usize) -> Result<Self, RowError> {
48        Self::with_options(
49            path,
50            super::ndjson::NdjsonOptions::default().with_reverse_chunk_size(chunk_size),
51        )
52    }
53
54    pub fn with_options<P: AsRef<Path>>(
55        path: P,
56        options: super::ndjson::NdjsonOptions,
57    ) -> Result<Self, RowError> {
58        let mut file = File::open(path)?;
59        let pos = file.seek(SeekFrom::End(0))?;
60        Ok(Self {
61            file,
62            pos,
63            chunk_size: options.reverse_chunk_size.max(1),
64            max_line_len: options.max_line_len,
65            row_frame: options.row_frame,
66            carry: Vec::new(),
67            pending: VecDeque::new(),
68            finished_head: false,
69            reverse_line_no: 0,
70        })
71    }
72
73    pub fn next_line(&mut self) -> Result<Option<Vec<u8>>, RowError> {
74        Ok(self.next_line_with_reverse_no()?.map(|(_, line)| line))
75    }
76
77    pub fn next_line_with_reverse_no(&mut self) -> Result<Option<(u64, Vec<u8>)>, RowError> {
78        loop {
79            if let Some(mut line) = self.pending.pop_front() {
80                self.reverse_line_no += 1;
81                if let Some(line) = self.frame_line(self.reverse_line_no, &mut line)? {
82                    return Ok(Some((self.reverse_line_no, line)));
83                }
84                continue;
85            }
86
87            if self.pos == 0 {
88                if self.finished_head || self.carry.is_empty() {
89                    return Ok(None);
90                }
91                self.finished_head = true;
92                let mut line = std::mem::take(&mut self.carry);
93                trim_line_ending(&mut line);
94                self.check_line_len(line.len())?;
95                if line.iter().any(|b| !b.is_ascii_whitespace()) {
96                    self.reverse_line_no += 1;
97                    if let Some(line) = self.frame_line(self.reverse_line_no, &mut line)? {
98                        return Ok(Some((self.reverse_line_no, line)));
99                    }
100                }
101                return Ok(None);
102            }
103
104            let read_len = self.chunk_size.min(self.pos as usize);
105            self.pos -= read_len as u64;
106            let mut chunk = vec![0u8; read_len];
107            self.file.seek(SeekFrom::Start(self.pos))?;
108            self.file.read_exact(&mut chunk)?;
109
110            let mut end = chunk.len();
111            while let Some(nl) = memrchr(b'\n', &chunk[..end]) {
112                let mut line = Vec::with_capacity(end - nl - 1 + self.carry.len());
113                line.extend_from_slice(&chunk[nl + 1..end]);
114                line.extend_from_slice(&self.carry);
115                self.carry.clear();
116                end = nl;
117                trim_line_ending(&mut line);
118                self.check_line_len(line.len())?;
119                if line.iter().any(|b| !b.is_ascii_whitespace()) {
120                    self.pending.push_back(line);
121                }
122            }
123
124            if end > 0 {
125                let mut next = Vec::with_capacity(end + self.carry.len());
126                next.extend_from_slice(&chunk[..end]);
127                next.extend_from_slice(&self.carry);
128                self.check_line_len(next.len())?;
129                self.carry = next;
130            }
131        }
132    }
133
134    fn frame_line(&self, line_no: u64, line: &mut Vec<u8>) -> Result<Option<Vec<u8>>, RowError> {
135        match frame_payload(self.row_frame, line_no, line)? {
136            FramePayload::Data(range) => {
137                if range.start > 0 || range.end < line.len() {
138                    line.copy_within(range.clone(), 0);
139                    line.truncate(range.end - range.start);
140                }
141                Ok(Some(std::mem::take(line)))
142            }
143            FramePayload::Skip => Ok(None),
144        }
145    }
146
147    fn check_line_len(&self, len: usize) -> Result<(), RowError> {
148        if len > self.max_line_len {
149            return Err(RowError::LineTooLarge {
150                line_no: self.reverse_line_no + self.pending.len() as u64 + 1,
151                len,
152                max: self.max_line_len,
153            });
154        }
155        Ok(())
156    }
157}
158
159pub fn collect_ndjson_rev<P>(
160    engine: &JetroEngine,
161    path: P,
162    query: &str,
163) -> Result<Vec<Value>, JetroEngineError>
164where
165    P: AsRef<Path>,
166{
167    collect_ndjson_rev_with_options(engine, path, query, super::ndjson::NdjsonOptions::default())
168}
169
170pub fn collect_ndjson_rev_with_options<P>(
171    engine: &JetroEngine,
172    path: P,
173    query: &str,
174    options: super::ndjson::NdjsonOptions,
175) -> Result<Vec<Value>, JetroEngineError>
176where
177    P: AsRef<Path>,
178{
179    let mut values = Vec::new();
180    drive_rev(engine, path, query, options, |value| {
181        values.push(Value::from(value));
182        Ok(super::ndjson::NdjsonControl::Continue)
183    })?;
184    Ok(values)
185}
186
187pub fn for_each_ndjson_rev<P, F>(
188    engine: &JetroEngine,
189    path: P,
190    query: &str,
191    mut f: F,
192) -> Result<usize, JetroEngineError>
193where
194    P: AsRef<Path>,
195    F: FnMut(Value),
196{
197    for_each_ndjson_rev_with_options(
198        engine,
199        path,
200        query,
201        super::ndjson::NdjsonOptions::default(),
202        |value| {
203            f(value);
204            Ok(super::ndjson::NdjsonControl::Continue)
205        },
206    )
207}
208
209pub fn for_each_ndjson_rev_with_options<P, F>(
210    engine: &JetroEngine,
211    path: P,
212    query: &str,
213    options: super::ndjson::NdjsonOptions,
214    mut f: F,
215) -> Result<usize, JetroEngineError>
216where
217    P: AsRef<Path>,
218    F: FnMut(Value) -> Result<super::ndjson::NdjsonControl, JetroEngineError>,
219{
220    drive_rev(engine, path, query, options, |value| f(Value::from(value)))
221}
222
223pub fn collect_ndjson_rev_matches<P>(
224    engine: &JetroEngine,
225    path: P,
226    predicate: &str,
227    limit: usize,
228) -> Result<Vec<Value>, JetroEngineError>
229where
230    P: AsRef<Path>,
231{
232    collect_ndjson_rev_matches_with_options(
233        engine,
234        path,
235        predicate,
236        limit,
237        super::ndjson::NdjsonOptions::default(),
238    )
239}
240
241pub fn collect_ndjson_rev_matches_with_options<P>(
242    engine: &JetroEngine,
243    path: P,
244    predicate: &str,
245    limit: usize,
246    options: super::ndjson::NdjsonOptions,
247) -> Result<Vec<Value>, JetroEngineError>
248where
249    P: AsRef<Path>,
250{
251    let mut values = Vec::with_capacity(limit);
252    drive_rev_matches(engine, path, predicate, limit, options, |value| {
253        values.push(Value::from(value));
254        Ok(super::ndjson::NdjsonControl::Continue)
255    })?;
256    Ok(values)
257}
258
259pub fn run_ndjson_rev<P, W>(
260    engine: &JetroEngine,
261    path: P,
262    query: &str,
263    writer: W,
264) -> Result<usize, JetroEngineError>
265where
266    P: AsRef<Path>,
267    W: Write,
268{
269    run_ndjson_rev_with_options(
270        engine,
271        path,
272        query,
273        writer,
274        super::ndjson::NdjsonOptions::default(),
275    )
276}
277
278pub fn run_ndjson_rev_with_options<P, W>(
279    engine: &JetroEngine,
280    path: P,
281    query: &str,
282    writer: W,
283    options: super::ndjson::NdjsonOptions,
284) -> Result<usize, JetroEngineError>
285where
286    P: AsRef<Path>,
287    W: Write,
288{
289    if let Some(plan) = super::ndjson::direct_tape_plan(engine, query) {
290        return drive_rev_writer_tape(engine, path, &plan, None, options, writer);
291    }
292
293    let mut writer = super::ndjson::ndjson_writer_with_options(writer, options);
294    let mut emitted = 0usize;
295    drive_rev(engine, path, query, options, |value| {
296        if super::ndjson::write_val_line_with_options(&mut writer, &value, options)? {
297            emitted += 1;
298        }
299        Ok(super::ndjson::NdjsonControl::Continue)
300    })?;
301    writer.flush()?;
302    Ok(emitted)
303}
304
305pub fn run_ndjson_rev_limit<P, W>(
306    engine: &JetroEngine,
307    path: P,
308    query: &str,
309    limit: usize,
310    writer: W,
311) -> Result<usize, JetroEngineError>
312where
313    P: AsRef<Path>,
314    W: Write,
315{
316    run_ndjson_rev_limit_with_options(
317        engine,
318        path,
319        query,
320        limit,
321        writer,
322        super::ndjson::NdjsonOptions::default(),
323    )
324}
325
326pub fn run_ndjson_rev_limit_with_options<P, W>(
327    engine: &JetroEngine,
328    path: P,
329    query: &str,
330    limit: usize,
331    writer: W,
332    options: super::ndjson::NdjsonOptions,
333) -> Result<usize, JetroEngineError>
334where
335    P: AsRef<Path>,
336    W: Write,
337{
338    if limit == 0 {
339        return Ok(0);
340    }
341    if let Some(plan) = super::ndjson::direct_tape_plan(engine, query) {
342        return drive_rev_writer_tape(engine, path, &plan, Some(limit), options, writer);
343    }
344
345    let mut writer = super::ndjson::ndjson_writer_with_options(writer, options);
346    let mut emitted = 0usize;
347    drive_rev(engine, path, query, options, |value| {
348        let wrote = super::ndjson::write_val_line_with_options(&mut writer, &value, options)?;
349        if wrote {
350            emitted += 1;
351        }
352        Ok(if wrote && emitted >= limit {
353            super::ndjson::NdjsonControl::Stop
354        } else {
355            super::ndjson::NdjsonControl::Continue
356        })
357    })?;
358    writer.flush()?;
359    Ok(emitted)
360}
361
362pub fn run_ndjson_rev_distinct_by<P, W>(
363    engine: &JetroEngine,
364    path: P,
365    key_query: &str,
366    query: &str,
367    limit: usize,
368    writer: W,
369) -> Result<usize, JetroEngineError>
370where
371    P: AsRef<Path>,
372    W: Write,
373{
374    run_ndjson_rev_distinct_by_with_options(
375        engine,
376        path,
377        key_query,
378        query,
379        limit,
380        writer,
381        super::ndjson::NdjsonOptions::default(),
382    )
383}
384
385pub fn run_ndjson_rev_distinct_by_with_options<P, W>(
386    engine: &JetroEngine,
387    path: P,
388    key_query: &str,
389    query: &str,
390    limit: usize,
391    writer: W,
392    options: super::ndjson::NdjsonOptions,
393) -> Result<usize, JetroEngineError>
394where
395    P: AsRef<Path>,
396    W: Write,
397{
398    run_ndjson_rev_distinct_by_with_stats_and_options(
399        engine, path, key_query, query, limit, writer, options,
400    )
401    .map(|stats| stats.emitted)
402}
403
404pub fn run_ndjson_rev_distinct_by_with_stats<P, W>(
405    engine: &JetroEngine,
406    path: P,
407    key_query: &str,
408    query: &str,
409    limit: usize,
410    writer: W,
411) -> Result<NdjsonRevDistinctStats, JetroEngineError>
412where
413    P: AsRef<Path>,
414    W: Write,
415{
416    run_ndjson_rev_distinct_by_with_stats_and_options(
417        engine,
418        path,
419        key_query,
420        query,
421        limit,
422        writer,
423        super::ndjson::NdjsonOptions::default(),
424    )
425}
426
427pub fn run_ndjson_rev_distinct_by_with_stats_and_options<P, W>(
428    engine: &JetroEngine,
429    path: P,
430    key_query: &str,
431    query: &str,
432    limit: usize,
433    writer: W,
434    options: super::ndjson::NdjsonOptions,
435) -> Result<NdjsonRevDistinctStats, JetroEngineError>
436where
437    P: AsRef<Path>,
438    W: Write,
439{
440    if limit == 0 {
441        return Ok(NdjsonRevDistinctStats::default());
442    }
443    let direct_key_plan = super::ndjson::direct_tape_plan(engine, key_query);
444    let direct_value_plan = super::ndjson::direct_tape_plan(engine, query)
445        .filter(|plan| tape_plan_can_write_byte_row(plan));
446
447    let mut key_plan = None;
448    let mut value_plan = None;
449    let mut vm = None;
450    let mut driver = NdjsonReverseFileDriver::with_options(path, options)?;
451    let mut writer = super::ndjson::ndjson_writer_with_options(writer, options);
452    let mut byte_scratch = Vec::with_capacity(options.initial_buffer_capacity);
453    let mut out = Vec::with_capacity(options.initial_buffer_capacity);
454    let mut seen = AdaptiveDistinctKeys::default();
455    let mut stats = NdjsonRevDistinctStats::default();
456
457    while let Some((reverse_row_no, row)) = driver.next_line_with_reverse_no()? {
458        stats.rows_scanned += 1;
459        let mut row = Some(row);
460        let mut document = None;
461        let direct_key = direct_key_plan.as_ref().and_then(|plan| {
462            row.as_deref()
463                .and_then(|row| distinct_key_direct(row, plan))
464        });
465
466        let inserted = if let Some(key) = direct_key {
467            stats.direct_key_rows += 1;
468            match key {
469                Cow::Borrowed(key) => seen.insert_slice(key),
470                Cow::Owned(key) => seen.insert(key),
471            }
472        } else {
473            stats.fallback_key_rows += 1;
474            let parsed = super::ndjson::parse_row(engine, reverse_row_no, row.take().unwrap())?;
475            let plan = key_plan.get_or_insert_with(|| {
476                engine.cached_plan(key_query, crate::plan::physical::PlanningContext::bytes())
477            });
478            let vm = vm.get_or_insert_with(|| engine.lock_vm());
479            let key = crate::exec::router::collect_plan_val_with_vm(&parsed, plan, vm)
480                .map_err(|err| super::ndjson::row_eval_error(reverse_row_no, err))?;
481            let key = distinct_key_bytes(&key)?;
482            document = Some(parsed);
483            seen.insert(key)
484        };
485        if !inserted {
486            stats.duplicate_rows += 1;
487            continue;
488        }
489        if let (Some(plan), Some(row)) = (direct_value_plan.as_ref(), row.as_deref()) {
490            byte_scratch.clear();
491            out.clear();
492            match write_ndjson_byte_tape_plan_row(&mut out, row, plan, &mut byte_scratch)? {
493                BytePlanWrite::Done => {
494                    if super::ndjson::write_json_bytes_line_with_options(
495                        &mut writer,
496                        &out,
497                        options,
498                    )? {
499                        stats.direct_value_rows += 1;
500                        stats.emitted += 1;
501                    }
502                    if stats.emitted >= limit {
503                        break;
504                    }
505                    continue;
506                }
507                BytePlanWrite::Fallback => {}
508            }
509        }
510
511        let parsed = match document {
512            Some(document) => document,
513            None => super::ndjson::parse_row(engine, reverse_row_no, row.take().unwrap())?,
514        };
515        let plan = value_plan.get_or_insert_with(|| {
516            engine.cached_plan(query, crate::plan::physical::PlanningContext::bytes())
517        });
518        let vm = vm.get_or_insert_with(|| engine.lock_vm());
519        let value = crate::exec::router::collect_plan_val_with_vm(&parsed, plan, vm)
520            .map_err(|err| super::ndjson::row_eval_error(reverse_row_no, err))?;
521        if super::ndjson::write_val_line_with_options(&mut writer, &value, options)? {
522            stats.fallback_value_rows += 1;
523            stats.emitted += 1;
524        }
525        if stats.emitted >= limit {
526            break;
527        }
528    }
529
530    writer.flush()?;
531    stats.front_filter = seen.front_kind();
532    Ok(stats)
533}
534
535pub fn run_ndjson_rev_distinct_by_with_report<P, W>(
536    engine: &JetroEngine,
537    path: P,
538    key_query: &str,
539    query: &str,
540    limit: usize,
541    writer: W,
542) -> Result<NdjsonExecutionReport, JetroEngineError>
543where
544    P: AsRef<Path>,
545    W: Write,
546{
547    run_ndjson_rev_distinct_by_with_report_and_options(
548        engine,
549        path,
550        key_query,
551        query,
552        limit,
553        writer,
554        super::ndjson::NdjsonOptions::default(),
555    )
556}
557
558pub fn run_ndjson_rev_distinct_by_with_report_and_options<P, W>(
559    engine: &JetroEngine,
560    path: P,
561    key_query: &str,
562    query: &str,
563    limit: usize,
564    writer: W,
565    options: super::ndjson::NdjsonOptions,
566) -> Result<NdjsonExecutionReport, JetroEngineError>
567where
568    P: AsRef<Path>,
569    W: Write,
570{
571    let stats = run_ndjson_rev_distinct_by_with_stats_and_options(
572        engine, path, key_query, query, limit, writer, options,
573    )?;
574    Ok(NdjsonExecutionReport::new(
575        NdjsonRouteExplain {
576            kind: NdjsonRouteKind::RowLocal,
577            source: NdjsonSourceCaps::file(options),
578            writer_path: super::ndjson::ndjson_writer_path_kind(engine, query),
579            rows_plan: None,
580            fallback_reason: None,
581        },
582        NdjsonExecutionStats {
583            rows_scanned: stats.rows_scanned,
584            rows_emitted: stats.emitted,
585            duplicate_rows: stats.duplicate_rows,
586            direct_key_rows: stats.direct_key_rows,
587            fallback_key_rows: stats.fallback_key_rows,
588            direct_project_rows: stats.direct_value_rows,
589            fallback_project_rows: stats.fallback_value_rows,
590            ..NdjsonExecutionStats::default()
591        },
592    ))
593}
594
595#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
596pub struct NdjsonRevDistinctStats {
597    pub rows_scanned: usize,
598    pub emitted: usize,
599    pub duplicate_rows: usize,
600    pub direct_key_rows: usize,
601    pub fallback_key_rows: usize,
602    pub direct_value_rows: usize,
603    pub fallback_value_rows: usize,
604    pub front_filter: DistinctFrontFilterKind,
605}
606fn distinct_key_direct<'a>(
607    row: &'a [u8],
608    plan: &super::ndjson::NdjsonDirectTapePlan,
609) -> Option<Cow<'a, [u8]>> {
610    const NULL_KEY: &[u8] = b"null";
611
612    let super::ndjson::NdjsonDirectTapePlan::RootPath(steps) = plan else {
613        return None;
614    };
615    match raw_json_byte_path_value(row, steps) {
616        RawFieldValue::Found(value) => raw_distinct_key_bytes(value),
617        RawFieldValue::Missing => Some(Cow::Borrowed(NULL_KEY)),
618        RawFieldValue::Fallback => None,
619    }
620}
621
622pub fn run_ndjson_rev_matches<P, W>(
623    engine: &JetroEngine,
624    path: P,
625    predicate: &str,
626    limit: usize,
627    writer: W,
628) -> Result<usize, JetroEngineError>
629where
630    P: AsRef<Path>,
631    W: Write,
632{
633    run_ndjson_rev_matches_with_options(
634        engine,
635        path,
636        predicate,
637        limit,
638        writer,
639        super::ndjson::NdjsonOptions::default(),
640    )
641}
642
643pub fn run_ndjson_rev_matches_with_options<P, W>(
644    engine: &JetroEngine,
645    path: P,
646    predicate: &str,
647    limit: usize,
648    writer: W,
649    options: super::ndjson::NdjsonOptions,
650) -> Result<usize, JetroEngineError>
651where
652    P: AsRef<Path>,
653    W: Write,
654{
655    drive_rev_matches_writer(engine, path, predicate, limit, options, writer)
656}
657
658pub fn run_ndjson_rev_matches_with_report<P, W>(
659    engine: &JetroEngine,
660    path: P,
661    predicate: &str,
662    limit: usize,
663    writer: W,
664) -> Result<NdjsonExecutionReport, JetroEngineError>
665where
666    P: AsRef<Path>,
667    W: Write,
668{
669    run_ndjson_rev_matches_with_report_and_options(
670        engine,
671        path,
672        predicate,
673        limit,
674        writer,
675        super::ndjson::NdjsonOptions::default(),
676    )
677}
678
679pub fn run_ndjson_rev_matches_with_report_and_options<P, W>(
680    engine: &JetroEngine,
681    path: P,
682    predicate: &str,
683    limit: usize,
684    writer: W,
685    options: super::ndjson::NdjsonOptions,
686) -> Result<NdjsonExecutionReport, JetroEngineError>
687where
688    P: AsRef<Path>,
689    W: Write,
690{
691    let (_, stats) =
692        drive_rev_matches_writer_with_stats(engine, path, predicate, limit, options, writer)?;
693    Ok(NdjsonExecutionReport::new(
694        NdjsonRouteExplain::matches(NdjsonSourceCaps::file(options)),
695        stats,
696    ))
697}
698fn drive_rev_writer_tape<P, W>(
699    engine: &JetroEngine,
700    path: P,
701    plan: &super::ndjson::NdjsonDirectTapePlan,
702    limit: Option<usize>,
703    options: super::ndjson::NdjsonOptions,
704    writer: W,
705) -> Result<usize, JetroEngineError>
706where
707    P: AsRef<Path>,
708    W: Write,
709{
710    let mut driver = NdjsonReverseFileDriver::with_options(path, options)?;
711    let mut writer = super::ndjson::ndjson_writer_with_options(writer, options);
712    let mut scratch =
713        crate::data::tape::TapeScratch::with_capacity(options.initial_buffer_capacity);
714    let mut out = Vec::with_capacity(options.initial_buffer_capacity);
715    let mut runner = super::ndjson::NdjsonTapeWriterRunner::new(engine, plan);
716    let mut count = 0usize;
717
718    while let Some((reverse_row_no, row)) = driver.next_line_with_reverse_no()? {
719        out.clear();
720        scratch.parse_slice(&row).map_err(|message| {
721            super::ndjson::row_parse_error(
722                reverse_row_no,
723                JetroEngineError::Eval(crate::EvalError(format!("Invalid JSON: {message}"))),
724            )
725        })?;
726        runner.write_row(&scratch, &mut out)?;
727        if super::ndjson::write_json_bytes_line_with_options(&mut writer, &out, options)? {
728            count += 1;
729        }
730        if limit.is_some_and(|limit| count >= limit) {
731            break;
732        }
733    }
734
735    writer.flush()?;
736    Ok(count)
737}
738
739fn drive_rev<P, F>(
740    engine: &JetroEngine,
741    path: P,
742    query: &str,
743    options: super::ndjson::NdjsonOptions,
744    mut emit: F,
745) -> Result<usize, JetroEngineError>
746where
747    P: AsRef<Path>,
748    F: FnMut(crate::data::value::Val) -> Result<super::ndjson::NdjsonControl, JetroEngineError>,
749{
750    let mut driver = NdjsonReverseFileDriver::with_options(path, options)?;
751    let mut executor = super::ndjson::NdjsonRowExecutor::new(engine, query);
752    let mut count = 0usize;
753
754    while let Some((reverse_row_no, row)) = driver.next_line_with_reverse_no()? {
755        let out = executor.eval_owned_row(reverse_row_no, row)?;
756        count += 1;
757        if matches!(emit(out)?, super::ndjson::NdjsonControl::Stop) {
758            break;
759        }
760    }
761
762    Ok(count)
763}
764
765fn drive_rev_matches<P, F>(
766    engine: &JetroEngine,
767    path: P,
768    predicate: &str,
769    limit: usize,
770    options: super::ndjson::NdjsonOptions,
771    mut emit: F,
772) -> Result<usize, JetroEngineError>
773where
774    P: AsRef<Path>,
775    F: FnMut(crate::data::value::Val) -> Result<super::ndjson::NdjsonControl, JetroEngineError>,
776{
777    if limit == 0 {
778        return Ok(0);
779    }
780
781    let mut driver = NdjsonReverseFileDriver::with_options(path, options)?;
782    let mut executor = super::ndjson::NdjsonRowExecutor::new(engine, predicate);
783    let mut emitted = 0usize;
784
785    while let Some((reverse_row_no, row)) = driver.next_line_with_reverse_no()? {
786        let document = executor.parse_owned_row(reverse_row_no, row)?;
787        let matched = executor.eval_document(reverse_row_no, &document)?;
788        if !is_truthy(&matched) {
789            continue;
790        }
791
792        let root = document
793            .root_val_with(executor.engine().keys())
794            .map_err(|err| super::ndjson::row_eval_error(reverse_row_no, err))?;
795        emitted += 1;
796        if matches!(emit(root)?, super::ndjson::NdjsonControl::Stop) || emitted >= limit {
797            break;
798        }
799    }
800
801    Ok(emitted)
802}
803
804fn drive_rev_matches_writer<P, W>(
805    engine: &JetroEngine,
806    path: P,
807    predicate: &str,
808    limit: usize,
809    options: super::ndjson::NdjsonOptions,
810    writer: W,
811) -> Result<usize, JetroEngineError>
812where
813    P: AsRef<Path>,
814    W: Write,
815{
816    Ok(drive_rev_matches_writer_with_stats(engine, path, predicate, limit, options, writer)?.0)
817}
818
819fn drive_rev_matches_writer_with_stats<P, W>(
820    engine: &JetroEngine,
821    path: P,
822    predicate: &str,
823    limit: usize,
824    options: super::ndjson::NdjsonOptions,
825    writer: W,
826) -> Result<(usize, NdjsonExecutionStats), JetroEngineError>
827where
828    P: AsRef<Path>,
829    W: Write,
830{
831    if limit == 0 {
832        return Ok((0, NdjsonExecutionStats::default()));
833    }
834    if let Some(predicate) = super::ndjson::direct_tape_predicate(engine, predicate) {
835        return drive_rev_matches_writer_tape_with_stats(
836            engine, path, &predicate, limit, options, writer,
837        );
838    }
839
840    let mut driver = NdjsonReverseFileDriver::with_options(path, options)?;
841    let mut executor = super::ndjson::NdjsonRowExecutor::new(engine, predicate);
842    let mut writer = super::ndjson::ndjson_writer_with_options(writer, options);
843    let mut emitted = 0usize;
844    let mut stats = NdjsonExecutionStats::default();
845
846    while let Some((reverse_row_no, row)) = driver.next_line_with_reverse_no()? {
847        stats.rows_scanned += 1;
848        let document = executor.parse_owned_row(reverse_row_no, row)?;
849        let matched = executor.eval_document(reverse_row_no, &document)?;
850        stats.fallback_filter_rows += 1;
851        if !is_truthy(&matched) {
852            stats.rows_filtered += 1;
853            continue;
854        }
855
856        super::ndjson::write_document_line(
857            &mut writer,
858            &document,
859            reverse_row_no,
860            executor.engine(),
861        )?;
862        emitted += 1;
863        stats.rows_emitted += 1;
864        if emitted >= limit {
865            break;
866        }
867    }
868
869    writer.flush()?;
870    Ok((emitted, stats))
871}
872fn drive_rev_matches_writer_tape_with_stats<P, W>(
873    engine: &JetroEngine,
874    path: P,
875    predicate: &super::ndjson::NdjsonDirectPredicate,
876    limit: usize,
877    options: super::ndjson::NdjsonOptions,
878    writer: W,
879) -> Result<(usize, NdjsonExecutionStats), JetroEngineError>
880where
881    P: AsRef<Path>,
882    W: Write,
883{
884    let mut driver = NdjsonReverseFileDriver::with_options(path, options)?;
885    let mut writer = super::ndjson::ndjson_writer_with_options(writer, options);
886    let mut scratch =
887        crate::data::tape::TapeScratch::with_capacity(options.initial_buffer_capacity);
888    let mut emitted = 0usize;
889    let needs_vm = super::ndjson::predicate_needs_vm(predicate);
890    let mut vm = needs_vm.then(|| engine.lock_vm());
891    let env = needs_vm.then(|| crate::data::context::Env::new(crate::Val::Null));
892    let mut predicate_path = super::ndjson::NdjsonPathCache::default();
893    let mut stats = NdjsonExecutionStats::default();
894
895    while let Some((reverse_row_no, row)) = driver.next_line_with_reverse_no()? {
896        stats.rows_scanned += 1;
897        scratch.parse_slice(&row).map_err(|message| {
898            super::ndjson::row_parse_error(
899                reverse_row_no,
900                JetroEngineError::Eval(crate::EvalError(format!("Invalid JSON: {message}"))),
901            )
902        })?;
903        if !super::ndjson::eval_tape_predicate(
904            &scratch,
905            predicate,
906            env.as_ref(),
907            &mut vm,
908            &mut predicate_path,
909        )
910        .map_err(JetroEngineError::Eval)?
911        {
912            stats.fallback_filter_rows += 1;
913            stats.rows_filtered += 1;
914            continue;
915        }
916        stats.fallback_filter_rows += 1;
917        writer.write_all(&row)?;
918        writer.write_all(b"\n")?;
919        emitted += 1;
920        stats.rows_emitted += 1;
921        if emitted >= limit {
922            break;
923        }
924    }
925
926    writer.flush()?;
927    Ok((emitted, stats))
928}
929
/// Removes every trailing `\n` and `\r` byte from `buf` in place.
fn trim_line_ending(buf: &mut Vec<u8>) {
    let kept = buf
        .iter()
        .rposition(|&byte| byte != b'\n' && byte != b'\r')
        .map_or(0, |last| last + 1);
    buf.truncate(kept);
}
935
#[cfg(test)]
mod tests {
    use super::NdjsonReverseFileDriver;
    use crate::JetroEngine;
    use std::path::PathBuf;

    #[test]
    fn reverse_driver_reads_rows_from_tail() {
        let path = temp_path("jetro-ndjson-rev-basic");
        std::fs::write(&path, b"{\"n\":1}\n{\"n\":2}\n{\"n\":3}\n").unwrap();
        let mut driver = NdjsonReverseFileDriver::with_chunk_size(&path, 8).unwrap();

        assert_eq!(driver.next_line().unwrap().unwrap(), br#"{"n":3}"#);
        assert_eq!(driver.next_line().unwrap().unwrap(), br#"{"n":2}"#);
        assert_eq!(driver.next_line().unwrap().unwrap(), br#"{"n":1}"#);
        assert!(driver.next_line().unwrap().is_none());

        let _ = std::fs::remove_file(path);
    }

    #[test]
    fn reverse_driver_handles_missing_final_newline_and_blank_lines() {
        let path = temp_path("jetro-ndjson-rev-edge");
        std::fs::write(&path, b"\n{\"n\":1}\r\n\n{\"n\":2}").unwrap();
        let mut driver = NdjsonReverseFileDriver::with_chunk_size(&path, 5).unwrap();

        assert_eq!(driver.next_line().unwrap().unwrap(), br#"{"n":2}"#);
        assert_eq!(driver.next_line().unwrap().unwrap(), br#"{"n":1}"#);
        assert!(driver.next_line().unwrap().is_none());

        let _ = std::fs::remove_file(path);
    }

    #[test]
    fn reverse_driver_splits_crlf_across_single_byte_chunks() {
        // With 1-byte chunks every `\r` and `\n` lands in its own read, so the
        // carry/trim logic must reassemble rows across every boundary.
        let path = temp_path("jetro-ndjson-rev-crlf-1b");
        std::fs::write(&path, b"{\"n\":1}\r\n{\"n\":2}\r\n").unwrap();
        let mut driver = NdjsonReverseFileDriver::with_chunk_size(&path, 1).unwrap();

        assert_eq!(driver.next_line().unwrap().unwrap(), br#"{"n":2}"#);
        assert_eq!(driver.next_line().unwrap().unwrap(), br#"{"n":1}"#);
        assert!(driver.next_line().unwrap().is_none());

        let _ = std::fs::remove_file(path);
    }

    #[test]
    fn reverse_driver_skips_whitespace_only_lines_without_numbering_them() {
        let path = temp_path("jetro-ndjson-rev-ws");
        std::fs::write(&path, b"{\"n\":1}\n   \n\t\r\n{\"n\":2}\n").unwrap();
        let mut driver = NdjsonReverseFileDriver::with_chunk_size(&path, 4).unwrap();

        assert_eq!(
            driver.next_line_with_reverse_no().unwrap().unwrap(),
            (1, br#"{"n":2}"#.to_vec())
        );
        assert_eq!(
            driver.next_line_with_reverse_no().unwrap().unwrap(),
            (2, br#"{"n":1}"#.to_vec())
        );
        assert!(driver.next_line_with_reverse_no().unwrap().is_none());

        let _ = std::fs::remove_file(path);
    }

    #[test]
    fn reverse_driver_empty_file_yields_no_rows() {
        let path = temp_path("jetro-ndjson-rev-empty");
        std::fs::write(&path, b"").unwrap();
        let mut driver = NdjsonReverseFileDriver::open(&path).unwrap();

        assert!(driver.next_line().unwrap().is_none());
        // Exhaustion is sticky.
        assert!(driver.next_line().unwrap().is_none());

        let _ = std::fs::remove_file(path);
    }

    #[test]
    fn reverse_driver_reports_reverse_row_numbers() {
        let path = temp_path("jetro-ndjson-rev-row-no");
        std::fs::write(&path, b"{\"n\":1}\n{\"n\":2}\n").unwrap();
        let mut driver = NdjsonReverseFileDriver::with_chunk_size(&path, 3).unwrap();

        assert_eq!(
            driver.next_line_with_reverse_no().unwrap().unwrap(),
            (1, br#"{"n":2}"#.to_vec())
        );
        assert_eq!(
            driver.next_line_with_reverse_no().unwrap().unwrap(),
            (2, br#"{"n":1}"#.to_vec())
        );
        assert!(driver.next_line_with_reverse_no().unwrap().is_none());

        let _ = std::fs::remove_file(path);
    }

    #[test]
    fn reverse_query_uses_direct_writer_shapes() {
        let path = temp_path("jetro-ndjson-rev-direct");
        std::fs::write(
            &path,
            b"{\"name\":\"ada\",\"attrs\":[{\"key\":\"a\",\"value\":1}]}\n{\"name\":\"bob\",\"attrs\":[{\"key\":\"b\",\"value\":2}]}\n",
        )
        .unwrap();
        let engine = JetroEngine::new();
        let mut out = Vec::new();

        super::run_ndjson_rev(&engine, &path, "attrs.map([@.key, @.value])", &mut out).unwrap();

        assert_eq!(
            String::from_utf8(out).unwrap(),
            "[[\"b\",2]]\n[[\"a\",1]]\n"
        );
        let _ = std::fs::remove_file(path);
    }

    /// Escaped strings and loosely formatted objects are normalized to a
    /// canonical key; only non-canonical numbers such as `1.0` are rejected.
    #[test]
    fn direct_distinct_key_classifier_normalizes_escapes_and_rejects_floats() {
        assert_eq!(
            super::raw_distinct_key_bytes(br#""plain""#).as_deref(),
            Some(br#""plain""#.as_slice())
        );
        assert_eq!(
            super::raw_distinct_key_bytes(br#""a\u0062""#).as_deref(),
            Some(br#""ab""#.as_slice())
        );
        assert_eq!(
            super::raw_distinct_key_bytes(br#"{"k":"v"}"#).as_deref(),
            Some(br#"{"k":"v"}"#.as_slice())
        );
        assert_eq!(
            super::raw_distinct_key_bytes(b"123").as_deref(),
            Some(b"123".as_slice())
        );
        assert_eq!(
            super::raw_distinct_key_bytes(br#"{"a" : 1,"b":"x\u0079"}"#).as_deref(),
            Some(br#"{"a":1,"b":"xy"}"#.as_slice())
        );
        assert_eq!(super::raw_distinct_key_bytes(b"1.0"), None);
    }

    /// Per-test temp file path; the process id keeps concurrent test runs apart.
    fn temp_path(name: &str) -> PathBuf {
        let mut path = std::env::temp_dir();
        path.push(format!("{}-{}.ndjson", name, std::process::id()));
        path
    }
}