Skip to main content

jetro_core/
lib.rs

1//! Jetro core — parser, compiler, and VM for the Jetro JSON query language.
2//!
3//! # Execution path
4//!
5//! ```text
6//! source text
7//!   │  parse::parser::parse() → Expr AST
8//!   │  plan::physical::plan_query() → QueryPlan (physical IR)
9//!   │  exec::router::collect_*() → dispatches to:
10//!   │    StructuralIndex backend  (jetro-experimental bitmap)
11//!   │    ViewPipeline backend     (borrowed tape/Val navigation)
12//!   │    Pipeline backend         (pull-based composed stages)
13//!   └─  VM fallback               (bytecode stack machine)
14//! ```
15//!
16//! # Quick start
17//!
18//! ```rust
19//! use jetro_core::Jetro;
20//! let j = Jetro::from_bytes(br#"{"books":[{"price":12}]}"#.to_vec()).unwrap();
21//! assert_eq!(j.collect("$.books.len()").unwrap(), serde_json::json!(1));
22//! ```
23//!
24//! ```rust
25//! use jetro_core::JetroEngine;
26//! use std::io::Cursor;
27//!
28//! let engine = JetroEngine::new();
29//! let rows = Cursor::new(br#"{"name":"Ada"}
30//! {"name":"Bob"}
31//! "#);
32//! let names = engine.collect_ndjson(rows, "name").unwrap();
33//! assert_eq!(names, vec![serde_json::json!("Ada"), serde_json::json!("Bob")]);
34//! ```
35//!
36//! Match-limited NDJSON helpers evaluate a predicate per row, return the
37//! original full row for truthy matches, and stop after the requested number of
38//! matches:
39//!
40//! ```rust
41//! use jetro_core::JetroEngine;
42//! use std::io::Cursor;
43//!
44//! let engine = JetroEngine::new();
45//! let rows = Cursor::new(br#"{"id":1,"active":true}
46//! {"id":2,"active":false}
47//! {"id":3,"active":true}
48//! "#);
49//! let first_two = engine.collect_ndjson_matches(rows, "active", 2).unwrap();
50//! assert_eq!(first_two, vec![
51//!     serde_json::json!({"id": 1, "active": true}),
52//!     serde_json::json!({"id": 3, "active": true}),
53//! ]);
54//! ```
55
56pub(crate) mod builtins;
57pub(crate) mod compile;
58pub(crate) mod data;
59pub(crate) mod exec;
60pub mod introspect;
61pub mod io;
62pub(crate) mod ir;
63pub(crate) mod parse;
64pub(crate) mod plan;
65pub(crate) mod util;
66pub(crate) mod vm;
67
68#[cfg(test)]
69mod tests;
70
71use data::value::Val;
72use serde_json::Value;
73use std::cell::{OnceCell, RefCell};
74use std::collections::HashMap;
75use std::sync::Arc;
76use std::sync::Mutex;
77
78pub use data::context::EvalError;
79#[cfg(test)]
80use parse::parser::ParseError;
81use vm::VM;
82
/// Parser and planner entry points exposed solely for the `cargo-fuzz`
/// harness, letting it reach the PEG parser without going through
/// `Jetro::collect`.
///
/// Compiled only when the `fuzz_internal` feature is enabled. This is NOT a
/// stable public API.
#[cfg(feature = "fuzz_internal")]
pub mod __fuzz_internal {
    pub use crate::parse::parser::parse;
    pub use crate::parse::parser::ParseError;
    pub use crate::plan::physical::plan_query;
}
91
/// Test-only umbrella error that can hold either a parse failure or an
/// evaluation failure.
#[cfg(test)]
#[derive(Debug)]
pub(crate) enum Error {
    /// The expression source could not be parsed.
    Parse(ParseError),
    /// The expression parsed but evaluating it failed.
    Eval(EvalError),
}
98
#[cfg(test)]
impl std::fmt::Display for Error {
    /// Prints the wrapped error's own `Display` text with no extra prefix.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Pick the payload first so the formatting call is written once.
        let inner: &dyn std::fmt::Display = match self {
            Self::Parse(parse_err) => parse_err,
            Self::Eval(eval_err) => eval_err,
        };
        write!(f, "{}", inner)
    }
}
#[cfg(test)]
impl std::error::Error for Error {
    /// Exposes the underlying `ParseError` as the cause; evaluation errors
    /// report no source.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::Eval(_) => None,
            Self::Parse(parse_err) => Some(parse_err as &(dyn std::error::Error + 'static)),
        }
    }
}
117
#[cfg(test)]
impl From<ParseError> for Error {
    /// Wraps a parse failure so `?` can lift it into [`Error`].
    fn from(parse_err: ParseError) -> Self {
        Self::Parse(parse_err)
    }
}
#[cfg(test)]
impl From<EvalError> for Error {
    /// Wraps an evaluation failure so `?` can lift it into [`Error`].
    fn from(eval_err: EvalError) -> Self {
        Self::Eval(eval_err)
    }
}
130
131/// Primary entry point. Holds a JSON document and evaluates expressions against
132/// it. Lazy fields (`root_val`, `tape`, `structural_index`, `objvec_cache`)
133/// are populated on first use so callers only pay for the representations a
134/// particular query actually needs.
135pub struct Jetro {
136    /// The `serde_json::Value` root document; unused for byte-backed handles
137    /// where the tape is the authoritative source.
138    document: Value,
139    /// Cached `Val` tree — built once and reused across `collect()` calls.
140    root_val: OnceCell<Val>,
141    /// Retained raw bytes for lazy tape and structural-index materialisation.
142    raw_bytes: Option<Arc<[u8]>>,
143
144    /// Lazily parsed simd-json tape; `Err` is cached to avoid re-parsing after failure.
145    tape: OnceCell<std::result::Result<Arc<crate::data::tape::TapeData>, String>>,
146
147    /// Lazily built bitmap structural index for accelerated key-presence queries.
148    structural_index:
149        OnceCell<std::result::Result<Arc<jetro_experimental::StructuralIndex>, String>>,
150
151    /// Per-document cache from `Arc<Vec<Val>>` pointer addresses to promoted
152    /// `ObjVecData` columnar representations; keyed by pointer to avoid re-promotion.
153    pub(crate) objvec_cache:
154        std::sync::Mutex<std::collections::HashMap<usize, Arc<crate::data::value::ObjVecData>>>,
155
156    /// Per-document VM cache used by `Jetro::collect`; not shared across document handles.
157    vm: RefCell<VM>,
158}
159
160/// Long-lived multi-document query engine with an explicit plan cache.
161/// Use when the same process evaluates many expressions over many documents —
162/// parse/lower/compile work is amortised by this object, not hidden in
163/// thread-local state.
164pub struct JetroEngine {
165    /// Maps `"<context_key>\0<expr>"` to compiled `QueryPlan`; evicted wholesale when full.
166    plan_cache: Mutex<HashMap<String, ir::physical::QueryPlan>>,
167    /// Maximum number of entries before the cache is cleared; 0 disables caching.
168    plan_cache_limit: usize,
169    /// The shared `VM` used by all `collect*` calls on this engine instance.
170    vm: Mutex<VM>,
171    /// Engine-owned JSON object-key intern cache. Used by [`JetroEngine::parse_value`]
172    /// and [`JetroEngine::parse_bytes`] (and the `collect_*` shortcuts that go through
173    /// them) so each engine instance has an isolated key cache. Documents built via
174    /// the standalone `Jetro::from_bytes`/`From<serde_json::Value>` paths use the
175    /// process-wide [`crate::data::intern::default_cache`] instead.
176    keys: Arc<crate::data::intern::KeyCache>,
177}
178
179/// Error returned by `JetroEngine::collect_bytes` and similar methods that
180/// may fail during JSON parsing or during expression evaluation.
181#[derive(Debug)]
182pub enum JetroEngineError {
183    /// JSON parsing failed before evaluation could begin.
184    Json(serde_json::Error),
185    /// Reading from a stream or writing results failed.
186    Io(std::io::Error),
187    /// NDJSON row parsing failed with row context.
188    Ndjson(io::RowError),
189    /// Expression evaluation failed (the JSON was valid but the query errored).
190    Eval(EvalError),
191}
192
193impl std::fmt::Display for JetroEngineError {
194    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
195        match self {
196            Self::Json(err) => write!(f, "{}", err),
197            Self::Io(err) => write!(f, "{}", err),
198            Self::Ndjson(err) => write!(f, "{}", err),
199            Self::Eval(err) => write!(f, "{}", err),
200        }
201    }
202}
203
204impl std::error::Error for JetroEngineError {
205    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
206        match self {
207            Self::Json(err) => Some(err),
208            Self::Io(err) => Some(err),
209            Self::Ndjson(err) => Some(err),
210            Self::Eval(_) => None,
211        }
212    }
213}
214
215impl From<serde_json::Error> for JetroEngineError {
216    fn from(err: serde_json::Error) -> Self {
217        Self::Json(err)
218    }
219}
220
221impl From<std::io::Error> for JetroEngineError {
222    fn from(err: std::io::Error) -> Self {
223        Self::Io(err)
224    }
225}
226
227impl From<io::RowError> for JetroEngineError {
228    fn from(err: io::RowError) -> Self {
229        Self::Ndjson(err)
230    }
231}
232
233impl From<EvalError> for JetroEngineError {
234    fn from(err: EvalError) -> Self {
235        Self::Eval(err)
236    }
237}
238
239impl Default for JetroEngine {
240    fn default() -> Self {
241        Self::new()
242    }
243}
244
245impl JetroEngine {
246    /// Default maximum plan-cache size; the cache is cleared wholesale when reached.
247    const DEFAULT_PLAN_CACHE_LIMIT: usize = 256;
248
249    /// Inspect how a query parses, plans, and selects execution metadata.
250    ///
251    /// This is an explicit developer API. Normal query execution does not
252    /// allocate or populate introspection reports.
253    pub fn inspect_query(
254        &self,
255        query: &str,
256        options: introspect::InspectOptions,
257    ) -> std::result::Result<introspect::QueryInspection, JetroEngineError> {
258        introspect::inspect_query(self, query, options, io::NdjsonOptions::default())
259    }
260
261    /// Inspect an NDJSON query with explicit NDJSON source options.
262    pub fn inspect_ndjson_query_with_options(
263        &self,
264        query: &str,
265        source: io::NdjsonSourceMode,
266        ndjson_options: io::NdjsonOptions,
267        level: introspect::InspectLevel,
268    ) -> std::result::Result<introspect::QueryInspection, JetroEngineError> {
269        let context = match source {
270            io::NdjsonSourceMode::Reader => introspect::InspectContext::NdjsonReader,
271            io::NdjsonSourceMode::File => introspect::InspectContext::NdjsonFile,
272        };
273        introspect::inspect_query(
274            self,
275            query,
276            introspect::InspectOptions { level, context },
277            ndjson_options,
278        )
279    }
280
281    /// Create a `JetroEngine` with the default plan-cache limit of 256 entries.
282    pub fn new() -> Self {
283        Self::with_plan_cache_limit(Self::DEFAULT_PLAN_CACHE_LIMIT)
284    }
285
286    /// Create a `JetroEngine` with an explicit plan-cache capacity.
287    /// Set `plan_cache_limit` to 0 to disable caching entirely.
288    pub fn with_plan_cache_limit(plan_cache_limit: usize) -> Self {
289        Self {
290            plan_cache: Mutex::new(HashMap::new()),
291            plan_cache_limit,
292            vm: Mutex::new(VM::new()),
293            keys: crate::data::intern::KeyCache::new(),
294        }
295    }
296
297    /// Borrow this engine's JSON key-intern cache.
298    pub fn keys(&self) -> &Arc<crate::data::intern::KeyCache> {
299        &self.keys
300    }
301
302    /// Discard all cached query plans and the engine's key-intern cache,
303    /// forcing re-compilation and re-interning on the next call.
304    pub fn clear_cache(&self) {
305        self.plan_cache.lock().expect("plan cache poisoned").clear();
306        self.keys.clear();
307    }
308
309    /// Build a `Jetro` document from a `serde_json::Value` with object keys
310    /// interned into this engine's key cache. Use this in place of
311    /// `Jetro::from(...)` / the `From<serde_json::Value>` impl when
312    /// per-engine key isolation is required.
313    pub fn parse_value(&self, document: Value) -> Jetro {
314        let root = Val::from_value_with(&self.keys, &document);
315        Jetro::from_val_and_value(root, document)
316    }
317
318    /// Parse raw JSON bytes into a `Jetro` document with object keys
319    /// interned into this engine's key cache. With `simd-json`, the tape
320    /// is materialised eagerly so interning happens once at parse time
321    /// (subsequent `collect` calls reuse the cached `Val` tree).
322    pub fn parse_bytes(&self, bytes: Vec<u8>) -> std::result::Result<Jetro, JetroEngineError> {
323        let document = Jetro::from_bytes(bytes)?;
324        // Force materialisation so keys are interned through this
325        // engine's cache rather than the default thread-local one when
326        // `collect` later asks for `root_val`.
327        let _ = document.root_val_with(&self.keys)?;
328        Ok(document)
329    }
330
331    /// Parse raw JSON bytes into a `Jetro` document without forcing the `Val`
332    /// tree. This keeps byte-backed callers eligible for tape/view execution;
333    /// object keys are interned only if execution later materialises the row.
334    pub(crate) fn parse_bytes_lazy(
335        &self,
336        bytes: Vec<u8>,
337    ) -> std::result::Result<Jetro, JetroEngineError> {
338        Ok(Jetro::from_bytes(bytes)?)
339    }
340
341    /// Evaluate a Jetro expression against an already-constructed `Jetro` document,
342    /// using the engine's shared plan cache and `VM`.
343    pub fn collect<S: AsRef<str>>(
344        &self,
345        document: &Jetro,
346        expr: S,
347    ) -> std::result::Result<Value, EvalError> {
348        let expr = expr.as_ref();
349        if let Some(rows) = io::collect_document_rows(self, document, expr)? {
350            return Ok(Value::from(rows));
351        }
352        let plan = self.cached_plan(expr, exec::router::planning_context(document));
353        self.collect_prepared(document, &plan)
354    }
355
356    pub(crate) fn collect_prepared(
357        &self,
358        document: &Jetro,
359        plan: &ir::physical::QueryPlan,
360    ) -> std::result::Result<Value, EvalError> {
361        self.collect_prepared_val(document, plan).map(Value::from)
362    }
363
364    pub(crate) fn collect_prepared_val(
365        &self,
366        document: &Jetro,
367        plan: &ir::physical::QueryPlan,
368    ) -> std::result::Result<Val, EvalError> {
369        let mut vm = self.vm.lock().expect("vm cache poisoned");
370        exec::router::collect_plan_val_with_vm(document, plan, &mut vm)
371    }
372
373    pub(crate) fn lock_vm(&self) -> std::sync::MutexGuard<'_, VM> {
374        self.vm.lock().expect("vm cache poisoned")
375    }
376
377    /// Convenience wrapper: wrap a `serde_json::Value` in a `Jetro` and evaluate `expr`.
378    /// Routes through [`JetroEngine::parse_value`] so the document's object keys are
379    /// interned into this engine's key cache.
380    pub fn collect_value<S: AsRef<str>>(
381        &self,
382        document: Value,
383        expr: S,
384    ) -> std::result::Result<Value, EvalError> {
385        let document = self.parse_value(document);
386        self.collect(&document, expr)
387    }
388
389    /// Parse raw JSON bytes into a `Jetro` document and evaluate `expr`,
390    /// returning a `JetroEngineError` on either parse or evaluation failure.
391    /// Routes through [`JetroEngine::parse_bytes`] so the document's object keys
392    /// are interned into this engine's key cache.
393    pub fn collect_bytes<S: AsRef<str>>(
394        &self,
395        bytes: Vec<u8>,
396        expr: S,
397    ) -> std::result::Result<Value, JetroEngineError> {
398        let document = self.parse_bytes(bytes)?;
399        Ok(self.collect(&document, expr)?)
400    }
401
402    /// Evaluate `query` independently for every non-empty NDJSON row and write
403    /// one JSON result per output line.
404    pub fn run_ndjson<R, W>(
405        &self,
406        reader: R,
407        query: &str,
408        writer: W,
409    ) -> std::result::Result<usize, JetroEngineError>
410    where
411        R: std::io::BufRead,
412        W: std::io::Write,
413    {
414        io::run_ndjson(self, reader, query, writer)
415    }
416
417    /// Open an NDJSON file and evaluate `query` independently for every
418    /// non-empty row, writing one JSON result per output line.
419    pub fn run_ndjson_file<P, W>(
420        &self,
421        path: P,
422        query: &str,
423        writer: W,
424    ) -> std::result::Result<usize, JetroEngineError>
425    where
426        P: AsRef<std::path::Path>,
427        W: std::io::Write,
428    {
429        io::run_ndjson_file(self, path, query, writer)
430    }
431
432    /// Like [`JetroEngine::run_ndjson_file`] with explicit NDJSON reader options.
433    pub fn run_ndjson_file_with_options<P, W>(
434        &self,
435        path: P,
436        query: &str,
437        writer: W,
438        options: io::NdjsonOptions,
439    ) -> std::result::Result<usize, JetroEngineError>
440    where
441        P: AsRef<std::path::Path>,
442        W: std::io::Write,
443    {
444        io::run_ndjson_file_with_options(self, path, query, writer, options)
445    }
446
447    /// Evaluate a file-backed NDJSON query and return a route/counter report.
448    pub fn run_ndjson_file_with_report<P, W>(
449        &self,
450        path: P,
451        query: &str,
452        writer: W,
453    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
454    where
455        P: AsRef<std::path::Path>,
456        W: std::io::Write,
457    {
458        io::run_ndjson_file_with_report(self, path, query, writer)
459    }
460
461    /// Like [`JetroEngine::run_ndjson_file_with_report`] with explicit options.
462    pub fn run_ndjson_file_with_report_and_options<P, W>(
463        &self,
464        path: P,
465        query: &str,
466        writer: W,
467        options: io::NdjsonOptions,
468    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
469    where
470        P: AsRef<std::path::Path>,
471        W: std::io::Write,
472    {
473        io::run_ndjson_file_with_report_and_options(self, path, query, writer, options)
474    }
475
476    /// Open an NDJSON file, write at most `limit` query results, and stop reading.
477    pub fn run_ndjson_file_limit<P, W>(
478        &self,
479        path: P,
480        query: &str,
481        limit: usize,
482        writer: W,
483    ) -> std::result::Result<usize, JetroEngineError>
484    where
485        P: AsRef<std::path::Path>,
486        W: std::io::Write,
487    {
488        io::run_ndjson_file_limit(self, path, query, limit, writer)
489    }
490
491    /// Like [`JetroEngine::run_ndjson_file_limit`] with explicit NDJSON reader options.
492    pub fn run_ndjson_file_limit_with_options<P, W>(
493        &self,
494        path: P,
495        query: &str,
496        limit: usize,
497        writer: W,
498        options: io::NdjsonOptions,
499    ) -> std::result::Result<usize, JetroEngineError>
500    where
501        P: AsRef<std::path::Path>,
502        W: std::io::Write,
503    {
504        io::run_ndjson_file_limit_with_options(self, path, query, limit, writer, options)
505    }
506
507    /// Evaluate a limited file-backed NDJSON query and return a route/counter report.
508    pub fn run_ndjson_file_limit_with_report<P, W>(
509        &self,
510        path: P,
511        query: &str,
512        limit: usize,
513        writer: W,
514    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
515    where
516        P: AsRef<std::path::Path>,
517        W: std::io::Write,
518    {
519        io::run_ndjson_file_limit_with_report(self, path, query, limit, writer)
520    }
521
522    /// Like [`JetroEngine::run_ndjson_file_limit_with_report`] with explicit options.
523    pub fn run_ndjson_file_limit_with_report_and_options<P, W>(
524        &self,
525        path: P,
526        query: &str,
527        limit: usize,
528        writer: W,
529        options: io::NdjsonOptions,
530    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
531    where
532        P: AsRef<std::path::Path>,
533        W: std::io::Write,
534    {
535        io::run_ndjson_file_limit_with_report_and_options(self, path, query, limit, writer, options)
536    }
537
538    /// Evaluate `query` independently for every row from an [`io::NdjsonSource`].
539    pub fn run_ndjson_source<W>(
540        &self,
541        source: io::NdjsonSource,
542        query: &str,
543        writer: W,
544    ) -> std::result::Result<usize, JetroEngineError>
545    where
546        W: std::io::Write,
547    {
548        io::run_ndjson_source(self, source, query, writer)
549    }
550
551    /// Like [`JetroEngine::run_ndjson_source`] with explicit NDJSON reader options.
552    pub fn run_ndjson_source_with_options<W>(
553        &self,
554        source: io::NdjsonSource,
555        query: &str,
556        writer: W,
557        options: io::NdjsonOptions,
558    ) -> std::result::Result<usize, JetroEngineError>
559    where
560        W: std::io::Write,
561    {
562        io::run_ndjson_source_with_options(self, source, query, writer, options)
563    }
564
565    /// Evaluate an [`io::NdjsonSource`] query and return a route/counter report.
566    pub fn run_ndjson_source_with_report<W>(
567        &self,
568        source: io::NdjsonSource,
569        query: &str,
570        writer: W,
571    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
572    where
573        W: std::io::Write,
574    {
575        io::run_ndjson_source_with_report(self, source, query, writer)
576    }
577
578    /// Like [`JetroEngine::run_ndjson_source_with_report`] with explicit options.
579    pub fn run_ndjson_source_with_report_and_options<W>(
580        &self,
581        source: io::NdjsonSource,
582        query: &str,
583        writer: W,
584        options: io::NdjsonOptions,
585    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
586    where
587        W: std::io::Write,
588    {
589        io::run_ndjson_source_with_report_and_options(self, source, query, writer, options)
590    }
591
592    /// Evaluate `query` for rows from an [`io::NdjsonSource`], write at most
593    /// `limit` results, and stop reading.
594    pub fn run_ndjson_source_limit<W>(
595        &self,
596        source: io::NdjsonSource,
597        query: &str,
598        limit: usize,
599        writer: W,
600    ) -> std::result::Result<usize, JetroEngineError>
601    where
602        W: std::io::Write,
603    {
604        io::run_ndjson_source_limit(self, source, query, limit, writer)
605    }
606
607    /// Like [`JetroEngine::run_ndjson_source_limit`] with explicit NDJSON reader options.
608    pub fn run_ndjson_source_limit_with_options<W>(
609        &self,
610        source: io::NdjsonSource,
611        query: &str,
612        limit: usize,
613        writer: W,
614        options: io::NdjsonOptions,
615    ) -> std::result::Result<usize, JetroEngineError>
616    where
617        W: std::io::Write,
618    {
619        io::run_ndjson_source_limit_with_options(self, source, query, limit, writer, options)
620    }
621
622    /// Evaluate a limited [`io::NdjsonSource`] query and return a route/counter report.
623    pub fn run_ndjson_source_limit_with_report<W>(
624        &self,
625        source: io::NdjsonSource,
626        query: &str,
627        limit: usize,
628        writer: W,
629    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
630    where
631        W: std::io::Write,
632    {
633        io::run_ndjson_source_limit_with_report(self, source, query, limit, writer)
634    }
635
636    /// Like [`JetroEngine::run_ndjson_source_limit_with_report`] with explicit options.
637    pub fn run_ndjson_source_limit_with_report_and_options<W>(
638        &self,
639        source: io::NdjsonSource,
640        query: &str,
641        limit: usize,
642        writer: W,
643        options: io::NdjsonOptions,
644    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
645    where
646        W: std::io::Write,
647    {
648        io::run_ndjson_source_limit_with_report_and_options(
649            self, source, query, limit, writer, options,
650        )
651    }
652
653    /// Read an NDJSON file from tail to head and write one query result per row.
654    pub fn run_ndjson_rev<P, W>(
655        &self,
656        path: P,
657        query: &str,
658        writer: W,
659    ) -> std::result::Result<usize, JetroEngineError>
660    where
661        P: AsRef<std::path::Path>,
662        W: std::io::Write,
663    {
664        io::run_ndjson_rev(self, path, query, writer)
665    }
666
667    /// Like [`JetroEngine::run_ndjson_rev`] with explicit NDJSON reader options.
668    pub fn run_ndjson_rev_with_options<P, W>(
669        &self,
670        path: P,
671        query: &str,
672        writer: W,
673        options: io::NdjsonOptions,
674    ) -> std::result::Result<usize, JetroEngineError>
675    where
676        P: AsRef<std::path::Path>,
677        W: std::io::Write,
678    {
679        io::run_ndjson_rev_with_options(self, path, query, writer, options)
680    }
681
682    /// Read an NDJSON file from tail to head, write at most `limit` query
683    /// results, and stop reading.
684    pub fn run_ndjson_rev_limit<P, W>(
685        &self,
686        path: P,
687        query: &str,
688        limit: usize,
689        writer: W,
690    ) -> std::result::Result<usize, JetroEngineError>
691    where
692        P: AsRef<std::path::Path>,
693        W: std::io::Write,
694    {
695        io::run_ndjson_rev_limit(self, path, query, limit, writer)
696    }
697
698    /// Like [`JetroEngine::run_ndjson_rev_limit`] with explicit NDJSON reader options.
699    pub fn run_ndjson_rev_limit_with_options<P, W>(
700        &self,
701        path: P,
702        query: &str,
703        limit: usize,
704        writer: W,
705        options: io::NdjsonOptions,
706    ) -> std::result::Result<usize, JetroEngineError>
707    where
708        P: AsRef<std::path::Path>,
709        W: std::io::Write,
710    {
711        io::run_ndjson_rev_limit_with_options(self, path, query, limit, writer, options)
712    }
713
714    /// Read an NDJSON file from tail to head, keep only the first row seen for
715    /// each `key_query` result in that reverse stream order, write `query` for
716    /// retained rows, and stop after `limit` retained rows.
717    pub fn run_ndjson_rev_distinct_by<P, W>(
718        &self,
719        path: P,
720        key_query: &str,
721        query: &str,
722        limit: usize,
723        writer: W,
724    ) -> std::result::Result<usize, JetroEngineError>
725    where
726        P: AsRef<std::path::Path>,
727        W: std::io::Write,
728    {
729        io::run_ndjson_rev_distinct_by(self, path, key_query, query, limit, writer)
730    }
731
732    /// Like [`JetroEngine::run_ndjson_rev_distinct_by`] with explicit NDJSON
733    /// reader options.
734    pub fn run_ndjson_rev_distinct_by_with_options<P, W>(
735        &self,
736        path: P,
737        key_query: &str,
738        query: &str,
739        limit: usize,
740        writer: W,
741        options: io::NdjsonOptions,
742    ) -> std::result::Result<usize, JetroEngineError>
743    where
744        P: AsRef<std::path::Path>,
745        W: std::io::Write,
746    {
747        io::run_ndjson_rev_distinct_by_with_options(
748            self, path, key_query, query, limit, writer, options,
749        )
750    }
751
752    /// Like [`JetroEngine::run_ndjson_rev_distinct_by`], returning execution
753    /// counters for path-selection and duplicate-drop observability.
754    pub fn run_ndjson_rev_distinct_by_with_stats<P, W>(
755        &self,
756        path: P,
757        key_query: &str,
758        query: &str,
759        limit: usize,
760        writer: W,
761    ) -> std::result::Result<io::NdjsonRevDistinctStats, JetroEngineError>
762    where
763        P: AsRef<std::path::Path>,
764        W: std::io::Write,
765    {
766        io::run_ndjson_rev_distinct_by_with_stats(self, path, key_query, query, limit, writer)
767    }
768
769    /// Like [`JetroEngine::run_ndjson_rev_distinct_by_with_stats`] with explicit
770    /// NDJSON reader options.
771    pub fn run_ndjson_rev_distinct_by_with_stats_and_options<P, W>(
772        &self,
773        path: P,
774        key_query: &str,
775        query: &str,
776        limit: usize,
777        writer: W,
778        options: io::NdjsonOptions,
779    ) -> std::result::Result<io::NdjsonRevDistinctStats, JetroEngineError>
780    where
781        P: AsRef<std::path::Path>,
782        W: std::io::Write,
783    {
784        io::run_ndjson_rev_distinct_by_with_stats_and_options(
785            self, path, key_query, query, limit, writer, options,
786        )
787    }
788
789    /// Like [`JetroEngine::run_ndjson_rev_distinct_by`], returning the shared
790    /// NDJSON execution report shape.
791    pub fn run_ndjson_rev_distinct_by_with_report<P, W>(
792        &self,
793        path: P,
794        key_query: &str,
795        query: &str,
796        limit: usize,
797        writer: W,
798    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
799    where
800        P: AsRef<std::path::Path>,
801        W: std::io::Write,
802    {
803        io::run_ndjson_rev_distinct_by_with_report(self, path, key_query, query, limit, writer)
804    }
805
806    /// Like [`JetroEngine::run_ndjson_rev_distinct_by_with_report`] with explicit options.
807    pub fn run_ndjson_rev_distinct_by_with_report_and_options<P, W>(
808        &self,
809        path: P,
810        key_query: &str,
811        query: &str,
812        limit: usize,
813        writer: W,
814        options: io::NdjsonOptions,
815    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
816    where
817        P: AsRef<std::path::Path>,
818        W: std::io::Write,
819    {
820        io::run_ndjson_rev_distinct_by_with_report_and_options(
821            self, path, key_query, query, limit, writer, options,
822        )
823    }
824
825    /// Like [`JetroEngine::run_ndjson`] with explicit NDJSON reader options.
826    pub fn run_ndjson_with_options<R, W>(
827        &self,
828        reader: R,
829        query: &str,
830        writer: W,
831        options: io::NdjsonOptions,
832    ) -> std::result::Result<usize, JetroEngineError>
833    where
834        R: std::io::BufRead,
835        W: std::io::Write,
836    {
837        io::run_ndjson_with_options(self, reader, query, writer, options)
838    }
839
840    /// Evaluate `query` for NDJSON rows and return a route/counter report.
841    pub fn run_ndjson_with_report<R, W>(
842        &self,
843        reader: R,
844        query: &str,
845        writer: W,
846    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
847    where
848        R: std::io::BufRead,
849        W: std::io::Write,
850    {
851        io::run_ndjson_with_report(self, reader, query, writer)
852    }
853
854    /// Like [`JetroEngine::run_ndjson_with_report`] with explicit NDJSON reader options.
855    pub fn run_ndjson_with_report_and_options<R, W>(
856        &self,
857        reader: R,
858        query: &str,
859        writer: W,
860        options: io::NdjsonOptions,
861    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
862    where
863        R: std::io::BufRead,
864        W: std::io::Write,
865    {
866        io::run_ndjson_with_report_and_options(self, reader, query, writer, options)
867    }
868
869    /// Evaluate `query` for NDJSON rows, write at most `limit` results, and stop reading.
870    pub fn run_ndjson_limit<R, W>(
871        &self,
872        reader: R,
873        query: &str,
874        limit: usize,
875        writer: W,
876    ) -> std::result::Result<usize, JetroEngineError>
877    where
878        R: std::io::BufRead,
879        W: std::io::Write,
880    {
881        io::run_ndjson_limit(self, reader, query, limit, writer)
882    }
883
884    /// Like [`JetroEngine::run_ndjson_limit`] with explicit NDJSON reader options.
885    pub fn run_ndjson_limit_with_options<R, W>(
886        &self,
887        reader: R,
888        query: &str,
889        limit: usize,
890        writer: W,
891        options: io::NdjsonOptions,
892    ) -> std::result::Result<usize, JetroEngineError>
893    where
894        R: std::io::BufRead,
895        W: std::io::Write,
896    {
897        io::run_ndjson_limit_with_options(self, reader, query, limit, writer, options)
898    }
899
900    /// Evaluate a limited NDJSON reader query and return a route/counter report.
901    pub fn run_ndjson_limit_with_report<R, W>(
902        &self,
903        reader: R,
904        query: &str,
905        limit: usize,
906        writer: W,
907    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
908    where
909        R: std::io::BufRead,
910        W: std::io::Write,
911    {
912        io::run_ndjson_limit_with_report(self, reader, query, limit, writer)
913    }
914
915    /// Like [`JetroEngine::run_ndjson_limit_with_report`] with explicit options.
916    pub fn run_ndjson_limit_with_report_and_options<R, W>(
917        &self,
918        reader: R,
919        query: &str,
920        limit: usize,
921        writer: W,
922        options: io::NdjsonOptions,
923    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
924    where
925        R: std::io::BufRead,
926        W: std::io::Write,
927    {
928        io::run_ndjson_limit_with_report_and_options(self, reader, query, limit, writer, options)
929    }
930
931    /// Evaluate `predicate` for each NDJSON row, write matching original rows,
932    /// and stop after `limit` matches.
933    pub fn run_ndjson_matches<R, W>(
934        &self,
935        reader: R,
936        predicate: &str,
937        limit: usize,
938        writer: W,
939    ) -> std::result::Result<usize, JetroEngineError>
940    where
941        R: std::io::BufRead,
942        W: std::io::Write,
943    {
944        io::run_ndjson_matches(self, reader, predicate, limit, writer)
945    }
946
947    /// Like [`JetroEngine::run_ndjson_matches`] with explicit NDJSON reader options.
948    pub fn run_ndjson_matches_with_options<R, W>(
949        &self,
950        reader: R,
951        predicate: &str,
952        limit: usize,
953        writer: W,
954        options: io::NdjsonOptions,
955    ) -> std::result::Result<usize, JetroEngineError>
956    where
957        R: std::io::BufRead,
958        W: std::io::Write,
959    {
960        io::run_ndjson_matches_with_options(self, reader, predicate, limit, writer, options)
961    }
962
963    /// Evaluate a match-limited NDJSON query and return the shared execution report.
964    pub fn run_ndjson_matches_with_report<R, W>(
965        &self,
966        reader: R,
967        predicate: &str,
968        limit: usize,
969        writer: W,
970    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
971    where
972        R: std::io::BufRead,
973        W: std::io::Write,
974    {
975        io::run_ndjson_matches_with_report(self, reader, predicate, limit, writer)
976    }
977
978    /// Like [`JetroEngine::run_ndjson_matches_with_report`] with explicit options.
979    pub fn run_ndjson_matches_with_report_and_options<R, W>(
980        &self,
981        reader: R,
982        predicate: &str,
983        limit: usize,
984        writer: W,
985        options: io::NdjsonOptions,
986    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
987    where
988        R: std::io::BufRead,
989        W: std::io::Write,
990    {
991        io::run_ndjson_matches_with_report_and_options(
992            self, reader, predicate, limit, writer, options,
993        )
994    }
995
996    /// Open an NDJSON file, write matching original rows, and stop after `limit` matches.
997    pub fn run_ndjson_matches_file<P, W>(
998        &self,
999        path: P,
1000        predicate: &str,
1001        limit: usize,
1002        writer: W,
1003    ) -> std::result::Result<usize, JetroEngineError>
1004    where
1005        P: AsRef<std::path::Path>,
1006        W: std::io::Write,
1007    {
1008        io::run_ndjson_matches_file(self, path, predicate, limit, writer)
1009    }
1010
1011    /// Like [`JetroEngine::run_ndjson_matches_file`] with explicit NDJSON reader options.
1012    pub fn run_ndjson_matches_file_with_options<P, W>(
1013        &self,
1014        path: P,
1015        predicate: &str,
1016        limit: usize,
1017        writer: W,
1018        options: io::NdjsonOptions,
1019    ) -> std::result::Result<usize, JetroEngineError>
1020    where
1021        P: AsRef<std::path::Path>,
1022        W: std::io::Write,
1023    {
1024        io::run_ndjson_matches_file_with_options(self, path, predicate, limit, writer, options)
1025    }
1026
1027    /// Like [`JetroEngine::run_ndjson_matches_file`], returning the shared report.
1028    pub fn run_ndjson_matches_file_with_report<P, W>(
1029        &self,
1030        path: P,
1031        predicate: &str,
1032        limit: usize,
1033        writer: W,
1034    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
1035    where
1036        P: AsRef<std::path::Path>,
1037        W: std::io::Write,
1038    {
1039        io::run_ndjson_matches_file_with_report(self, path, predicate, limit, writer)
1040    }
1041
1042    /// Like [`JetroEngine::run_ndjson_matches_file_with_report`] with explicit options.
1043    pub fn run_ndjson_matches_file_with_report_and_options<P, W>(
1044        &self,
1045        path: P,
1046        predicate: &str,
1047        limit: usize,
1048        writer: W,
1049        options: io::NdjsonOptions,
1050    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
1051    where
1052        P: AsRef<std::path::Path>,
1053        W: std::io::Write,
1054    {
1055        io::run_ndjson_matches_file_with_report_and_options(
1056            self, path, predicate, limit, writer, options,
1057        )
1058    }
1059
1060    /// Evaluate `predicate` against each row from an [`io::NdjsonSource`], write
1061    /// matching original rows, and stop after `limit` matches.
1062    pub fn run_ndjson_matches_source<W>(
1063        &self,
1064        source: io::NdjsonSource,
1065        predicate: &str,
1066        limit: usize,
1067        writer: W,
1068    ) -> std::result::Result<usize, JetroEngineError>
1069    where
1070        W: std::io::Write,
1071    {
1072        io::run_ndjson_matches_source(self, source, predicate, limit, writer)
1073    }
1074
1075    /// Like [`JetroEngine::run_ndjson_matches_source`] with explicit NDJSON reader options.
1076    pub fn run_ndjson_matches_source_with_options<W>(
1077        &self,
1078        source: io::NdjsonSource,
1079        predicate: &str,
1080        limit: usize,
1081        writer: W,
1082        options: io::NdjsonOptions,
1083    ) -> std::result::Result<usize, JetroEngineError>
1084    where
1085        W: std::io::Write,
1086    {
1087        io::run_ndjson_matches_source_with_options(self, source, predicate, limit, writer, options)
1088    }
1089
1090    /// Like [`JetroEngine::run_ndjson_matches_source`], returning the shared report.
1091    pub fn run_ndjson_matches_source_with_report<W>(
1092        &self,
1093        source: io::NdjsonSource,
1094        predicate: &str,
1095        limit: usize,
1096        writer: W,
1097    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
1098    where
1099        W: std::io::Write,
1100    {
1101        io::run_ndjson_matches_source_with_report(self, source, predicate, limit, writer)
1102    }
1103
1104    /// Like [`JetroEngine::run_ndjson_matches_source_with_report`] with explicit options.
1105    pub fn run_ndjson_matches_source_with_report_and_options<W>(
1106        &self,
1107        source: io::NdjsonSource,
1108        predicate: &str,
1109        limit: usize,
1110        writer: W,
1111        options: io::NdjsonOptions,
1112    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
1113    where
1114        W: std::io::Write,
1115    {
1116        io::run_ndjson_matches_source_with_report_and_options(
1117            self, source, predicate, limit, writer, options,
1118        )
1119    }
1120
1121    /// Read an NDJSON file from tail to head, write matching original rows, and
1122    /// stop after `limit` matches.
1123    pub fn run_ndjson_rev_matches<P, W>(
1124        &self,
1125        path: P,
1126        predicate: &str,
1127        limit: usize,
1128        writer: W,
1129    ) -> std::result::Result<usize, JetroEngineError>
1130    where
1131        P: AsRef<std::path::Path>,
1132        W: std::io::Write,
1133    {
1134        io::run_ndjson_rev_matches(self, path, predicate, limit, writer)
1135    }
1136
1137    /// Like [`JetroEngine::run_ndjson_rev_matches`] with explicit NDJSON reader options.
1138    pub fn run_ndjson_rev_matches_with_options<P, W>(
1139        &self,
1140        path: P,
1141        predicate: &str,
1142        limit: usize,
1143        writer: W,
1144        options: io::NdjsonOptions,
1145    ) -> std::result::Result<usize, JetroEngineError>
1146    where
1147        P: AsRef<std::path::Path>,
1148        W: std::io::Write,
1149    {
1150        io::run_ndjson_rev_matches_with_options(self, path, predicate, limit, writer, options)
1151    }
1152
1153    /// Like [`JetroEngine::run_ndjson_rev_matches`], returning the shared report.
1154    pub fn run_ndjson_rev_matches_with_report<P, W>(
1155        &self,
1156        path: P,
1157        predicate: &str,
1158        limit: usize,
1159        writer: W,
1160    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
1161    where
1162        P: AsRef<std::path::Path>,
1163        W: std::io::Write,
1164    {
1165        io::run_ndjson_rev_matches_with_report(self, path, predicate, limit, writer)
1166    }
1167
1168    /// Like [`JetroEngine::run_ndjson_rev_matches_with_report`] with explicit options.
1169    pub fn run_ndjson_rev_matches_with_report_and_options<P, W>(
1170        &self,
1171        path: P,
1172        predicate: &str,
1173        limit: usize,
1174        writer: W,
1175        options: io::NdjsonOptions,
1176    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
1177    where
1178        P: AsRef<std::path::Path>,
1179        W: std::io::Write,
1180    {
1181        io::run_ndjson_rev_matches_with_report_and_options(
1182            self, path, predicate, limit, writer, options,
1183        )
1184    }
1185
1186    /// Evaluate `query` independently for every non-empty NDJSON row and collect
1187    /// the per-row results.
1188    pub fn collect_ndjson<R>(
1189        &self,
1190        reader: R,
1191        query: &str,
1192    ) -> std::result::Result<Vec<Value>, JetroEngineError>
1193    where
1194        R: std::io::BufRead,
1195    {
1196        io::collect_ndjson(self, reader, query)
1197    }
1198
1199    /// Open an NDJSON file and collect per-row query results.
1200    pub fn collect_ndjson_file<P>(
1201        &self,
1202        path: P,
1203        query: &str,
1204    ) -> std::result::Result<Vec<Value>, JetroEngineError>
1205    where
1206        P: AsRef<std::path::Path>,
1207    {
1208        io::collect_ndjson_file(self, path, query)
1209    }
1210
1211    /// Like [`JetroEngine::collect_ndjson_file`] with explicit NDJSON reader options.
1212    pub fn collect_ndjson_file_with_options<P>(
1213        &self,
1214        path: P,
1215        query: &str,
1216        options: io::NdjsonOptions,
1217    ) -> std::result::Result<Vec<Value>, JetroEngineError>
1218    where
1219        P: AsRef<std::path::Path>,
1220    {
1221        io::collect_ndjson_file_with_options(self, path, query, options)
1222    }
1223
1224    /// Collect per-row query results from an [`io::NdjsonSource`].
1225    pub fn collect_ndjson_source(
1226        &self,
1227        source: io::NdjsonSource,
1228        query: &str,
1229    ) -> std::result::Result<Vec<Value>, JetroEngineError> {
1230        io::collect_ndjson_source(self, source, query)
1231    }
1232
1233    /// Like [`JetroEngine::collect_ndjson_source`] with explicit NDJSON reader options.
1234    pub fn collect_ndjson_source_with_options(
1235        &self,
1236        source: io::NdjsonSource,
1237        query: &str,
1238        options: io::NdjsonOptions,
1239    ) -> std::result::Result<Vec<Value>, JetroEngineError> {
1240        io::collect_ndjson_source_with_options(self, source, query, options)
1241    }
1242
1243    /// Read an NDJSON file from tail to head and collect per-row query results.
1244    pub fn collect_ndjson_rev<P>(
1245        &self,
1246        path: P,
1247        query: &str,
1248    ) -> std::result::Result<Vec<Value>, JetroEngineError>
1249    where
1250        P: AsRef<std::path::Path>,
1251    {
1252        io::collect_ndjson_rev(self, path, query)
1253    }
1254
1255    /// Like [`JetroEngine::collect_ndjson_rev`] with explicit NDJSON reader options.
1256    pub fn collect_ndjson_rev_with_options<P>(
1257        &self,
1258        path: P,
1259        query: &str,
1260        options: io::NdjsonOptions,
1261    ) -> std::result::Result<Vec<Value>, JetroEngineError>
1262    where
1263        P: AsRef<std::path::Path>,
1264    {
1265        io::collect_ndjson_rev_with_options(self, path, query, options)
1266    }
1267
1268    /// Read an NDJSON file from tail to head and call `f` with each query result
1269    /// as it is produced.
1270    pub fn for_each_ndjson_rev<P, F>(
1271        &self,
1272        path: P,
1273        query: &str,
1274        f: F,
1275    ) -> std::result::Result<usize, JetroEngineError>
1276    where
1277        P: AsRef<std::path::Path>,
1278        F: FnMut(Value),
1279    {
1280        io::for_each_ndjson_rev(self, path, query, f)
1281    }
1282
1283    /// Read an NDJSON file from tail to head and call `f` until it returns
1284    /// [`io::NdjsonControl::Stop`] or input is exhausted.
1285    pub fn for_each_ndjson_rev_until<P, F>(
1286        &self,
1287        path: P,
1288        query: &str,
1289        f: F,
1290    ) -> std::result::Result<usize, JetroEngineError>
1291    where
1292        P: AsRef<std::path::Path>,
1293        F: FnMut(Value) -> std::result::Result<io::NdjsonControl, JetroEngineError>,
1294    {
1295        io::for_each_ndjson_rev_with_options(self, path, query, io::NdjsonOptions::default(), f)
1296    }
1297
1298    /// Like [`JetroEngine::for_each_ndjson_rev_until`] with explicit NDJSON reader options.
1299    pub fn for_each_ndjson_rev_until_with_options<P, F>(
1300        &self,
1301        path: P,
1302        query: &str,
1303        options: io::NdjsonOptions,
1304        f: F,
1305    ) -> std::result::Result<usize, JetroEngineError>
1306    where
1307        P: AsRef<std::path::Path>,
1308        F: FnMut(Value) -> std::result::Result<io::NdjsonControl, JetroEngineError>,
1309    {
1310        io::for_each_ndjson_rev_with_options(self, path, query, options, f)
1311    }
1312
1313    /// Like [`JetroEngine::for_each_ndjson_rev`] with explicit NDJSON reader options.
1314    pub fn for_each_ndjson_rev_with_options<P, F>(
1315        &self,
1316        path: P,
1317        query: &str,
1318        options: io::NdjsonOptions,
1319        mut f: F,
1320    ) -> std::result::Result<usize, JetroEngineError>
1321    where
1322        P: AsRef<std::path::Path>,
1323        F: FnMut(Value),
1324    {
1325        io::for_each_ndjson_rev_with_options(self, path, query, options, |value| {
1326            f(value);
1327            Ok(io::NdjsonControl::Continue)
1328        })
1329    }
1330
1331    /// Like [`JetroEngine::collect_ndjson`] with explicit NDJSON reader options.
1332    pub fn collect_ndjson_with_options<R>(
1333        &self,
1334        reader: R,
1335        query: &str,
1336        options: io::NdjsonOptions,
1337    ) -> std::result::Result<Vec<Value>, JetroEngineError>
1338    where
1339        R: std::io::BufRead,
1340    {
1341        io::collect_ndjson_with_options(self, reader, query, options)
1342    }
1343
1344    /// Evaluate `predicate` for each NDJSON row, collect matching original
1345    /// rows, and stop after `limit` matches.
1346    pub fn collect_ndjson_matches<R>(
1347        &self,
1348        reader: R,
1349        predicate: &str,
1350        limit: usize,
1351    ) -> std::result::Result<Vec<Value>, JetroEngineError>
1352    where
1353        R: std::io::BufRead,
1354    {
1355        io::collect_ndjson_matches(self, reader, predicate, limit)
1356    }
1357
1358    /// Like [`JetroEngine::collect_ndjson_matches`] with explicit NDJSON reader options.
1359    pub fn collect_ndjson_matches_with_options<R>(
1360        &self,
1361        reader: R,
1362        predicate: &str,
1363        limit: usize,
1364        options: io::NdjsonOptions,
1365    ) -> std::result::Result<Vec<Value>, JetroEngineError>
1366    where
1367        R: std::io::BufRead,
1368    {
1369        io::collect_ndjson_matches_with_options(self, reader, predicate, limit, options)
1370    }
1371
1372    /// Open an NDJSON file, collect matching original rows, and stop after `limit` matches.
1373    pub fn collect_ndjson_matches_file<P>(
1374        &self,
1375        path: P,
1376        predicate: &str,
1377        limit: usize,
1378    ) -> std::result::Result<Vec<Value>, JetroEngineError>
1379    where
1380        P: AsRef<std::path::Path>,
1381    {
1382        io::collect_ndjson_matches_file(self, path, predicate, limit)
1383    }
1384
1385    /// Like [`JetroEngine::collect_ndjson_matches_file`] with explicit NDJSON reader options.
1386    pub fn collect_ndjson_matches_file_with_options<P>(
1387        &self,
1388        path: P,
1389        predicate: &str,
1390        limit: usize,
1391        options: io::NdjsonOptions,
1392    ) -> std::result::Result<Vec<Value>, JetroEngineError>
1393    where
1394        P: AsRef<std::path::Path>,
1395    {
1396        io::collect_ndjson_matches_file_with_options(self, path, predicate, limit, options)
1397    }
1398
1399    /// Evaluate `predicate` against each row from an [`io::NdjsonSource`],
1400    /// collect matching original rows, and stop after `limit` matches.
1401    pub fn collect_ndjson_matches_source(
1402        &self,
1403        source: io::NdjsonSource,
1404        predicate: &str,
1405        limit: usize,
1406    ) -> std::result::Result<Vec<Value>, JetroEngineError> {
1407        io::collect_ndjson_matches_source(self, source, predicate, limit)
1408    }
1409
1410    /// Like [`JetroEngine::collect_ndjson_matches_source`] with explicit NDJSON reader options.
1411    pub fn collect_ndjson_matches_source_with_options(
1412        &self,
1413        source: io::NdjsonSource,
1414        predicate: &str,
1415        limit: usize,
1416        options: io::NdjsonOptions,
1417    ) -> std::result::Result<Vec<Value>, JetroEngineError> {
1418        io::collect_ndjson_matches_source_with_options(self, source, predicate, limit, options)
1419    }
1420
1421    /// Read an NDJSON file from tail to head, collect matching original rows,
1422    /// and stop after `limit` matches.
1423    pub fn collect_ndjson_rev_matches<P>(
1424        &self,
1425        path: P,
1426        predicate: &str,
1427        limit: usize,
1428    ) -> std::result::Result<Vec<Value>, JetroEngineError>
1429    where
1430        P: AsRef<std::path::Path>,
1431    {
1432        io::collect_ndjson_rev_matches(self, path, predicate, limit)
1433    }
1434
1435    /// Like [`JetroEngine::collect_ndjson_rev_matches`] with explicit NDJSON reader options.
1436    pub fn collect_ndjson_rev_matches_with_options<P>(
1437        &self,
1438        path: P,
1439        predicate: &str,
1440        limit: usize,
1441        options: io::NdjsonOptions,
1442    ) -> std::result::Result<Vec<Value>, JetroEngineError>
1443    where
1444        P: AsRef<std::path::Path>,
1445    {
1446        io::collect_ndjson_rev_matches_with_options(self, path, predicate, limit, options)
1447    }
1448
1449    /// Evaluate `query` independently for every non-empty NDJSON row and call
1450    /// `f` with each result as it is produced.
1451    pub fn for_each_ndjson<R, F>(
1452        &self,
1453        reader: R,
1454        query: &str,
1455        f: F,
1456    ) -> std::result::Result<usize, JetroEngineError>
1457    where
1458        R: std::io::BufRead,
1459        F: FnMut(Value),
1460    {
1461        io::for_each_ndjson(self, reader, query, f)
1462    }
1463
1464    /// Evaluate `query` independently for every non-empty NDJSON row and call
1465    /// `f` until it returns [`io::NdjsonControl::Stop`] or input is exhausted.
1466    pub fn for_each_ndjson_until<R, F>(
1467        &self,
1468        reader: R,
1469        query: &str,
1470        f: F,
1471    ) -> std::result::Result<usize, JetroEngineError>
1472    where
1473        R: std::io::BufRead,
1474        F: FnMut(Value) -> std::result::Result<io::NdjsonControl, JetroEngineError>,
1475    {
1476        io::for_each_ndjson_until(self, reader, query, f)
1477    }
1478
1479    /// Evaluate `query` for every row from an [`io::NdjsonSource`] and call
1480    /// `f` with each result as it is produced.
1481    pub fn for_each_ndjson_source<F>(
1482        &self,
1483        source: io::NdjsonSource,
1484        query: &str,
1485        f: F,
1486    ) -> std::result::Result<usize, JetroEngineError>
1487    where
1488        F: FnMut(Value),
1489    {
1490        io::for_each_ndjson_source(self, source, query, f)
1491    }
1492
1493    /// Evaluate `query` for every row from an [`io::NdjsonSource`] and call
1494    /// `f` until it returns [`io::NdjsonControl::Stop`] or input is exhausted.
1495    pub fn for_each_ndjson_source_until<F>(
1496        &self,
1497        source: io::NdjsonSource,
1498        query: &str,
1499        f: F,
1500    ) -> std::result::Result<usize, JetroEngineError>
1501    where
1502        F: FnMut(Value) -> std::result::Result<io::NdjsonControl, JetroEngineError>,
1503    {
1504        io::for_each_ndjson_source_until(self, source, query, f)
1505    }
1506
1507    /// Like [`JetroEngine::for_each_ndjson_source_until`] with explicit NDJSON reader options.
1508    pub fn for_each_ndjson_source_until_with_options<F>(
1509        &self,
1510        source: io::NdjsonSource,
1511        query: &str,
1512        options: io::NdjsonOptions,
1513        f: F,
1514    ) -> std::result::Result<usize, JetroEngineError>
1515    where
1516        F: FnMut(Value) -> std::result::Result<io::NdjsonControl, JetroEngineError>,
1517    {
1518        io::for_each_ndjson_source_until_with_options(self, source, query, options, f)
1519    }
1520
1521    /// Like [`JetroEngine::for_each_ndjson_source`] with explicit NDJSON reader options.
1522    pub fn for_each_ndjson_source_with_options<F>(
1523        &self,
1524        source: io::NdjsonSource,
1525        query: &str,
1526        options: io::NdjsonOptions,
1527        f: F,
1528    ) -> std::result::Result<usize, JetroEngineError>
1529    where
1530        F: FnMut(Value),
1531    {
1532        io::for_each_ndjson_source_with_options(self, source, query, options, f)
1533    }
1534
1535    /// Like [`JetroEngine::for_each_ndjson`] with explicit NDJSON reader options.
1536    pub fn for_each_ndjson_with_options<R, F>(
1537        &self,
1538        reader: R,
1539        query: &str,
1540        options: io::NdjsonOptions,
1541        f: F,
1542    ) -> std::result::Result<usize, JetroEngineError>
1543    where
1544        R: std::io::BufRead,
1545        F: FnMut(Value),
1546    {
1547        io::for_each_ndjson_with_options(self, reader, query, options, f)
1548    }
1549
1550    /// Like [`JetroEngine::for_each_ndjson_until`] with explicit NDJSON reader options.
1551    pub fn for_each_ndjson_until_with_options<R, F>(
1552        &self,
1553        reader: R,
1554        query: &str,
1555        options: io::NdjsonOptions,
1556        f: F,
1557    ) -> std::result::Result<usize, JetroEngineError>
1558    where
1559        R: std::io::BufRead,
1560        F: FnMut(Value) -> std::result::Result<io::NdjsonControl, JetroEngineError>,
1561    {
1562        io::for_each_ndjson_until_with_options(self, reader, query, options, f)
1563    }
1564
1565    /// Look up a compiled `QueryPlan` by expression string and planning context,
1566    /// compiling and inserting it if not already cached; evicts the whole cache if full.
1567    pub(crate) fn cached_plan(
1568        &self,
1569        expr: &str,
1570        context: plan::physical::PlanningContext,
1571    ) -> ir::physical::QueryPlan {
1572        let mut cache = self.plan_cache.lock().expect("plan cache poisoned");
1573        let cache_key = format!("{}\0{}", context.cache_key(), expr);
1574        if let Some(plan) = cache.get(&cache_key) {
1575            return plan.clone();
1576        }
1577
1578        let plan = plan::physical::plan_query_with_context(expr, context);
1579        if self.plan_cache_limit > 0 {
1580            if cache.len() >= self.plan_cache_limit {
1581                cache.clear();
1582            }
1583            cache.insert(cache_key, plan.clone());
1584        }
1585        plan
1586    }
1587}
1588
1589impl exec::pipeline::PipelineData for Jetro {
1590    fn promote_objvec(&self, arr: &Arc<Vec<Val>>) -> Option<Arc<crate::data::value::ObjVecData>> {
1591        self.get_or_promote_objvec(arr)
1592    }
1593}
1594
1595impl Jetro {
1596    /// Return a reference to the lazily parsed simd-json `TapeData`, parsing raw bytes
1597    /// on first access. Returns `Ok(None)` when no raw bytes are stored.
1598    pub(crate) fn lazy_tape(
1599        &self,
1600    ) -> std::result::Result<Option<&Arc<crate::data::tape::TapeData>>, EvalError> {
1601        if let Some(result) = self.tape.get() {
1602            return result
1603                .as_ref()
1604                .map(Some)
1605                .map_err(|err| EvalError(format!("Invalid JSON: {err}")));
1606        }
1607        let Some(raw) = self.raw_bytes.as_ref() else {
1608            return Ok(None);
1609        };
1610        let bytes: Vec<u8> = (**raw).to_vec();
1611        let parsed = crate::data::tape::TapeData::parse(bytes).map_err(|err| err.to_string());
1612        let _ = self.tape.set(parsed);
1613        self.tape
1614            .get()
1615            .expect("tape cache initialized")
1616            .as_ref()
1617            .map(Some)
1618            .map_err(|err| EvalError(format!("Invalid JSON: {err}")))
1619    }
1620
1621    /// Look up or build an `ObjVecData` columnar representation for the given
1622    /// `Arc<Vec<Val>>` array, caching the result by pointer address.
1623    pub(crate) fn get_or_promote_objvec(
1624        &self,
1625        arr: &Arc<Vec<Val>>,
1626    ) -> Option<Arc<crate::data::value::ObjVecData>> {
1627        let key = Arc::as_ptr(arr) as usize;
1628        if let Ok(cache) = self.objvec_cache.lock() {
1629            if let Some(d) = cache.get(&key) {
1630                return Some(Arc::clone(d));
1631            }
1632        }
1633        let promoted = exec::pipeline::Pipeline::try_promote_objvec_arr(arr)?;
1634        if let Ok(mut cache) = self.objvec_cache.lock() {
1635            cache.entry(key).or_insert_with(|| Arc::clone(&promoted));
1636        }
1637        Some(promoted)
1638    }
1639
1640    /// Internal constructor that wraps a `serde_json::Value` without raw bytes.
1641    pub(crate) fn new(document: Value) -> Self {
1642        Self {
1643            document,
1644            root_val: OnceCell::new(),
1645            objvec_cache: Default::default(),
1646            raw_bytes: None,
1647            tape: OnceCell::new(),
1648            structural_index: OnceCell::new(),
1649            vm: RefCell::new(VM::new()),
1650        }
1651    }
1652
1653    /// Build a `Jetro` whose `root_val` is pre-cached with `root` (constructed by the
1654    /// caller, typically via [`Val::from_value_with`] using an engine-owned key cache).
1655    /// `document` is retained for value-backed callers and tests that read the
1656    /// original `serde_json::Value`.
1657    pub(crate) fn from_val_and_value(root: Val, document: Value) -> Self {
1658        let root_val = OnceCell::new();
1659        let _ = root_val.set(root);
1660        Self {
1661            document,
1662            root_val,
1663            objvec_cache: Default::default(),
1664            raw_bytes: None,
1665            tape: OnceCell::new(),
1666            structural_index: OnceCell::new(),
1667            vm: RefCell::new(VM::new()),
1668        }
1669    }
1670
1671    /// Like [`Jetro::root_val`] but interns object keys through `keys` instead of the
1672    /// process-wide default. Used by [`JetroEngine::parse_bytes`] to materialise the
1673    /// `Val` tree once at parse time so subsequent `collect` calls find a populated
1674    /// `root_val` cache and skip re-interning.
1675    pub(crate) fn root_val_with(
1676        &self,
1677        keys: &crate::data::intern::KeyCache,
1678    ) -> std::result::Result<Val, EvalError> {
1679        if let Some(root) = self.root_val.get() {
1680            return Ok(root.clone());
1681        }
1682        let root = {
1683            if let Some(tape) = self.lazy_tape()? {
1684                Val::from_tape_data_with(keys, tape)
1685            } else {
1686                Val::from_value_with(keys, &self.document)
1687            }
1688        };
1689        let _ = self.root_val.set(root);
1690        Ok(self.root_val.get().expect("root val initialized").clone())
1691    }
1692
1693    /// Parse raw JSON bytes and build a `Jetro` query handle.
1694    /// The bytes are not parsed eagerly; the tape is built lazily on the first
1695    /// query that needs it.
1696    pub fn from_bytes(bytes: Vec<u8>) -> std::result::Result<Self, serde_json::Error> {
1697        Ok(Self {
1698            document: Value::Null,
1699            root_val: OnceCell::new(),
1700            objvec_cache: Default::default(),
1701            raw_bytes: Some(Arc::from(bytes.into_boxed_slice())),
1702            tape: OnceCell::new(),
1703            structural_index: OnceCell::new(),
1704            vm: RefCell::new(VM::new()),
1705        })
1706    }
1707
1708    /// Borrow this document's VM cache, falling back to a temporary VM on re-entrant use.
1709    pub(crate) fn with_vm<F, R>(&self, f: F) -> R
1710    where
1711        F: FnOnce(&mut VM) -> R,
1712    {
1713        match self.vm.try_borrow_mut() {
1714            Ok(mut vm) => f(&mut vm),
1715            Err(_) => {
1716                let mut vm = VM::new();
1717                f(&mut vm)
1718            }
1719        }
1720    }
1721
1722    /// Return the raw JSON byte slice if this handle was constructed from bytes,
1723    /// or `None` if it was constructed from a `serde_json::Value`.
1724    pub(crate) fn raw_bytes(&self) -> Option<&[u8]> {
1725        self.raw_bytes.as_deref()
1726    }
1727
1728    /// Return a reference to the lazily built `StructuralIndex` for key-presence
1729    /// queries, constructing it from raw bytes on first access if available.
1730    pub(crate) fn lazy_structural_index(
1731        &self,
1732    ) -> std::result::Result<Option<&Arc<jetro_experimental::StructuralIndex>>, EvalError> {
1733        if let Some(result) = self.structural_index.get() {
1734            return result
1735                .as_ref()
1736                .map(Some)
1737                .map_err(|err| EvalError(format!("Invalid JSON: {err}")));
1738        }
1739        let Some(raw) = self.raw_bytes.as_ref() else {
1740            return Ok(None);
1741        };
1742        let built = jetro_experimental::from_bytes_with(
1743            raw.as_ref(),
1744            jetro_experimental::BuildOptions::keys_only(),
1745        )
1746        .map(Arc::new)
1747        .map_err(|err| err.to_string());
1748        let _ = self.structural_index.set(built);
1749        self.structural_index
1750            .get()
1751            .expect("structural index cache initialized")
1752            .as_ref()
1753            .map(Some)
1754            .map_err(|err| EvalError(format!("Invalid JSON: {err}")))
1755    }
1756
1757    /// Return the root `Val` for the document, building and caching it from the
1758    /// tape or from the `serde_json::Value` on first access.
1759    pub(crate) fn root_val(&self) -> std::result::Result<Val, EvalError> {
1760        if let Some(root) = self.root_val.get() {
1761            return Ok(root.clone());
1762        }
1763        let root = {
1764            if let Some(tape) = self.lazy_tape()? {
1765                Val::from_tape_data(tape)
1766            } else {
1767                Val::from(&self.document)
1768            }
1769        };
1770        let _ = self.root_val.set(root);
1771        Ok(self.root_val.get().expect("root val initialized").clone())
1772    }
1773
1774    /// Return `true` if the `Val` tree has already been materialised; used in
1775    /// tests to assert that lazy evaluation is working correctly.
1776    #[cfg(test)]
1777    pub(crate) fn root_val_is_materialized(&self) -> bool {
1778        self.root_val.get().is_some()
1779    }
1780
1781    #[cfg(test)]
1782    pub(crate) fn structural_index_is_built(&self) -> bool {
1783        self.structural_index.get().is_some()
1784    }
1785
1786    #[cfg(test)]
1787    pub(crate) fn tape_is_built(&self) -> bool {
1788        self.tape.get().is_some()
1789    }
1790
1791    #[cfg(test)]
1792    pub(crate) fn reset_tape_materialized_subtrees(&self) {
1793        if let Ok(Some(tape)) = self.lazy_tape() {
1794            tape.reset_materialized_subtrees();
1795        }
1796    }
1797
1798    #[cfg(test)]
1799    pub(crate) fn tape_materialized_subtrees(&self) -> usize {
1800        self.lazy_tape()
1801            .ok()
1802            .flatten()
1803            .map(|tape| tape.materialized_subtrees())
1804            .unwrap_or(0)
1805    }
1806
1807    /// Evaluate a Jetro expression against this document and return the result
1808    /// as a `serde_json::Value`. Uses this document's VM with compile and
1809    /// path-resolution caches for repeated calls.
1810    pub fn collect<S: AsRef<str>>(&self, expr: S) -> std::result::Result<Value, EvalError> {
1811        exec::router::collect_json(self, expr.as_ref())
1812    }
1813}
1814
1815/// Wrap an existing `serde_json::Value` in a `Jetro` handle without raw bytes.
1816/// Prefer `Jetro::from_bytes` when you have the original JSON source, as it
1817/// enables the tape and structural-index lazy backends.
1818impl From<Value> for Jetro {
1819    /// Convert a `serde_json::Value` into a `Jetro` query handle.
1820    fn from(v: Value) -> Self {
1821        Self::new(v)
1822    }
1823}