Skip to main content

jetro_core/
lib.rs

1//! Jetro core — parser, compiler, and VM for the Jetro JSON query language.
2//!
3//! # Execution path
4//!
5//! ```text
6//! source text
7//!   │  parse::parser::parse() → Expr AST
8//!   │  plan::physical::plan_query() → QueryPlan (physical IR)
9//!   │  exec::router::collect_*() → dispatches to:
10//!   │    StructuralIndex backend  (jetro-experimental bitmap)
11//!   │    ViewPipeline backend     (borrowed tape/Val navigation)
12//!   │    Pipeline backend         (pull-based composed stages)
13//!   └─  VM fallback               (bytecode stack machine)
14//! ```
15//!
16//! # Quick start
17//!
18//! ```rust
19//! use jetro_core::Jetro;
20//! let j = Jetro::from_bytes(br#"{"books":[{"price":12}]}"#.to_vec()).unwrap();
21//! assert_eq!(j.collect("$.books.len()").unwrap(), serde_json::json!(1));
22//! ```
23//!
24//! ```rust
25//! use jetro_core::JetroEngine;
26//! use std::io::Cursor;
27//!
28//! let engine = JetroEngine::new();
29//! let rows = Cursor::new(br#"{"name":"Ada"}
30//! {"name":"Bob"}
31//! "#);
32//! let names = engine.collect_ndjson(rows, "name").unwrap();
33//! assert_eq!(names, vec![serde_json::json!("Ada"), serde_json::json!("Bob")]);
34//! ```
35//!
36//! Match-limited NDJSON helpers evaluate a predicate per row, return the
37//! original full row for truthy matches, and stop after the requested number of
38//! matches:
39//!
40//! ```rust
41//! use jetro_core::JetroEngine;
42//! use std::io::Cursor;
43//!
44//! let engine = JetroEngine::new();
45//! let rows = Cursor::new(br#"{"id":1,"active":true}
46//! {"id":2,"active":false}
47//! {"id":3,"active":true}
48//! "#);
49//! let first_two = engine.collect_ndjson_matches(rows, "active", 2).unwrap();
50//! assert_eq!(first_two, vec![
51//!     serde_json::json!({"id": 1, "active": true}),
52//!     serde_json::json!({"id": 3, "active": true}),
53//! ]);
54//! ```
55
56pub(crate) mod builtins;
57pub(crate) mod compile;
58pub(crate) mod data;
59pub(crate) mod exec;
60pub mod io;
61pub(crate) mod ir;
62pub(crate) mod parse;
63pub(crate) mod plan;
64pub(crate) mod util;
65pub(crate) mod vm;
66
67#[cfg(test)]
68mod tests;
69
70use data::value::Val;
71use serde_json::Value;
72use std::cell::{OnceCell, RefCell};
73use std::collections::HashMap;
74use std::sync::Arc;
75use std::sync::Mutex;
76
77pub use data::context::EvalError;
78#[cfg(test)]
79use parse::parser::ParseError;
80use vm::VM;
81
/// Feature-gated (`fuzz_internal`) re-exports of parser and planner internals.
///
/// Lets the `cargo-fuzz` harness drive the PEG parser directly instead of
/// going through `Jetro::collect`. NOT a stable public API.
#[cfg(feature = "fuzz_internal")]
pub mod __fuzz_internal {
    pub use crate::parse::parser::parse;
    pub use crate::parse::parser::ParseError;
    pub use crate::plan::physical::plan_query;
}
90
/// Test-only error type that unifies parser and evaluation failures so test
/// helpers can use `?` on both (see the `From` impls below).
#[cfg(test)]
#[derive(Debug)]
pub(crate) enum Error {
    /// The expression source could not be parsed.
    Parse(ParseError),
    /// The expression parsed but evaluation failed.
    Eval(EvalError),
}
97
#[cfg(test)]
impl std::fmt::Display for Error {
    /// Prints the wrapped error's own `Display` text, with no added prefix.
    fn fmt(&self, out: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let inner: &dyn std::fmt::Display = match self {
            Self::Parse(parse_err) => parse_err,
            Self::Eval(eval_err) => eval_err,
        };
        write!(out, "{}", inner)
    }
}
#[cfg(test)]
impl std::error::Error for Error {
    /// Exposes the wrapped `ParseError` as the source; `Eval` reports none.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::Eval(_) => None,
            Self::Parse(parse_err) => Some(parse_err),
        }
    }
}
116
#[cfg(test)]
impl From<ParseError> for Error {
    /// Wraps a parser failure as `Error::Parse`.
    fn from(parse_err: ParseError) -> Self {
        Self::Parse(parse_err)
    }
}
#[cfg(test)]
impl From<EvalError> for Error {
    /// Wraps an evaluation failure as `Error::Eval`.
    fn from(eval_err: EvalError) -> Self {
        Self::Eval(eval_err)
    }
}
129
130/// Primary entry point. Holds a JSON document and evaluates expressions against
131/// it. Lazy fields (`root_val`, `tape`, `structural_index`, `objvec_cache`)
132/// are populated on first use so callers only pay for the representations a
133/// particular query actually needs.
134pub struct Jetro {
135    /// The `serde_json::Value` root document; unused for byte-backed handles
136    /// where the tape is the authoritative source.
137    document: Value,
138    /// Cached `Val` tree — built once and reused across `collect()` calls.
139    root_val: OnceCell<Val>,
140    /// Retained raw bytes for lazy tape and structural-index materialisation.
141    raw_bytes: Option<Arc<[u8]>>,
142
143    /// Lazily parsed simd-json tape; `Err` is cached to avoid re-parsing after failure.
144    tape: OnceCell<std::result::Result<Arc<crate::data::tape::TapeData>, String>>,
145
146    /// Lazily built bitmap structural index for accelerated key-presence queries.
147    structural_index:
148        OnceCell<std::result::Result<Arc<jetro_experimental::StructuralIndex>, String>>,
149
150    /// Per-document cache from `Arc<Vec<Val>>` pointer addresses to promoted
151    /// `ObjVecData` columnar representations; keyed by pointer to avoid re-promotion.
152    pub(crate) objvec_cache:
153        std::sync::Mutex<std::collections::HashMap<usize, Arc<crate::data::value::ObjVecData>>>,
154
155    /// Per-document VM cache used by `Jetro::collect`; not shared across document handles.
156    vm: RefCell<VM>,
157}
158
159/// Long-lived multi-document query engine with an explicit plan cache.
160/// Use when the same process evaluates many expressions over many documents —
161/// parse/lower/compile work is amortised by this object, not hidden in
162/// thread-local state.
163pub struct JetroEngine {
164    /// Maps `"<context_key>\0<expr>"` to compiled `QueryPlan`; evicted wholesale when full.
165    plan_cache: Mutex<HashMap<String, ir::physical::QueryPlan>>,
166    /// Maximum number of entries before the cache is cleared; 0 disables caching.
167    plan_cache_limit: usize,
168    /// The shared `VM` used by all `collect*` calls on this engine instance.
169    vm: Mutex<VM>,
170    /// Engine-owned JSON object-key intern cache. Used by [`JetroEngine::parse_value`]
171    /// and [`JetroEngine::parse_bytes`] (and the `collect_*` shortcuts that go through
172    /// them) so each engine instance has an isolated key cache. Documents built via
173    /// the standalone `Jetro::from_bytes`/`From<serde_json::Value>` paths use the
174    /// process-wide [`crate::data::intern::default_cache`] instead.
175    keys: Arc<crate::data::intern::KeyCache>,
176}
177
/// Error returned by `JetroEngine::collect_bytes` and similar methods that
/// may fail during JSON parsing or during expression evaluation.
///
/// `Display` forwards to the wrapped error's own message. `source()` exposes
/// the wrapped error for every variant except `Eval`, which reports no source.
#[derive(Debug)]
pub enum JetroEngineError {
    /// JSON parsing failed before evaluation could begin.
    Json(serde_json::Error),
    /// Reading from a stream or writing results failed.
    Io(std::io::Error),
    /// NDJSON row parsing failed with row context.
    Ndjson(io::RowError),
    /// Expression evaluation failed (the JSON was valid but the query errored).
    Eval(EvalError),
}
191
192impl std::fmt::Display for JetroEngineError {
193    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
194        match self {
195            Self::Json(err) => write!(f, "{}", err),
196            Self::Io(err) => write!(f, "{}", err),
197            Self::Ndjson(err) => write!(f, "{}", err),
198            Self::Eval(err) => write!(f, "{}", err),
199        }
200    }
201}
202
203impl std::error::Error for JetroEngineError {
204    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
205        match self {
206            Self::Json(err) => Some(err),
207            Self::Io(err) => Some(err),
208            Self::Ndjson(err) => Some(err),
209            Self::Eval(_) => None,
210        }
211    }
212}
213
214impl From<serde_json::Error> for JetroEngineError {
215    fn from(err: serde_json::Error) -> Self {
216        Self::Json(err)
217    }
218}
219
220impl From<std::io::Error> for JetroEngineError {
221    fn from(err: std::io::Error) -> Self {
222        Self::Io(err)
223    }
224}
225
226impl From<io::RowError> for JetroEngineError {
227    fn from(err: io::RowError) -> Self {
228        Self::Ndjson(err)
229    }
230}
231
232impl From<EvalError> for JetroEngineError {
233    fn from(err: EvalError) -> Self {
234        Self::Eval(err)
235    }
236}
237
238impl Default for JetroEngine {
239    fn default() -> Self {
240        Self::new()
241    }
242}
243
244impl JetroEngine {
    /// Default maximum plan-cache size; the cache is cleared wholesale when reached.
    /// This is the limit that [`JetroEngine::new`] passes to
    /// [`JetroEngine::with_plan_cache_limit`].
    const DEFAULT_PLAN_CACHE_LIMIT: usize = 256;
247
248    /// Create a `JetroEngine` with the default plan-cache limit of 256 entries.
249    pub fn new() -> Self {
250        Self::with_plan_cache_limit(Self::DEFAULT_PLAN_CACHE_LIMIT)
251    }
252
253    /// Create a `JetroEngine` with an explicit plan-cache capacity.
254    /// Set `plan_cache_limit` to 0 to disable caching entirely.
255    pub fn with_plan_cache_limit(plan_cache_limit: usize) -> Self {
256        Self {
257            plan_cache: Mutex::new(HashMap::new()),
258            plan_cache_limit,
259            vm: Mutex::new(VM::new()),
260            keys: crate::data::intern::KeyCache::new(),
261        }
262    }
263
    /// Borrow this engine's JSON key-intern cache.
    ///
    /// Returns a reference to the `Arc` stored in the engine; clone the `Arc`
    /// if the cache handle must outlive the borrow of `self`.
    pub fn keys(&self) -> &Arc<crate::data::intern::KeyCache> {
        &self.keys
    }
268
269    /// Discard all cached query plans and the engine's key-intern cache,
270    /// forcing re-compilation and re-interning on the next call.
271    pub fn clear_cache(&self) {
272        self.plan_cache.lock().expect("plan cache poisoned").clear();
273        self.keys.clear();
274    }
275
276    /// Build a `Jetro` document from a `serde_json::Value` with object keys
277    /// interned into this engine's key cache. Use this in place of
278    /// `Jetro::from(...)` / the `From<serde_json::Value>` impl when
279    /// per-engine key isolation is required.
280    pub fn parse_value(&self, document: Value) -> Jetro {
281        let root = Val::from_value_with(&self.keys, &document);
282        Jetro::from_val_and_value(root, document)
283    }
284
285    /// Parse raw JSON bytes into a `Jetro` document with object keys
286    /// interned into this engine's key cache. With `simd-json`, the tape
287    /// is materialised eagerly so interning happens once at parse time
288    /// (subsequent `collect` calls reuse the cached `Val` tree).
289    pub fn parse_bytes(&self, bytes: Vec<u8>) -> std::result::Result<Jetro, JetroEngineError> {
290        let document = Jetro::from_bytes(bytes)?;
291        // Force materialisation so keys are interned through this
292        // engine's cache rather than the default thread-local one when
293        // `collect` later asks for `root_val`.
294        let _ = document.root_val_with(&self.keys)?;
295        Ok(document)
296    }
297
298    /// Parse raw JSON bytes into a `Jetro` document without forcing the `Val`
299    /// tree. This keeps byte-backed callers eligible for tape/view execution;
300    /// object keys are interned only if execution later materialises the row.
301    pub(crate) fn parse_bytes_lazy(
302        &self,
303        bytes: Vec<u8>,
304    ) -> std::result::Result<Jetro, JetroEngineError> {
305        Ok(Jetro::from_bytes(bytes)?)
306    }
307
308    /// Evaluate a Jetro expression against an already-constructed `Jetro` document,
309    /// using the engine's shared plan cache and `VM`.
310    pub fn collect<S: AsRef<str>>(
311        &self,
312        document: &Jetro,
313        expr: S,
314    ) -> std::result::Result<Value, EvalError> {
315        let expr = expr.as_ref();
316        if let Some(rows) = io::collect_document_rows(self, document, expr)? {
317            return Ok(Value::from(rows));
318        }
319        let plan = self.cached_plan(expr, exec::router::planning_context(document));
320        self.collect_prepared(document, &plan)
321    }
322
323    pub(crate) fn collect_prepared(
324        &self,
325        document: &Jetro,
326        plan: &ir::physical::QueryPlan,
327    ) -> std::result::Result<Value, EvalError> {
328        self.collect_prepared_val(document, plan).map(Value::from)
329    }
330
331    pub(crate) fn collect_prepared_val(
332        &self,
333        document: &Jetro,
334        plan: &ir::physical::QueryPlan,
335    ) -> std::result::Result<Val, EvalError> {
336        let mut vm = self.vm.lock().expect("vm cache poisoned");
337        exec::router::collect_plan_val_with_vm(document, plan, &mut vm)
338    }
339
340    pub(crate) fn lock_vm(&self) -> std::sync::MutexGuard<'_, VM> {
341        self.vm.lock().expect("vm cache poisoned")
342    }
343
344    /// Convenience wrapper: wrap a `serde_json::Value` in a `Jetro` and evaluate `expr`.
345    /// Routes through [`JetroEngine::parse_value`] so the document's object keys are
346    /// interned into this engine's key cache.
347    pub fn collect_value<S: AsRef<str>>(
348        &self,
349        document: Value,
350        expr: S,
351    ) -> std::result::Result<Value, EvalError> {
352        let document = self.parse_value(document);
353        self.collect(&document, expr)
354    }
355
356    /// Parse raw JSON bytes into a `Jetro` document and evaluate `expr`,
357    /// returning a `JetroEngineError` on either parse or evaluation failure.
358    /// Routes through [`JetroEngine::parse_bytes`] so the document's object keys
359    /// are interned into this engine's key cache.
360    pub fn collect_bytes<S: AsRef<str>>(
361        &self,
362        bytes: Vec<u8>,
363        expr: S,
364    ) -> std::result::Result<Value, JetroEngineError> {
365        let document = self.parse_bytes(bytes)?;
366        Ok(self.collect(&document, expr)?)
367    }
368
369    /// Evaluate `query` independently for every non-empty NDJSON row and write
370    /// one JSON result per output line.
371    pub fn run_ndjson<R, W>(
372        &self,
373        reader: R,
374        query: &str,
375        writer: W,
376    ) -> std::result::Result<usize, JetroEngineError>
377    where
378        R: std::io::BufRead,
379        W: std::io::Write,
380    {
381        io::run_ndjson(self, reader, query, writer)
382    }
383
384    /// Open an NDJSON file and evaluate `query` independently for every
385    /// non-empty row, writing one JSON result per output line.
386    pub fn run_ndjson_file<P, W>(
387        &self,
388        path: P,
389        query: &str,
390        writer: W,
391    ) -> std::result::Result<usize, JetroEngineError>
392    where
393        P: AsRef<std::path::Path>,
394        W: std::io::Write,
395    {
396        io::run_ndjson_file(self, path, query, writer)
397    }
398
399    /// Like [`JetroEngine::run_ndjson_file`] with explicit NDJSON reader options.
400    pub fn run_ndjson_file_with_options<P, W>(
401        &self,
402        path: P,
403        query: &str,
404        writer: W,
405        options: io::NdjsonOptions,
406    ) -> std::result::Result<usize, JetroEngineError>
407    where
408        P: AsRef<std::path::Path>,
409        W: std::io::Write,
410    {
411        io::run_ndjson_file_with_options(self, path, query, writer, options)
412    }
413
414    /// Evaluate a file-backed NDJSON query and return a route/counter report.
415    pub fn run_ndjson_file_with_report<P, W>(
416        &self,
417        path: P,
418        query: &str,
419        writer: W,
420    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
421    where
422        P: AsRef<std::path::Path>,
423        W: std::io::Write,
424    {
425        io::run_ndjson_file_with_report(self, path, query, writer)
426    }
427
428    /// Like [`JetroEngine::run_ndjson_file_with_report`] with explicit options.
429    pub fn run_ndjson_file_with_report_and_options<P, W>(
430        &self,
431        path: P,
432        query: &str,
433        writer: W,
434        options: io::NdjsonOptions,
435    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
436    where
437        P: AsRef<std::path::Path>,
438        W: std::io::Write,
439    {
440        io::run_ndjson_file_with_report_and_options(self, path, query, writer, options)
441    }
442
443    /// Open an NDJSON file, write at most `limit` query results, and stop reading.
444    pub fn run_ndjson_file_limit<P, W>(
445        &self,
446        path: P,
447        query: &str,
448        limit: usize,
449        writer: W,
450    ) -> std::result::Result<usize, JetroEngineError>
451    where
452        P: AsRef<std::path::Path>,
453        W: std::io::Write,
454    {
455        io::run_ndjson_file_limit(self, path, query, limit, writer)
456    }
457
458    /// Like [`JetroEngine::run_ndjson_file_limit`] with explicit NDJSON reader options.
459    pub fn run_ndjson_file_limit_with_options<P, W>(
460        &self,
461        path: P,
462        query: &str,
463        limit: usize,
464        writer: W,
465        options: io::NdjsonOptions,
466    ) -> std::result::Result<usize, JetroEngineError>
467    where
468        P: AsRef<std::path::Path>,
469        W: std::io::Write,
470    {
471        io::run_ndjson_file_limit_with_options(self, path, query, limit, writer, options)
472    }
473
474    /// Evaluate a limited file-backed NDJSON query and return a route/counter report.
475    pub fn run_ndjson_file_limit_with_report<P, W>(
476        &self,
477        path: P,
478        query: &str,
479        limit: usize,
480        writer: W,
481    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
482    where
483        P: AsRef<std::path::Path>,
484        W: std::io::Write,
485    {
486        io::run_ndjson_file_limit_with_report(self, path, query, limit, writer)
487    }
488
489    /// Like [`JetroEngine::run_ndjson_file_limit_with_report`] with explicit options.
490    pub fn run_ndjson_file_limit_with_report_and_options<P, W>(
491        &self,
492        path: P,
493        query: &str,
494        limit: usize,
495        writer: W,
496        options: io::NdjsonOptions,
497    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
498    where
499        P: AsRef<std::path::Path>,
500        W: std::io::Write,
501    {
502        io::run_ndjson_file_limit_with_report_and_options(self, path, query, limit, writer, options)
503    }
504
505    /// Evaluate `query` independently for every row from an [`io::NdjsonSource`].
506    pub fn run_ndjson_source<W>(
507        &self,
508        source: io::NdjsonSource,
509        query: &str,
510        writer: W,
511    ) -> std::result::Result<usize, JetroEngineError>
512    where
513        W: std::io::Write,
514    {
515        io::run_ndjson_source(self, source, query, writer)
516    }
517
518    /// Like [`JetroEngine::run_ndjson_source`] with explicit NDJSON reader options.
519    pub fn run_ndjson_source_with_options<W>(
520        &self,
521        source: io::NdjsonSource,
522        query: &str,
523        writer: W,
524        options: io::NdjsonOptions,
525    ) -> std::result::Result<usize, JetroEngineError>
526    where
527        W: std::io::Write,
528    {
529        io::run_ndjson_source_with_options(self, source, query, writer, options)
530    }
531
532    /// Evaluate an [`io::NdjsonSource`] query and return a route/counter report.
533    pub fn run_ndjson_source_with_report<W>(
534        &self,
535        source: io::NdjsonSource,
536        query: &str,
537        writer: W,
538    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
539    where
540        W: std::io::Write,
541    {
542        io::run_ndjson_source_with_report(self, source, query, writer)
543    }
544
545    /// Like [`JetroEngine::run_ndjson_source_with_report`] with explicit options.
546    pub fn run_ndjson_source_with_report_and_options<W>(
547        &self,
548        source: io::NdjsonSource,
549        query: &str,
550        writer: W,
551        options: io::NdjsonOptions,
552    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
553    where
554        W: std::io::Write,
555    {
556        io::run_ndjson_source_with_report_and_options(self, source, query, writer, options)
557    }
558
559    /// Evaluate `query` for rows from an [`io::NdjsonSource`], write at most
560    /// `limit` results, and stop reading.
561    pub fn run_ndjson_source_limit<W>(
562        &self,
563        source: io::NdjsonSource,
564        query: &str,
565        limit: usize,
566        writer: W,
567    ) -> std::result::Result<usize, JetroEngineError>
568    where
569        W: std::io::Write,
570    {
571        io::run_ndjson_source_limit(self, source, query, limit, writer)
572    }
573
574    /// Like [`JetroEngine::run_ndjson_source_limit`] with explicit NDJSON reader options.
575    pub fn run_ndjson_source_limit_with_options<W>(
576        &self,
577        source: io::NdjsonSource,
578        query: &str,
579        limit: usize,
580        writer: W,
581        options: io::NdjsonOptions,
582    ) -> std::result::Result<usize, JetroEngineError>
583    where
584        W: std::io::Write,
585    {
586        io::run_ndjson_source_limit_with_options(self, source, query, limit, writer, options)
587    }
588
589    /// Evaluate a limited [`io::NdjsonSource`] query and return a route/counter report.
590    pub fn run_ndjson_source_limit_with_report<W>(
591        &self,
592        source: io::NdjsonSource,
593        query: &str,
594        limit: usize,
595        writer: W,
596    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
597    where
598        W: std::io::Write,
599    {
600        io::run_ndjson_source_limit_with_report(self, source, query, limit, writer)
601    }
602
603    /// Like [`JetroEngine::run_ndjson_source_limit_with_report`] with explicit options.
604    pub fn run_ndjson_source_limit_with_report_and_options<W>(
605        &self,
606        source: io::NdjsonSource,
607        query: &str,
608        limit: usize,
609        writer: W,
610        options: io::NdjsonOptions,
611    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
612    where
613        W: std::io::Write,
614    {
615        io::run_ndjson_source_limit_with_report_and_options(
616            self, source, query, limit, writer, options,
617        )
618    }
619
620    /// Read an NDJSON file from tail to head and write one query result per row.
621    pub fn run_ndjson_rev<P, W>(
622        &self,
623        path: P,
624        query: &str,
625        writer: W,
626    ) -> std::result::Result<usize, JetroEngineError>
627    where
628        P: AsRef<std::path::Path>,
629        W: std::io::Write,
630    {
631        io::run_ndjson_rev(self, path, query, writer)
632    }
633
634    /// Like [`JetroEngine::run_ndjson_rev`] with explicit NDJSON reader options.
635    pub fn run_ndjson_rev_with_options<P, W>(
636        &self,
637        path: P,
638        query: &str,
639        writer: W,
640        options: io::NdjsonOptions,
641    ) -> std::result::Result<usize, JetroEngineError>
642    where
643        P: AsRef<std::path::Path>,
644        W: std::io::Write,
645    {
646        io::run_ndjson_rev_with_options(self, path, query, writer, options)
647    }
648
649    /// Read an NDJSON file from tail to head, write at most `limit` query
650    /// results, and stop reading.
651    pub fn run_ndjson_rev_limit<P, W>(
652        &self,
653        path: P,
654        query: &str,
655        limit: usize,
656        writer: W,
657    ) -> std::result::Result<usize, JetroEngineError>
658    where
659        P: AsRef<std::path::Path>,
660        W: std::io::Write,
661    {
662        io::run_ndjson_rev_limit(self, path, query, limit, writer)
663    }
664
665    /// Like [`JetroEngine::run_ndjson_rev_limit`] with explicit NDJSON reader options.
666    pub fn run_ndjson_rev_limit_with_options<P, W>(
667        &self,
668        path: P,
669        query: &str,
670        limit: usize,
671        writer: W,
672        options: io::NdjsonOptions,
673    ) -> std::result::Result<usize, JetroEngineError>
674    where
675        P: AsRef<std::path::Path>,
676        W: std::io::Write,
677    {
678        io::run_ndjson_rev_limit_with_options(self, path, query, limit, writer, options)
679    }
680
681    /// Read an NDJSON file from tail to head, keep only the first row seen for
682    /// each `key_query` result in that reverse stream order, write `query` for
683    /// retained rows, and stop after `limit` retained rows.
684    pub fn run_ndjson_rev_distinct_by<P, W>(
685        &self,
686        path: P,
687        key_query: &str,
688        query: &str,
689        limit: usize,
690        writer: W,
691    ) -> std::result::Result<usize, JetroEngineError>
692    where
693        P: AsRef<std::path::Path>,
694        W: std::io::Write,
695    {
696        io::run_ndjson_rev_distinct_by(self, path, key_query, query, limit, writer)
697    }
698
699    /// Like [`JetroEngine::run_ndjson_rev_distinct_by`] with explicit NDJSON
700    /// reader options.
701    pub fn run_ndjson_rev_distinct_by_with_options<P, W>(
702        &self,
703        path: P,
704        key_query: &str,
705        query: &str,
706        limit: usize,
707        writer: W,
708        options: io::NdjsonOptions,
709    ) -> std::result::Result<usize, JetroEngineError>
710    where
711        P: AsRef<std::path::Path>,
712        W: std::io::Write,
713    {
714        io::run_ndjson_rev_distinct_by_with_options(
715            self, path, key_query, query, limit, writer, options,
716        )
717    }
718
719    /// Like [`JetroEngine::run_ndjson_rev_distinct_by`], returning execution
720    /// counters for path-selection and duplicate-drop observability.
721    pub fn run_ndjson_rev_distinct_by_with_stats<P, W>(
722        &self,
723        path: P,
724        key_query: &str,
725        query: &str,
726        limit: usize,
727        writer: W,
728    ) -> std::result::Result<io::NdjsonRevDistinctStats, JetroEngineError>
729    where
730        P: AsRef<std::path::Path>,
731        W: std::io::Write,
732    {
733        io::run_ndjson_rev_distinct_by_with_stats(self, path, key_query, query, limit, writer)
734    }
735
736    /// Like [`JetroEngine::run_ndjson_rev_distinct_by_with_stats`] with explicit
737    /// NDJSON reader options.
738    pub fn run_ndjson_rev_distinct_by_with_stats_and_options<P, W>(
739        &self,
740        path: P,
741        key_query: &str,
742        query: &str,
743        limit: usize,
744        writer: W,
745        options: io::NdjsonOptions,
746    ) -> std::result::Result<io::NdjsonRevDistinctStats, JetroEngineError>
747    where
748        P: AsRef<std::path::Path>,
749        W: std::io::Write,
750    {
751        io::run_ndjson_rev_distinct_by_with_stats_and_options(
752            self, path, key_query, query, limit, writer, options,
753        )
754    }
755
756    /// Like [`JetroEngine::run_ndjson_rev_distinct_by`], returning the shared
757    /// NDJSON execution report shape.
758    pub fn run_ndjson_rev_distinct_by_with_report<P, W>(
759        &self,
760        path: P,
761        key_query: &str,
762        query: &str,
763        limit: usize,
764        writer: W,
765    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
766    where
767        P: AsRef<std::path::Path>,
768        W: std::io::Write,
769    {
770        io::run_ndjson_rev_distinct_by_with_report(
771            self, path, key_query, query, limit, writer,
772        )
773    }
774
775    /// Like [`JetroEngine::run_ndjson_rev_distinct_by_with_report`] with explicit options.
776    pub fn run_ndjson_rev_distinct_by_with_report_and_options<P, W>(
777        &self,
778        path: P,
779        key_query: &str,
780        query: &str,
781        limit: usize,
782        writer: W,
783        options: io::NdjsonOptions,
784    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
785    where
786        P: AsRef<std::path::Path>,
787        W: std::io::Write,
788    {
789        io::run_ndjson_rev_distinct_by_with_report_and_options(
790            self, path, key_query, query, limit, writer, options,
791        )
792    }
793
794    /// Like [`JetroEngine::run_ndjson`] with explicit NDJSON reader options.
795    pub fn run_ndjson_with_options<R, W>(
796        &self,
797        reader: R,
798        query: &str,
799        writer: W,
800        options: io::NdjsonOptions,
801    ) -> std::result::Result<usize, JetroEngineError>
802    where
803        R: std::io::BufRead,
804        W: std::io::Write,
805    {
806        io::run_ndjson_with_options(self, reader, query, writer, options)
807    }
808
809    /// Evaluate `query` for NDJSON rows and return a route/counter report.
810    pub fn run_ndjson_with_report<R, W>(
811        &self,
812        reader: R,
813        query: &str,
814        writer: W,
815    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
816    where
817        R: std::io::BufRead,
818        W: std::io::Write,
819    {
820        io::run_ndjson_with_report(self, reader, query, writer)
821    }
822
823    /// Like [`JetroEngine::run_ndjson_with_report`] with explicit NDJSON reader options.
824    pub fn run_ndjson_with_report_and_options<R, W>(
825        &self,
826        reader: R,
827        query: &str,
828        writer: W,
829        options: io::NdjsonOptions,
830    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
831    where
832        R: std::io::BufRead,
833        W: std::io::Write,
834    {
835        io::run_ndjson_with_report_and_options(self, reader, query, writer, options)
836    }
837
838    /// Evaluate `query` for NDJSON rows, write at most `limit` results, and stop reading.
839    pub fn run_ndjson_limit<R, W>(
840        &self,
841        reader: R,
842        query: &str,
843        limit: usize,
844        writer: W,
845    ) -> std::result::Result<usize, JetroEngineError>
846    where
847        R: std::io::BufRead,
848        W: std::io::Write,
849    {
850        io::run_ndjson_limit(self, reader, query, limit, writer)
851    }
852
853    /// Like [`JetroEngine::run_ndjson_limit`] with explicit NDJSON reader options.
854    pub fn run_ndjson_limit_with_options<R, W>(
855        &self,
856        reader: R,
857        query: &str,
858        limit: usize,
859        writer: W,
860        options: io::NdjsonOptions,
861    ) -> std::result::Result<usize, JetroEngineError>
862    where
863        R: std::io::BufRead,
864        W: std::io::Write,
865    {
866        io::run_ndjson_limit_with_options(self, reader, query, limit, writer, options)
867    }
868
869    /// Evaluate a limited NDJSON reader query and return a route/counter report.
870    pub fn run_ndjson_limit_with_report<R, W>(
871        &self,
872        reader: R,
873        query: &str,
874        limit: usize,
875        writer: W,
876    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
877    where
878        R: std::io::BufRead,
879        W: std::io::Write,
880    {
881        io::run_ndjson_limit_with_report(self, reader, query, limit, writer)
882    }
883
884    /// Like [`JetroEngine::run_ndjson_limit_with_report`] with explicit options.
885    pub fn run_ndjson_limit_with_report_and_options<R, W>(
886        &self,
887        reader: R,
888        query: &str,
889        limit: usize,
890        writer: W,
891        options: io::NdjsonOptions,
892    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
893    where
894        R: std::io::BufRead,
895        W: std::io::Write,
896    {
897        io::run_ndjson_limit_with_report_and_options(self, reader, query, limit, writer, options)
898    }
899
900    /// Evaluate `predicate` for each NDJSON row, write matching original rows,
901    /// and stop after `limit` matches.
902    pub fn run_ndjson_matches<R, W>(
903        &self,
904        reader: R,
905        predicate: &str,
906        limit: usize,
907        writer: W,
908    ) -> std::result::Result<usize, JetroEngineError>
909    where
910        R: std::io::BufRead,
911        W: std::io::Write,
912    {
913        io::run_ndjson_matches(self, reader, predicate, limit, writer)
914    }
915
916    /// Like [`JetroEngine::run_ndjson_matches`] with explicit NDJSON reader options.
917    pub fn run_ndjson_matches_with_options<R, W>(
918        &self,
919        reader: R,
920        predicate: &str,
921        limit: usize,
922        writer: W,
923        options: io::NdjsonOptions,
924    ) -> std::result::Result<usize, JetroEngineError>
925    where
926        R: std::io::BufRead,
927        W: std::io::Write,
928    {
929        io::run_ndjson_matches_with_options(self, reader, predicate, limit, writer, options)
930    }
931
932    /// Evaluate a match-limited NDJSON query and return the shared execution report.
933    pub fn run_ndjson_matches_with_report<R, W>(
934        &self,
935        reader: R,
936        predicate: &str,
937        limit: usize,
938        writer: W,
939    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
940    where
941        R: std::io::BufRead,
942        W: std::io::Write,
943    {
944        io::run_ndjson_matches_with_report(self, reader, predicate, limit, writer)
945    }
946
947    /// Like [`JetroEngine::run_ndjson_matches_with_report`] with explicit options.
948    pub fn run_ndjson_matches_with_report_and_options<R, W>(
949        &self,
950        reader: R,
951        predicate: &str,
952        limit: usize,
953        writer: W,
954        options: io::NdjsonOptions,
955    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
956    where
957        R: std::io::BufRead,
958        W: std::io::Write,
959    {
960        io::run_ndjson_matches_with_report_and_options(
961            self, reader, predicate, limit, writer, options,
962        )
963    }
964
965    /// Open an NDJSON file, write matching original rows, and stop after `limit` matches.
966    pub fn run_ndjson_matches_file<P, W>(
967        &self,
968        path: P,
969        predicate: &str,
970        limit: usize,
971        writer: W,
972    ) -> std::result::Result<usize, JetroEngineError>
973    where
974        P: AsRef<std::path::Path>,
975        W: std::io::Write,
976    {
977        io::run_ndjson_matches_file(self, path, predicate, limit, writer)
978    }
979
980    /// Like [`JetroEngine::run_ndjson_matches_file`] with explicit NDJSON reader options.
981    pub fn run_ndjson_matches_file_with_options<P, W>(
982        &self,
983        path: P,
984        predicate: &str,
985        limit: usize,
986        writer: W,
987        options: io::NdjsonOptions,
988    ) -> std::result::Result<usize, JetroEngineError>
989    where
990        P: AsRef<std::path::Path>,
991        W: std::io::Write,
992    {
993        io::run_ndjson_matches_file_with_options(self, path, predicate, limit, writer, options)
994    }
995
996    /// Like [`JetroEngine::run_ndjson_matches_file`], returning the shared report.
997    pub fn run_ndjson_matches_file_with_report<P, W>(
998        &self,
999        path: P,
1000        predicate: &str,
1001        limit: usize,
1002        writer: W,
1003    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
1004    where
1005        P: AsRef<std::path::Path>,
1006        W: std::io::Write,
1007    {
1008        io::run_ndjson_matches_file_with_report(self, path, predicate, limit, writer)
1009    }
1010
1011    /// Like [`JetroEngine::run_ndjson_matches_file_with_report`] with explicit options.
1012    pub fn run_ndjson_matches_file_with_report_and_options<P, W>(
1013        &self,
1014        path: P,
1015        predicate: &str,
1016        limit: usize,
1017        writer: W,
1018        options: io::NdjsonOptions,
1019    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
1020    where
1021        P: AsRef<std::path::Path>,
1022        W: std::io::Write,
1023    {
1024        io::run_ndjson_matches_file_with_report_and_options(
1025            self, path, predicate, limit, writer, options,
1026        )
1027    }
1028
1029    /// Evaluate `predicate` against each row from an [`io::NdjsonSource`], write
1030    /// matching original rows, and stop after `limit` matches.
1031    pub fn run_ndjson_matches_source<W>(
1032        &self,
1033        source: io::NdjsonSource,
1034        predicate: &str,
1035        limit: usize,
1036        writer: W,
1037    ) -> std::result::Result<usize, JetroEngineError>
1038    where
1039        W: std::io::Write,
1040    {
1041        io::run_ndjson_matches_source(self, source, predicate, limit, writer)
1042    }
1043
1044    /// Like [`JetroEngine::run_ndjson_matches_source`] with explicit NDJSON reader options.
1045    pub fn run_ndjson_matches_source_with_options<W>(
1046        &self,
1047        source: io::NdjsonSource,
1048        predicate: &str,
1049        limit: usize,
1050        writer: W,
1051        options: io::NdjsonOptions,
1052    ) -> std::result::Result<usize, JetroEngineError>
1053    where
1054        W: std::io::Write,
1055    {
1056        io::run_ndjson_matches_source_with_options(self, source, predicate, limit, writer, options)
1057    }
1058
1059    /// Like [`JetroEngine::run_ndjson_matches_source`], returning the shared report.
1060    pub fn run_ndjson_matches_source_with_report<W>(
1061        &self,
1062        source: io::NdjsonSource,
1063        predicate: &str,
1064        limit: usize,
1065        writer: W,
1066    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
1067    where
1068        W: std::io::Write,
1069    {
1070        io::run_ndjson_matches_source_with_report(self, source, predicate, limit, writer)
1071    }
1072
1073    /// Like [`JetroEngine::run_ndjson_matches_source_with_report`] with explicit options.
1074    pub fn run_ndjson_matches_source_with_report_and_options<W>(
1075        &self,
1076        source: io::NdjsonSource,
1077        predicate: &str,
1078        limit: usize,
1079        writer: W,
1080        options: io::NdjsonOptions,
1081    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
1082    where
1083        W: std::io::Write,
1084    {
1085        io::run_ndjson_matches_source_with_report_and_options(
1086            self, source, predicate, limit, writer, options,
1087        )
1088    }
1089
1090    /// Read an NDJSON file from tail to head, write matching original rows, and
1091    /// stop after `limit` matches.
1092    pub fn run_ndjson_rev_matches<P, W>(
1093        &self,
1094        path: P,
1095        predicate: &str,
1096        limit: usize,
1097        writer: W,
1098    ) -> std::result::Result<usize, JetroEngineError>
1099    where
1100        P: AsRef<std::path::Path>,
1101        W: std::io::Write,
1102    {
1103        io::run_ndjson_rev_matches(self, path, predicate, limit, writer)
1104    }
1105
1106    /// Like [`JetroEngine::run_ndjson_rev_matches`] with explicit NDJSON reader options.
1107    pub fn run_ndjson_rev_matches_with_options<P, W>(
1108        &self,
1109        path: P,
1110        predicate: &str,
1111        limit: usize,
1112        writer: W,
1113        options: io::NdjsonOptions,
1114    ) -> std::result::Result<usize, JetroEngineError>
1115    where
1116        P: AsRef<std::path::Path>,
1117        W: std::io::Write,
1118    {
1119        io::run_ndjson_rev_matches_with_options(self, path, predicate, limit, writer, options)
1120    }
1121
1122    /// Like [`JetroEngine::run_ndjson_rev_matches`], returning the shared report.
1123    pub fn run_ndjson_rev_matches_with_report<P, W>(
1124        &self,
1125        path: P,
1126        predicate: &str,
1127        limit: usize,
1128        writer: W,
1129    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
1130    where
1131        P: AsRef<std::path::Path>,
1132        W: std::io::Write,
1133    {
1134        io::run_ndjson_rev_matches_with_report(self, path, predicate, limit, writer)
1135    }
1136
1137    /// Like [`JetroEngine::run_ndjson_rev_matches_with_report`] with explicit options.
1138    pub fn run_ndjson_rev_matches_with_report_and_options<P, W>(
1139        &self,
1140        path: P,
1141        predicate: &str,
1142        limit: usize,
1143        writer: W,
1144        options: io::NdjsonOptions,
1145    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
1146    where
1147        P: AsRef<std::path::Path>,
1148        W: std::io::Write,
1149    {
1150        io::run_ndjson_rev_matches_with_report_and_options(
1151            self, path, predicate, limit, writer, options,
1152        )
1153    }
1154
1155    /// Evaluate `query` independently for every non-empty NDJSON row and collect
1156    /// the per-row results.
1157    pub fn collect_ndjson<R>(
1158        &self,
1159        reader: R,
1160        query: &str,
1161    ) -> std::result::Result<Vec<Value>, JetroEngineError>
1162    where
1163        R: std::io::BufRead,
1164    {
1165        io::collect_ndjson(self, reader, query)
1166    }
1167
1168    /// Open an NDJSON file and collect per-row query results.
1169    pub fn collect_ndjson_file<P>(
1170        &self,
1171        path: P,
1172        query: &str,
1173    ) -> std::result::Result<Vec<Value>, JetroEngineError>
1174    where
1175        P: AsRef<std::path::Path>,
1176    {
1177        io::collect_ndjson_file(self, path, query)
1178    }
1179
1180    /// Like [`JetroEngine::collect_ndjson_file`] with explicit NDJSON reader options.
1181    pub fn collect_ndjson_file_with_options<P>(
1182        &self,
1183        path: P,
1184        query: &str,
1185        options: io::NdjsonOptions,
1186    ) -> std::result::Result<Vec<Value>, JetroEngineError>
1187    where
1188        P: AsRef<std::path::Path>,
1189    {
1190        io::collect_ndjson_file_with_options(self, path, query, options)
1191    }
1192
1193    /// Collect per-row query results from an [`io::NdjsonSource`].
1194    pub fn collect_ndjson_source(
1195        &self,
1196        source: io::NdjsonSource,
1197        query: &str,
1198    ) -> std::result::Result<Vec<Value>, JetroEngineError> {
1199        io::collect_ndjson_source(self, source, query)
1200    }
1201
1202    /// Like [`JetroEngine::collect_ndjson_source`] with explicit NDJSON reader options.
1203    pub fn collect_ndjson_source_with_options(
1204        &self,
1205        source: io::NdjsonSource,
1206        query: &str,
1207        options: io::NdjsonOptions,
1208    ) -> std::result::Result<Vec<Value>, JetroEngineError> {
1209        io::collect_ndjson_source_with_options(self, source, query, options)
1210    }
1211
1212    /// Read an NDJSON file from tail to head and collect per-row query results.
1213    pub fn collect_ndjson_rev<P>(
1214        &self,
1215        path: P,
1216        query: &str,
1217    ) -> std::result::Result<Vec<Value>, JetroEngineError>
1218    where
1219        P: AsRef<std::path::Path>,
1220    {
1221        io::collect_ndjson_rev(self, path, query)
1222    }
1223
1224    /// Like [`JetroEngine::collect_ndjson_rev`] with explicit NDJSON reader options.
1225    pub fn collect_ndjson_rev_with_options<P>(
1226        &self,
1227        path: P,
1228        query: &str,
1229        options: io::NdjsonOptions,
1230    ) -> std::result::Result<Vec<Value>, JetroEngineError>
1231    where
1232        P: AsRef<std::path::Path>,
1233    {
1234        io::collect_ndjson_rev_with_options(self, path, query, options)
1235    }
1236
1237    /// Read an NDJSON file from tail to head and call `f` with each query result
1238    /// as it is produced.
1239    pub fn for_each_ndjson_rev<P, F>(
1240        &self,
1241        path: P,
1242        query: &str,
1243        f: F,
1244    ) -> std::result::Result<usize, JetroEngineError>
1245    where
1246        P: AsRef<std::path::Path>,
1247        F: FnMut(Value),
1248    {
1249        io::for_each_ndjson_rev(self, path, query, f)
1250    }
1251
1252    /// Read an NDJSON file from tail to head and call `f` until it returns
1253    /// [`io::NdjsonControl::Stop`] or input is exhausted.
1254    pub fn for_each_ndjson_rev_until<P, F>(
1255        &self,
1256        path: P,
1257        query: &str,
1258        f: F,
1259    ) -> std::result::Result<usize, JetroEngineError>
1260    where
1261        P: AsRef<std::path::Path>,
1262        F: FnMut(Value) -> std::result::Result<io::NdjsonControl, JetroEngineError>,
1263    {
1264        io::for_each_ndjson_rev_with_options(self, path, query, io::NdjsonOptions::default(), f)
1265    }
1266
1267    /// Like [`JetroEngine::for_each_ndjson_rev_until`] with explicit NDJSON reader options.
1268    pub fn for_each_ndjson_rev_until_with_options<P, F>(
1269        &self,
1270        path: P,
1271        query: &str,
1272        options: io::NdjsonOptions,
1273        f: F,
1274    ) -> std::result::Result<usize, JetroEngineError>
1275    where
1276        P: AsRef<std::path::Path>,
1277        F: FnMut(Value) -> std::result::Result<io::NdjsonControl, JetroEngineError>,
1278    {
1279        io::for_each_ndjson_rev_with_options(self, path, query, options, f)
1280    }
1281
1282    /// Like [`JetroEngine::for_each_ndjson_rev`] with explicit NDJSON reader options.
1283    pub fn for_each_ndjson_rev_with_options<P, F>(
1284        &self,
1285        path: P,
1286        query: &str,
1287        options: io::NdjsonOptions,
1288        mut f: F,
1289    ) -> std::result::Result<usize, JetroEngineError>
1290    where
1291        P: AsRef<std::path::Path>,
1292        F: FnMut(Value),
1293    {
1294        io::for_each_ndjson_rev_with_options(self, path, query, options, |value| {
1295            f(value);
1296            Ok(io::NdjsonControl::Continue)
1297        })
1298    }
1299
1300    /// Like [`JetroEngine::collect_ndjson`] with explicit NDJSON reader options.
1301    pub fn collect_ndjson_with_options<R>(
1302        &self,
1303        reader: R,
1304        query: &str,
1305        options: io::NdjsonOptions,
1306    ) -> std::result::Result<Vec<Value>, JetroEngineError>
1307    where
1308        R: std::io::BufRead,
1309    {
1310        io::collect_ndjson_with_options(self, reader, query, options)
1311    }
1312
1313    /// Evaluate `predicate` for each NDJSON row, collect matching original
1314    /// rows, and stop after `limit` matches.
1315    pub fn collect_ndjson_matches<R>(
1316        &self,
1317        reader: R,
1318        predicate: &str,
1319        limit: usize,
1320    ) -> std::result::Result<Vec<Value>, JetroEngineError>
1321    where
1322        R: std::io::BufRead,
1323    {
1324        io::collect_ndjson_matches(self, reader, predicate, limit)
1325    }
1326
1327    /// Like [`JetroEngine::collect_ndjson_matches`] with explicit NDJSON reader options.
1328    pub fn collect_ndjson_matches_with_options<R>(
1329        &self,
1330        reader: R,
1331        predicate: &str,
1332        limit: usize,
1333        options: io::NdjsonOptions,
1334    ) -> std::result::Result<Vec<Value>, JetroEngineError>
1335    where
1336        R: std::io::BufRead,
1337    {
1338        io::collect_ndjson_matches_with_options(self, reader, predicate, limit, options)
1339    }
1340
1341    /// Open an NDJSON file, collect matching original rows, and stop after `limit` matches.
1342    pub fn collect_ndjson_matches_file<P>(
1343        &self,
1344        path: P,
1345        predicate: &str,
1346        limit: usize,
1347    ) -> std::result::Result<Vec<Value>, JetroEngineError>
1348    where
1349        P: AsRef<std::path::Path>,
1350    {
1351        io::collect_ndjson_matches_file(self, path, predicate, limit)
1352    }
1353
1354    /// Like [`JetroEngine::collect_ndjson_matches_file`] with explicit NDJSON reader options.
1355    pub fn collect_ndjson_matches_file_with_options<P>(
1356        &self,
1357        path: P,
1358        predicate: &str,
1359        limit: usize,
1360        options: io::NdjsonOptions,
1361    ) -> std::result::Result<Vec<Value>, JetroEngineError>
1362    where
1363        P: AsRef<std::path::Path>,
1364    {
1365        io::collect_ndjson_matches_file_with_options(self, path, predicate, limit, options)
1366    }
1367
1368    /// Evaluate `predicate` against each row from an [`io::NdjsonSource`],
1369    /// collect matching original rows, and stop after `limit` matches.
1370    pub fn collect_ndjson_matches_source(
1371        &self,
1372        source: io::NdjsonSource,
1373        predicate: &str,
1374        limit: usize,
1375    ) -> std::result::Result<Vec<Value>, JetroEngineError> {
1376        io::collect_ndjson_matches_source(self, source, predicate, limit)
1377    }
1378
1379    /// Like [`JetroEngine::collect_ndjson_matches_source`] with explicit NDJSON reader options.
1380    pub fn collect_ndjson_matches_source_with_options(
1381        &self,
1382        source: io::NdjsonSource,
1383        predicate: &str,
1384        limit: usize,
1385        options: io::NdjsonOptions,
1386    ) -> std::result::Result<Vec<Value>, JetroEngineError> {
1387        io::collect_ndjson_matches_source_with_options(self, source, predicate, limit, options)
1388    }
1389
1390    /// Read an NDJSON file from tail to head, collect matching original rows,
1391    /// and stop after `limit` matches.
1392    pub fn collect_ndjson_rev_matches<P>(
1393        &self,
1394        path: P,
1395        predicate: &str,
1396        limit: usize,
1397    ) -> std::result::Result<Vec<Value>, JetroEngineError>
1398    where
1399        P: AsRef<std::path::Path>,
1400    {
1401        io::collect_ndjson_rev_matches(self, path, predicate, limit)
1402    }
1403
1404    /// Like [`JetroEngine::collect_ndjson_rev_matches`] with explicit NDJSON reader options.
1405    pub fn collect_ndjson_rev_matches_with_options<P>(
1406        &self,
1407        path: P,
1408        predicate: &str,
1409        limit: usize,
1410        options: io::NdjsonOptions,
1411    ) -> std::result::Result<Vec<Value>, JetroEngineError>
1412    where
1413        P: AsRef<std::path::Path>,
1414    {
1415        io::collect_ndjson_rev_matches_with_options(self, path, predicate, limit, options)
1416    }
1417
1418    /// Evaluate `query` independently for every non-empty NDJSON row and call
1419    /// `f` with each result as it is produced.
1420    pub fn for_each_ndjson<R, F>(
1421        &self,
1422        reader: R,
1423        query: &str,
1424        f: F,
1425    ) -> std::result::Result<usize, JetroEngineError>
1426    where
1427        R: std::io::BufRead,
1428        F: FnMut(Value),
1429    {
1430        io::for_each_ndjson(self, reader, query, f)
1431    }
1432
1433    /// Evaluate `query` independently for every non-empty NDJSON row and call
1434    /// `f` until it returns [`io::NdjsonControl::Stop`] or input is exhausted.
1435    pub fn for_each_ndjson_until<R, F>(
1436        &self,
1437        reader: R,
1438        query: &str,
1439        f: F,
1440    ) -> std::result::Result<usize, JetroEngineError>
1441    where
1442        R: std::io::BufRead,
1443        F: FnMut(Value) -> std::result::Result<io::NdjsonControl, JetroEngineError>,
1444    {
1445        io::for_each_ndjson_until(self, reader, query, f)
1446    }
1447
1448    /// Evaluate `query` for every row from an [`io::NdjsonSource`] and call
1449    /// `f` with each result as it is produced.
1450    pub fn for_each_ndjson_source<F>(
1451        &self,
1452        source: io::NdjsonSource,
1453        query: &str,
1454        f: F,
1455    ) -> std::result::Result<usize, JetroEngineError>
1456    where
1457        F: FnMut(Value),
1458    {
1459        io::for_each_ndjson_source(self, source, query, f)
1460    }
1461
1462    /// Evaluate `query` for every row from an [`io::NdjsonSource`] and call
1463    /// `f` until it returns [`io::NdjsonControl::Stop`] or input is exhausted.
1464    pub fn for_each_ndjson_source_until<F>(
1465        &self,
1466        source: io::NdjsonSource,
1467        query: &str,
1468        f: F,
1469    ) -> std::result::Result<usize, JetroEngineError>
1470    where
1471        F: FnMut(Value) -> std::result::Result<io::NdjsonControl, JetroEngineError>,
1472    {
1473        io::for_each_ndjson_source_until(self, source, query, f)
1474    }
1475
1476    /// Like [`JetroEngine::for_each_ndjson_source_until`] with explicit NDJSON reader options.
1477    pub fn for_each_ndjson_source_until_with_options<F>(
1478        &self,
1479        source: io::NdjsonSource,
1480        query: &str,
1481        options: io::NdjsonOptions,
1482        f: F,
1483    ) -> std::result::Result<usize, JetroEngineError>
1484    where
1485        F: FnMut(Value) -> std::result::Result<io::NdjsonControl, JetroEngineError>,
1486    {
1487        io::for_each_ndjson_source_until_with_options(self, source, query, options, f)
1488    }
1489
1490    /// Like [`JetroEngine::for_each_ndjson_source`] with explicit NDJSON reader options.
1491    pub fn for_each_ndjson_source_with_options<F>(
1492        &self,
1493        source: io::NdjsonSource,
1494        query: &str,
1495        options: io::NdjsonOptions,
1496        f: F,
1497    ) -> std::result::Result<usize, JetroEngineError>
1498    where
1499        F: FnMut(Value),
1500    {
1501        io::for_each_ndjson_source_with_options(self, source, query, options, f)
1502    }
1503
1504    /// Like [`JetroEngine::for_each_ndjson`] with explicit NDJSON reader options.
1505    pub fn for_each_ndjson_with_options<R, F>(
1506        &self,
1507        reader: R,
1508        query: &str,
1509        options: io::NdjsonOptions,
1510        f: F,
1511    ) -> std::result::Result<usize, JetroEngineError>
1512    where
1513        R: std::io::BufRead,
1514        F: FnMut(Value),
1515    {
1516        io::for_each_ndjson_with_options(self, reader, query, options, f)
1517    }
1518
1519    /// Like [`JetroEngine::for_each_ndjson_until`] with explicit NDJSON reader options.
1520    pub fn for_each_ndjson_until_with_options<R, F>(
1521        &self,
1522        reader: R,
1523        query: &str,
1524        options: io::NdjsonOptions,
1525        f: F,
1526    ) -> std::result::Result<usize, JetroEngineError>
1527    where
1528        R: std::io::BufRead,
1529        F: FnMut(Value) -> std::result::Result<io::NdjsonControl, JetroEngineError>,
1530    {
1531        io::for_each_ndjson_until_with_options(self, reader, query, options, f)
1532    }
1533
1534    /// Look up a compiled `QueryPlan` by expression string and planning context,
1535    /// compiling and inserting it if not already cached; evicts the whole cache if full.
1536    pub(crate) fn cached_plan(
1537        &self,
1538        expr: &str,
1539        context: plan::physical::PlanningContext,
1540    ) -> ir::physical::QueryPlan {
1541        let mut cache = self.plan_cache.lock().expect("plan cache poisoned");
1542        let cache_key = format!("{}\0{}", context.cache_key(), expr);
1543        if let Some(plan) = cache.get(&cache_key) {
1544            return plan.clone();
1545        }
1546
1547        let plan = plan::physical::plan_query_with_context(expr, context);
1548        if self.plan_cache_limit > 0 {
1549            if cache.len() >= self.plan_cache_limit {
1550                cache.clear();
1551            }
1552            cache.insert(cache_key, plan.clone());
1553        }
1554        plan
1555    }
1556}
1557
1558impl exec::pipeline::PipelineData for Jetro {
1559    fn promote_objvec(&self, arr: &Arc<Vec<Val>>) -> Option<Arc<crate::data::value::ObjVecData>> {
1560        self.get_or_promote_objvec(arr)
1561    }
1562}
1563
1564impl Jetro {
1565    /// Return a reference to the lazily parsed simd-json `TapeData`, parsing raw bytes
1566    /// on first access. Returns `Ok(None)` when no raw bytes are stored.
1567    pub(crate) fn lazy_tape(
1568        &self,
1569    ) -> std::result::Result<Option<&Arc<crate::data::tape::TapeData>>, EvalError> {
1570        if let Some(result) = self.tape.get() {
1571            return result
1572                .as_ref()
1573                .map(Some)
1574                .map_err(|err| EvalError(format!("Invalid JSON: {err}")));
1575        }
1576        let Some(raw) = self.raw_bytes.as_ref() else {
1577            return Ok(None);
1578        };
1579        let bytes: Vec<u8> = (**raw).to_vec();
1580        let parsed = crate::data::tape::TapeData::parse(bytes).map_err(|err| err.to_string());
1581        let _ = self.tape.set(parsed);
1582        self.tape
1583            .get()
1584            .expect("tape cache initialized")
1585            .as_ref()
1586            .map(Some)
1587            .map_err(|err| EvalError(format!("Invalid JSON: {err}")))
1588    }
1589
1590    /// Look up or build an `ObjVecData` columnar representation for the given
1591    /// `Arc<Vec<Val>>` array, caching the result by pointer address.
1592    pub(crate) fn get_or_promote_objvec(
1593        &self,
1594        arr: &Arc<Vec<Val>>,
1595    ) -> Option<Arc<crate::data::value::ObjVecData>> {
1596        let key = Arc::as_ptr(arr) as usize;
1597        if let Ok(cache) = self.objvec_cache.lock() {
1598            if let Some(d) = cache.get(&key) {
1599                return Some(Arc::clone(d));
1600            }
1601        }
1602        let promoted = exec::pipeline::Pipeline::try_promote_objvec_arr(arr)?;
1603        if let Ok(mut cache) = self.objvec_cache.lock() {
1604            cache.entry(key).or_insert_with(|| Arc::clone(&promoted));
1605        }
1606        Some(promoted)
1607    }
1608
1609    /// Internal constructor that wraps a `serde_json::Value` without raw bytes.
1610    pub(crate) fn new(document: Value) -> Self {
1611        Self {
1612            document,
1613            root_val: OnceCell::new(),
1614            objvec_cache: Default::default(),
1615            raw_bytes: None,
1616            tape: OnceCell::new(),
1617            structural_index: OnceCell::new(),
1618            vm: RefCell::new(VM::new()),
1619        }
1620    }
1621
1622    /// Build a `Jetro` whose `root_val` is pre-cached with `root` (constructed by the
1623    /// caller, typically via [`Val::from_value_with`] using an engine-owned key cache).
1624    /// `document` is retained for value-backed callers and tests that read the
1625    /// original `serde_json::Value`.
1626    pub(crate) fn from_val_and_value(root: Val, document: Value) -> Self {
1627        let root_val = OnceCell::new();
1628        let _ = root_val.set(root);
1629        Self {
1630            document,
1631            root_val,
1632            objvec_cache: Default::default(),
1633            raw_bytes: None,
1634            tape: OnceCell::new(),
1635            structural_index: OnceCell::new(),
1636            vm: RefCell::new(VM::new()),
1637        }
1638    }
1639
1640    /// Like [`Jetro::root_val`] but interns object keys through `keys` instead of the
1641    /// process-wide default. Used by [`JetroEngine::parse_bytes`] to materialise the
1642    /// `Val` tree once at parse time so subsequent `collect` calls find a populated
1643    /// `root_val` cache and skip re-interning.
1644    pub(crate) fn root_val_with(
1645        &self,
1646        keys: &crate::data::intern::KeyCache,
1647    ) -> std::result::Result<Val, EvalError> {
1648        if let Some(root) = self.root_val.get() {
1649            return Ok(root.clone());
1650        }
1651        let root = {
1652            if let Some(tape) = self.lazy_tape()? {
1653                Val::from_tape_data_with(keys, tape)
1654            } else {
1655                Val::from_value_with(keys, &self.document)
1656            }
1657        };
1658        let _ = self.root_val.set(root);
1659        Ok(self.root_val.get().expect("root val initialized").clone())
1660    }
1661
1662    /// Parse raw JSON bytes and build a `Jetro` query handle.
1663    /// The bytes are not parsed eagerly; the tape is built lazily on the first
1664    /// query that needs it.
1665    pub fn from_bytes(bytes: Vec<u8>) -> std::result::Result<Self, serde_json::Error> {
1666        Ok(Self {
1667            document: Value::Null,
1668            root_val: OnceCell::new(),
1669            objvec_cache: Default::default(),
1670            raw_bytes: Some(Arc::from(bytes.into_boxed_slice())),
1671            tape: OnceCell::new(),
1672            structural_index: OnceCell::new(),
1673            vm: RefCell::new(VM::new()),
1674        })
1675    }
1676
1677    /// Borrow this document's VM cache, falling back to a temporary VM on re-entrant use.
1678    pub(crate) fn with_vm<F, R>(&self, f: F) -> R
1679    where
1680        F: FnOnce(&mut VM) -> R,
1681    {
1682        match self.vm.try_borrow_mut() {
1683            Ok(mut vm) => f(&mut vm),
1684            Err(_) => {
1685                let mut vm = VM::new();
1686                f(&mut vm)
1687            }
1688        }
1689    }
1690
1691    /// Return the raw JSON byte slice if this handle was constructed from bytes,
1692    /// or `None` if it was constructed from a `serde_json::Value`.
1693    pub(crate) fn raw_bytes(&self) -> Option<&[u8]> {
1694        self.raw_bytes.as_deref()
1695    }
1696
1697    /// Return a reference to the lazily built `StructuralIndex` for key-presence
1698    /// queries, constructing it from raw bytes on first access if available.
1699    pub(crate) fn lazy_structural_index(
1700        &self,
1701    ) -> std::result::Result<Option<&Arc<jetro_experimental::StructuralIndex>>, EvalError> {
1702        if let Some(result) = self.structural_index.get() {
1703            return result
1704                .as_ref()
1705                .map(Some)
1706                .map_err(|err| EvalError(format!("Invalid JSON: {err}")));
1707        }
1708        let Some(raw) = self.raw_bytes.as_ref() else {
1709            return Ok(None);
1710        };
1711        let built = jetro_experimental::from_bytes_with(
1712            raw.as_ref(),
1713            jetro_experimental::BuildOptions::keys_only(),
1714        )
1715        .map(Arc::new)
1716        .map_err(|err| err.to_string());
1717        let _ = self.structural_index.set(built);
1718        self.structural_index
1719            .get()
1720            .expect("structural index cache initialized")
1721            .as_ref()
1722            .map(Some)
1723            .map_err(|err| EvalError(format!("Invalid JSON: {err}")))
1724    }
1725
1726    /// Return the root `Val` for the document, building and caching it from the
1727    /// tape or from the `serde_json::Value` on first access.
1728    pub(crate) fn root_val(&self) -> std::result::Result<Val, EvalError> {
1729        if let Some(root) = self.root_val.get() {
1730            return Ok(root.clone());
1731        }
1732        let root = {
1733            if let Some(tape) = self.lazy_tape()? {
1734                Val::from_tape_data(tape)
1735            } else {
1736                Val::from(&self.document)
1737            }
1738        };
1739        let _ = self.root_val.set(root);
1740        Ok(self.root_val.get().expect("root val initialized").clone())
1741    }
1742
1743    /// Return `true` if the `Val` tree has already been materialised; used in
1744    /// tests to assert that lazy evaluation is working correctly.
1745    #[cfg(test)]
1746    pub(crate) fn root_val_is_materialized(&self) -> bool {
1747        self.root_val.get().is_some()
1748    }
1749
1750    #[cfg(test)]
1751    pub(crate) fn structural_index_is_built(&self) -> bool {
1752        self.structural_index.get().is_some()
1753    }
1754
1755    #[cfg(test)]
1756    pub(crate) fn tape_is_built(&self) -> bool {
1757        self.tape.get().is_some()
1758    }
1759
1760    #[cfg(test)]
1761    pub(crate) fn reset_tape_materialized_subtrees(&self) {
1762        if let Ok(Some(tape)) = self.lazy_tape() {
1763            tape.reset_materialized_subtrees();
1764        }
1765    }
1766
1767    #[cfg(test)]
1768    pub(crate) fn tape_materialized_subtrees(&self) -> usize {
1769        self.lazy_tape()
1770            .ok()
1771            .flatten()
1772            .map(|tape| tape.materialized_subtrees())
1773            .unwrap_or(0)
1774    }
1775
1776    /// Evaluate a Jetro expression against this document and return the result
1777    /// as a `serde_json::Value`. Uses this document's VM with compile and
1778    /// path-resolution caches for repeated calls.
1779    pub fn collect<S: AsRef<str>>(&self, expr: S) -> std::result::Result<Value, EvalError> {
1780        exec::router::collect_json(self, expr.as_ref())
1781    }
1782}
1783
1784/// Wrap an existing `serde_json::Value` in a `Jetro` handle without raw bytes.
1785/// Prefer `Jetro::from_bytes` when you have the original JSON source, as it
1786/// enables the tape and structural-index lazy backends.
1787impl From<Value> for Jetro {
1788    /// Convert a `serde_json::Value` into a `Jetro` query handle.
1789    fn from(v: Value) -> Self {
1790        Self::new(v)
1791    }
1792}