Skip to main content

jetro_core/
lib.rs

1//! Jetro core — parser, compiler, and VM for the Jetro JSON query language.
2//!
3//! # Execution path
4//!
5//! ```text
6//! source text
7//!   │  parse::parser::parse() → Expr AST
8//!   │  plan::physical::plan_query() → QueryPlan (physical IR)
9//!   │  exec::router::collect_*() → dispatches to:
10//!   │    StructuralIndex backend  (jetro-experimental bitmap)
11//!   │    ViewPipeline backend     (borrowed tape/Val navigation)
12//!   │    Pipeline backend         (pull-based composed stages)
13//!   └─  VM fallback               (bytecode stack machine)
14//! ```
15//!
16//! # Quick start
17//!
18//! ```rust
19//! use jetro_core::Jetro;
20//! let j = Jetro::from_bytes(br#"{"books":[{"price":12}]}"#.to_vec()).unwrap();
21//! assert_eq!(j.collect("$.books.len()").unwrap(), serde_json::json!(1));
22//! ```
23//!
24//! ```rust
25//! use jetro_core::JetroEngine;
26//! use std::io::Cursor;
27//!
28//! let engine = JetroEngine::new();
29//! let rows = Cursor::new(br#"{"name":"Ada"}
30//! {"name":"Bob"}
31//! "#);
32//! let names = engine.collect_ndjson(rows, "name").unwrap();
33//! assert_eq!(names, vec![serde_json::json!("Ada"), serde_json::json!("Bob")]);
34//! ```
35//!
36//! Match-limited NDJSON helpers evaluate a predicate per row, return the
37//! original full row for truthy matches, and stop after the requested number of
38//! matches:
39//!
40//! ```rust
41//! use jetro_core::JetroEngine;
42//! use std::io::Cursor;
43//!
44//! let engine = JetroEngine::new();
45//! let rows = Cursor::new(br#"{"id":1,"active":true}
46//! {"id":2,"active":false}
47//! {"id":3,"active":true}
48//! "#);
49//! let first_two = engine.collect_ndjson_matches(rows, "active", 2).unwrap();
50//! assert_eq!(first_two, vec![
51//!     serde_json::json!({"id": 1, "active": true}),
52//!     serde_json::json!({"id": 3, "active": true}),
53//! ]);
54//! ```
55
56pub(crate) mod builtins;
57pub(crate) mod compile;
58pub(crate) mod data;
59pub(crate) mod exec;
60pub mod io;
61pub(crate) mod ir;
62pub(crate) mod parse;
63pub(crate) mod plan;
64pub(crate) mod util;
65pub(crate) mod vm;
66
67#[cfg(test)]
68mod tests;
69
70use data::value::Val;
71use serde_json::Value;
72use std::cell::{OnceCell, RefCell};
73use std::collections::HashMap;
74use std::sync::Arc;
75use std::sync::Mutex;
76
77pub use data::context::EvalError;
78#[cfg(test)]
79use parse::parser::ParseError;
80use vm::VM;
81
82/// Internal parser surface re-exported only when the `fuzz_internal` feature
83/// is enabled. Used by the `cargo-fuzz` harness to reach the PEG parser
84/// without going through `Jetro::collect`. NOT a stable public API.
85#[cfg(feature = "fuzz_internal")]
86pub mod __fuzz_internal {
87    pub use crate::parse::parser::{parse, ParseError};
88    pub use crate::plan::physical::plan_query;
89}
90
91#[cfg(test)]
92#[derive(Debug)]
93pub(crate) enum Error {
94    Parse(ParseError),
95    Eval(EvalError),
96}
97
98#[cfg(test)]
99impl std::fmt::Display for Error {
100    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
101        match self {
102            Error::Parse(e) => write!(f, "{}", e),
103            Error::Eval(e) => write!(f, "{}", e),
104        }
105    }
106}
107#[cfg(test)]
108impl std::error::Error for Error {
109    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
110        match self {
111            Error::Parse(e) => Some(e),
112            Error::Eval(_) => None,
113        }
114    }
115}
116
117#[cfg(test)]
118impl From<ParseError> for Error {
119    fn from(e: ParseError) -> Self {
120        Error::Parse(e)
121    }
122}
123#[cfg(test)]
124impl From<EvalError> for Error {
125    fn from(e: EvalError) -> Self {
126        Error::Eval(e)
127    }
128}
129
130/// Primary entry point. Holds a JSON document and evaluates expressions against
131/// it. Lazy fields (`root_val`, `tape`, `structural_index`, `objvec_cache`)
132/// are populated on first use so callers only pay for the representations a
133/// particular query actually needs.
134pub struct Jetro {
135    /// The `serde_json::Value` root document; unused when `simd-json` is enabled
136    /// (the tape is the authoritative source in that case).
137    document: Value,
138    /// Cached `Val` tree — built once and reused across `collect()` calls.
139    root_val: OnceCell<Val>,
140    /// Retained raw bytes for lazy tape and structural-index materialisation.
141    raw_bytes: Option<Arc<[u8]>>,
142
143    /// Lazily parsed simd-json tape; `Err` is cached to avoid re-parsing after failure.
144    #[cfg(feature = "simd-json")]
145    tape: OnceCell<std::result::Result<Arc<crate::data::tape::TapeData>, String>>,
146    /// Unused placeholder so the field name is consistent regardless of features.
147    #[cfg(not(feature = "simd-json"))]
148    #[allow(dead_code)]
149    tape: OnceCell<()>,
150
151    /// Lazily built bitmap structural index for accelerated key-presence queries.
152    structural_index:
153        OnceCell<std::result::Result<Arc<jetro_experimental::StructuralIndex>, String>>,
154
155    /// Per-document cache from `Arc<Vec<Val>>` pointer addresses to promoted
156    /// `ObjVecData` columnar representations; keyed by pointer to avoid re-promotion.
157    pub(crate) objvec_cache:
158        std::sync::Mutex<std::collections::HashMap<usize, Arc<crate::data::value::ObjVecData>>>,
159
160    /// Per-document VM cache used by `Jetro::collect`; not shared across document handles.
161    vm: RefCell<VM>,
162}
163
164/// Long-lived multi-document query engine with an explicit plan cache.
165/// Use when the same process evaluates many expressions over many documents —
166/// parse/lower/compile work is amortised by this object, not hidden in
167/// thread-local state.
168pub struct JetroEngine {
169    /// Maps `"<context_key>\0<expr>"` to compiled `QueryPlan`; evicted wholesale when full.
170    plan_cache: Mutex<HashMap<String, ir::physical::QueryPlan>>,
171    /// Maximum number of entries before the cache is cleared; 0 disables caching.
172    plan_cache_limit: usize,
173    /// The shared `VM` used by all `collect*` calls on this engine instance.
174    vm: Mutex<VM>,
175    /// Engine-owned JSON object-key intern cache. Used by [`JetroEngine::parse_value`]
176    /// and [`JetroEngine::parse_bytes`] (and the `collect_*` shortcuts that go through
177    /// them) so each engine instance has an isolated key cache. Documents built via
178    /// the standalone `Jetro::from_bytes`/`From<serde_json::Value>` paths use the
179    /// process-wide [`crate::data::intern::default_cache`] instead.
180    keys: Arc<crate::data::intern::KeyCache>,
181}
182
183/// Error returned by `JetroEngine::collect_bytes` and similar methods that
184/// may fail during JSON parsing or during expression evaluation.
185#[derive(Debug)]
186pub enum JetroEngineError {
187    /// JSON parsing failed before evaluation could begin.
188    Json(serde_json::Error),
189    /// Reading from a stream or writing results failed.
190    Io(std::io::Error),
191    /// NDJSON row parsing failed with row context.
192    Ndjson(io::RowError),
193    /// Expression evaluation failed (the JSON was valid but the query errored).
194    Eval(EvalError),
195}
196
197impl std::fmt::Display for JetroEngineError {
198    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
199        match self {
200            Self::Json(err) => write!(f, "{}", err),
201            Self::Io(err) => write!(f, "{}", err),
202            Self::Ndjson(err) => write!(f, "{}", err),
203            Self::Eval(err) => write!(f, "{}", err),
204        }
205    }
206}
207
208impl std::error::Error for JetroEngineError {
209    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
210        match self {
211            Self::Json(err) => Some(err),
212            Self::Io(err) => Some(err),
213            Self::Ndjson(err) => Some(err),
214            Self::Eval(_) => None,
215        }
216    }
217}
218
219impl From<serde_json::Error> for JetroEngineError {
220    fn from(err: serde_json::Error) -> Self {
221        Self::Json(err)
222    }
223}
224
225impl From<std::io::Error> for JetroEngineError {
226    fn from(err: std::io::Error) -> Self {
227        Self::Io(err)
228    }
229}
230
231impl From<io::RowError> for JetroEngineError {
232    fn from(err: io::RowError) -> Self {
233        Self::Ndjson(err)
234    }
235}
236
237impl From<EvalError> for JetroEngineError {
238    fn from(err: EvalError) -> Self {
239        Self::Eval(err)
240    }
241}
242
243impl Default for JetroEngine {
244    fn default() -> Self {
245        Self::new()
246    }
247}
248
249impl JetroEngine {
250    /// Default maximum plan-cache size; the cache is cleared wholesale when reached.
251    const DEFAULT_PLAN_CACHE_LIMIT: usize = 256;
252
253    /// Create a `JetroEngine` with the default plan-cache limit of 256 entries.
254    pub fn new() -> Self {
255        Self::with_plan_cache_limit(Self::DEFAULT_PLAN_CACHE_LIMIT)
256    }
257
258    /// Create a `JetroEngine` with an explicit plan-cache capacity.
259    /// Set `plan_cache_limit` to 0 to disable caching entirely.
260    pub fn with_plan_cache_limit(plan_cache_limit: usize) -> Self {
261        Self {
262            plan_cache: Mutex::new(HashMap::new()),
263            plan_cache_limit,
264            vm: Mutex::new(VM::new()),
265            keys: crate::data::intern::KeyCache::new(),
266        }
267    }
268
269    /// Borrow this engine's JSON key-intern cache.
270    pub fn keys(&self) -> &Arc<crate::data::intern::KeyCache> {
271        &self.keys
272    }
273
274    /// Discard all cached query plans and the engine's key-intern cache,
275    /// forcing re-compilation and re-interning on the next call.
276    pub fn clear_cache(&self) {
277        self.plan_cache.lock().expect("plan cache poisoned").clear();
278        self.keys.clear();
279    }
280
281    /// Build a `Jetro` document from a `serde_json::Value` with object keys
282    /// interned into this engine's key cache. Use this in place of
283    /// `Jetro::from(...)` / the `From<serde_json::Value>` impl when
284    /// per-engine key isolation is required.
285    pub fn parse_value(&self, document: Value) -> Jetro {
286        let root = Val::from_value_with(&self.keys, &document);
287        Jetro::from_val_and_value(root, document)
288    }
289
290    /// Parse raw JSON bytes into a `Jetro` document with object keys
291    /// interned into this engine's key cache. With `simd-json`, the tape
292    /// is materialised eagerly so interning happens once at parse time
293    /// (subsequent `collect` calls reuse the cached `Val` tree).
294    pub fn parse_bytes(&self, bytes: Vec<u8>) -> std::result::Result<Jetro, JetroEngineError> {
295        let document = Jetro::from_bytes(bytes)?;
296        // Force materialisation so keys are interned through this
297        // engine's cache rather than the default thread-local one when
298        // `collect` later asks for `root_val`.
299        let _ = document.root_val_with(&self.keys)?;
300        Ok(document)
301    }
302
303    /// Parse raw JSON bytes into a `Jetro` document without forcing the `Val`
304    /// tree. This keeps byte-backed callers eligible for tape/view execution;
305    /// object keys are interned only if execution later materialises the row.
306    pub(crate) fn parse_bytes_lazy(
307        &self,
308        bytes: Vec<u8>,
309    ) -> std::result::Result<Jetro, JetroEngineError> {
310        Ok(Jetro::from_bytes(bytes)?)
311    }
312
313    /// Evaluate a Jetro expression against an already-constructed `Jetro` document,
314    /// using the engine's shared plan cache and `VM`.
315    pub fn collect<S: AsRef<str>>(
316        &self,
317        document: &Jetro,
318        expr: S,
319    ) -> std::result::Result<Value, EvalError> {
320        let expr = expr.as_ref();
321        if let Some(rows) = io::collect_document_rows(self, document, expr)? {
322            return Ok(Value::from(rows));
323        }
324        let plan = self.cached_plan(expr, exec::router::planning_context(document));
325        self.collect_prepared(document, &plan)
326    }
327
328    pub(crate) fn collect_prepared(
329        &self,
330        document: &Jetro,
331        plan: &ir::physical::QueryPlan,
332    ) -> std::result::Result<Value, EvalError> {
333        self.collect_prepared_val(document, plan).map(Value::from)
334    }
335
336    pub(crate) fn collect_prepared_val(
337        &self,
338        document: &Jetro,
339        plan: &ir::physical::QueryPlan,
340    ) -> std::result::Result<Val, EvalError> {
341        let mut vm = self.vm.lock().expect("vm cache poisoned");
342        exec::router::collect_plan_val_with_vm(document, plan, &mut vm)
343    }
344
345    pub(crate) fn lock_vm(&self) -> std::sync::MutexGuard<'_, VM> {
346        self.vm.lock().expect("vm cache poisoned")
347    }
348
349    /// Convenience wrapper: wrap a `serde_json::Value` in a `Jetro` and evaluate `expr`.
350    /// Routes through [`JetroEngine::parse_value`] so the document's object keys are
351    /// interned into this engine's key cache.
352    pub fn collect_value<S: AsRef<str>>(
353        &self,
354        document: Value,
355        expr: S,
356    ) -> std::result::Result<Value, EvalError> {
357        let document = self.parse_value(document);
358        self.collect(&document, expr)
359    }
360
361    /// Parse raw JSON bytes into a `Jetro` document and evaluate `expr`,
362    /// returning a `JetroEngineError` on either parse or evaluation failure.
363    /// Routes through [`JetroEngine::parse_bytes`] so the document's object keys
364    /// are interned into this engine's key cache.
365    pub fn collect_bytes<S: AsRef<str>>(
366        &self,
367        bytes: Vec<u8>,
368        expr: S,
369    ) -> std::result::Result<Value, JetroEngineError> {
370        let document = self.parse_bytes(bytes)?;
371        Ok(self.collect(&document, expr)?)
372    }
373
374    /// Evaluate `query` independently for every non-empty NDJSON row and write
375    /// one JSON result per output line.
376    pub fn run_ndjson<R, W>(
377        &self,
378        reader: R,
379        query: &str,
380        writer: W,
381    ) -> std::result::Result<usize, JetroEngineError>
382    where
383        R: std::io::BufRead,
384        W: std::io::Write,
385    {
386        io::run_ndjson(self, reader, query, writer)
387    }
388
389    /// Open an NDJSON file and evaluate `query` independently for every
390    /// non-empty row, writing one JSON result per output line.
391    pub fn run_ndjson_file<P, W>(
392        &self,
393        path: P,
394        query: &str,
395        writer: W,
396    ) -> std::result::Result<usize, JetroEngineError>
397    where
398        P: AsRef<std::path::Path>,
399        W: std::io::Write,
400    {
401        io::run_ndjson_file(self, path, query, writer)
402    }
403
404    /// Like [`JetroEngine::run_ndjson_file`] with explicit NDJSON reader options.
405    pub fn run_ndjson_file_with_options<P, W>(
406        &self,
407        path: P,
408        query: &str,
409        writer: W,
410        options: io::NdjsonOptions,
411    ) -> std::result::Result<usize, JetroEngineError>
412    where
413        P: AsRef<std::path::Path>,
414        W: std::io::Write,
415    {
416        io::run_ndjson_file_with_options(self, path, query, writer, options)
417    }
418
419    /// Open an NDJSON file, write at most `limit` query results, and stop reading.
420    pub fn run_ndjson_file_limit<P, W>(
421        &self,
422        path: P,
423        query: &str,
424        limit: usize,
425        writer: W,
426    ) -> std::result::Result<usize, JetroEngineError>
427    where
428        P: AsRef<std::path::Path>,
429        W: std::io::Write,
430    {
431        io::run_ndjson_file_limit(self, path, query, limit, writer)
432    }
433
434    /// Like [`JetroEngine::run_ndjson_file_limit`] with explicit NDJSON reader options.
435    pub fn run_ndjson_file_limit_with_options<P, W>(
436        &self,
437        path: P,
438        query: &str,
439        limit: usize,
440        writer: W,
441        options: io::NdjsonOptions,
442    ) -> std::result::Result<usize, JetroEngineError>
443    where
444        P: AsRef<std::path::Path>,
445        W: std::io::Write,
446    {
447        io::run_ndjson_file_limit_with_options(self, path, query, limit, writer, options)
448    }
449
450    /// Evaluate `query` independently for every row from an [`io::NdjsonSource`].
451    pub fn run_ndjson_source<W>(
452        &self,
453        source: io::NdjsonSource,
454        query: &str,
455        writer: W,
456    ) -> std::result::Result<usize, JetroEngineError>
457    where
458        W: std::io::Write,
459    {
460        io::run_ndjson_source(self, source, query, writer)
461    }
462
463    /// Like [`JetroEngine::run_ndjson_source`] with explicit NDJSON reader options.
464    pub fn run_ndjson_source_with_options<W>(
465        &self,
466        source: io::NdjsonSource,
467        query: &str,
468        writer: W,
469        options: io::NdjsonOptions,
470    ) -> std::result::Result<usize, JetroEngineError>
471    where
472        W: std::io::Write,
473    {
474        io::run_ndjson_source_with_options(self, source, query, writer, options)
475    }
476
477    /// Evaluate `query` for rows from an [`io::NdjsonSource`], write at most
478    /// `limit` results, and stop reading.
479    pub fn run_ndjson_source_limit<W>(
480        &self,
481        source: io::NdjsonSource,
482        query: &str,
483        limit: usize,
484        writer: W,
485    ) -> std::result::Result<usize, JetroEngineError>
486    where
487        W: std::io::Write,
488    {
489        io::run_ndjson_source_limit(self, source, query, limit, writer)
490    }
491
492    /// Like [`JetroEngine::run_ndjson_source_limit`] with explicit NDJSON reader options.
493    pub fn run_ndjson_source_limit_with_options<W>(
494        &self,
495        source: io::NdjsonSource,
496        query: &str,
497        limit: usize,
498        writer: W,
499        options: io::NdjsonOptions,
500    ) -> std::result::Result<usize, JetroEngineError>
501    where
502        W: std::io::Write,
503    {
504        io::run_ndjson_source_limit_with_options(self, source, query, limit, writer, options)
505    }
506
507    /// Read an NDJSON file from tail to head and write one query result per row.
508    pub fn run_ndjson_rev<P, W>(
509        &self,
510        path: P,
511        query: &str,
512        writer: W,
513    ) -> std::result::Result<usize, JetroEngineError>
514    where
515        P: AsRef<std::path::Path>,
516        W: std::io::Write,
517    {
518        io::run_ndjson_rev(self, path, query, writer)
519    }
520
521    /// Like [`JetroEngine::run_ndjson_rev`] with explicit NDJSON reader options.
522    pub fn run_ndjson_rev_with_options<P, W>(
523        &self,
524        path: P,
525        query: &str,
526        writer: W,
527        options: io::NdjsonOptions,
528    ) -> std::result::Result<usize, JetroEngineError>
529    where
530        P: AsRef<std::path::Path>,
531        W: std::io::Write,
532    {
533        io::run_ndjson_rev_with_options(self, path, query, writer, options)
534    }
535
536    /// Read an NDJSON file from tail to head, write at most `limit` query
537    /// results, and stop reading.
538    pub fn run_ndjson_rev_limit<P, W>(
539        &self,
540        path: P,
541        query: &str,
542        limit: usize,
543        writer: W,
544    ) -> std::result::Result<usize, JetroEngineError>
545    where
546        P: AsRef<std::path::Path>,
547        W: std::io::Write,
548    {
549        io::run_ndjson_rev_limit(self, path, query, limit, writer)
550    }
551
552    /// Like [`JetroEngine::run_ndjson_rev_limit`] with explicit NDJSON reader options.
553    pub fn run_ndjson_rev_limit_with_options<P, W>(
554        &self,
555        path: P,
556        query: &str,
557        limit: usize,
558        writer: W,
559        options: io::NdjsonOptions,
560    ) -> std::result::Result<usize, JetroEngineError>
561    where
562        P: AsRef<std::path::Path>,
563        W: std::io::Write,
564    {
565        io::run_ndjson_rev_limit_with_options(self, path, query, limit, writer, options)
566    }
567
568    /// Read an NDJSON file from tail to head, keep only the first row seen for
569    /// each `key_query` result in that reverse stream order, write `query` for
570    /// retained rows, and stop after `limit` retained rows.
571    pub fn run_ndjson_rev_distinct_by<P, W>(
572        &self,
573        path: P,
574        key_query: &str,
575        query: &str,
576        limit: usize,
577        writer: W,
578    ) -> std::result::Result<usize, JetroEngineError>
579    where
580        P: AsRef<std::path::Path>,
581        W: std::io::Write,
582    {
583        io::run_ndjson_rev_distinct_by(self, path, key_query, query, limit, writer)
584    }
585
586    /// Like [`JetroEngine::run_ndjson_rev_distinct_by`] with explicit NDJSON
587    /// reader options.
588    pub fn run_ndjson_rev_distinct_by_with_options<P, W>(
589        &self,
590        path: P,
591        key_query: &str,
592        query: &str,
593        limit: usize,
594        writer: W,
595        options: io::NdjsonOptions,
596    ) -> std::result::Result<usize, JetroEngineError>
597    where
598        P: AsRef<std::path::Path>,
599        W: std::io::Write,
600    {
601        io::run_ndjson_rev_distinct_by_with_options(
602            self, path, key_query, query, limit, writer, options,
603        )
604    }
605
606    /// Like [`JetroEngine::run_ndjson_rev_distinct_by`], returning execution
607    /// counters for path-selection and duplicate-drop observability.
608    pub fn run_ndjson_rev_distinct_by_with_stats<P, W>(
609        &self,
610        path: P,
611        key_query: &str,
612        query: &str,
613        limit: usize,
614        writer: W,
615    ) -> std::result::Result<io::NdjsonRevDistinctStats, JetroEngineError>
616    where
617        P: AsRef<std::path::Path>,
618        W: std::io::Write,
619    {
620        io::run_ndjson_rev_distinct_by_with_stats(self, path, key_query, query, limit, writer)
621    }
622
623    /// Like [`JetroEngine::run_ndjson_rev_distinct_by_with_stats`] with explicit
624    /// NDJSON reader options.
625    pub fn run_ndjson_rev_distinct_by_with_stats_and_options<P, W>(
626        &self,
627        path: P,
628        key_query: &str,
629        query: &str,
630        limit: usize,
631        writer: W,
632        options: io::NdjsonOptions,
633    ) -> std::result::Result<io::NdjsonRevDistinctStats, JetroEngineError>
634    where
635        P: AsRef<std::path::Path>,
636        W: std::io::Write,
637    {
638        io::run_ndjson_rev_distinct_by_with_stats_and_options(
639            self, path, key_query, query, limit, writer, options,
640        )
641    }
642
643    /// Like [`JetroEngine::run_ndjson`] with explicit NDJSON reader options.
644    pub fn run_ndjson_with_options<R, W>(
645        &self,
646        reader: R,
647        query: &str,
648        writer: W,
649        options: io::NdjsonOptions,
650    ) -> std::result::Result<usize, JetroEngineError>
651    where
652        R: std::io::BufRead,
653        W: std::io::Write,
654    {
655        io::run_ndjson_with_options(self, reader, query, writer, options)
656    }
657
658    /// Evaluate `query` for NDJSON rows, write at most `limit` results, and stop reading.
659    pub fn run_ndjson_limit<R, W>(
660        &self,
661        reader: R,
662        query: &str,
663        limit: usize,
664        writer: W,
665    ) -> std::result::Result<usize, JetroEngineError>
666    where
667        R: std::io::BufRead,
668        W: std::io::Write,
669    {
670        io::run_ndjson_limit(self, reader, query, limit, writer)
671    }
672
673    /// Like [`JetroEngine::run_ndjson_limit`] with explicit NDJSON reader options.
674    pub fn run_ndjson_limit_with_options<R, W>(
675        &self,
676        reader: R,
677        query: &str,
678        limit: usize,
679        writer: W,
680        options: io::NdjsonOptions,
681    ) -> std::result::Result<usize, JetroEngineError>
682    where
683        R: std::io::BufRead,
684        W: std::io::Write,
685    {
686        io::run_ndjson_limit_with_options(self, reader, query, limit, writer, options)
687    }
688
689    /// Evaluate `predicate` for each NDJSON row, write matching original rows,
690    /// and stop after `limit` matches.
691    pub fn run_ndjson_matches<R, W>(
692        &self,
693        reader: R,
694        predicate: &str,
695        limit: usize,
696        writer: W,
697    ) -> std::result::Result<usize, JetroEngineError>
698    where
699        R: std::io::BufRead,
700        W: std::io::Write,
701    {
702        io::run_ndjson_matches(self, reader, predicate, limit, writer)
703    }
704
705    /// Like [`JetroEngine::run_ndjson_matches`] with explicit NDJSON reader options.
706    pub fn run_ndjson_matches_with_options<R, W>(
707        &self,
708        reader: R,
709        predicate: &str,
710        limit: usize,
711        writer: W,
712        options: io::NdjsonOptions,
713    ) -> std::result::Result<usize, JetroEngineError>
714    where
715        R: std::io::BufRead,
716        W: std::io::Write,
717    {
718        io::run_ndjson_matches_with_options(self, reader, predicate, limit, writer, options)
719    }
720
721    /// Open an NDJSON file, write matching original rows, and stop after `limit` matches.
722    pub fn run_ndjson_matches_file<P, W>(
723        &self,
724        path: P,
725        predicate: &str,
726        limit: usize,
727        writer: W,
728    ) -> std::result::Result<usize, JetroEngineError>
729    where
730        P: AsRef<std::path::Path>,
731        W: std::io::Write,
732    {
733        io::run_ndjson_matches_file(self, path, predicate, limit, writer)
734    }
735
736    /// Like [`JetroEngine::run_ndjson_matches_file`] with explicit NDJSON reader options.
737    pub fn run_ndjson_matches_file_with_options<P, W>(
738        &self,
739        path: P,
740        predicate: &str,
741        limit: usize,
742        writer: W,
743        options: io::NdjsonOptions,
744    ) -> std::result::Result<usize, JetroEngineError>
745    where
746        P: AsRef<std::path::Path>,
747        W: std::io::Write,
748    {
749        io::run_ndjson_matches_file_with_options(self, path, predicate, limit, writer, options)
750    }
751
752    /// Evaluate `predicate` against each row from an [`io::NdjsonSource`], write
753    /// matching original rows, and stop after `limit` matches.
754    pub fn run_ndjson_matches_source<W>(
755        &self,
756        source: io::NdjsonSource,
757        predicate: &str,
758        limit: usize,
759        writer: W,
760    ) -> std::result::Result<usize, JetroEngineError>
761    where
762        W: std::io::Write,
763    {
764        io::run_ndjson_matches_source(self, source, predicate, limit, writer)
765    }
766
767    /// Like [`JetroEngine::run_ndjson_matches_source`] with explicit NDJSON reader options.
768    pub fn run_ndjson_matches_source_with_options<W>(
769        &self,
770        source: io::NdjsonSource,
771        predicate: &str,
772        limit: usize,
773        writer: W,
774        options: io::NdjsonOptions,
775    ) -> std::result::Result<usize, JetroEngineError>
776    where
777        W: std::io::Write,
778    {
779        io::run_ndjson_matches_source_with_options(self, source, predicate, limit, writer, options)
780    }
781
782    /// Read an NDJSON file from tail to head, write matching original rows, and
783    /// stop after `limit` matches.
784    pub fn run_ndjson_rev_matches<P, W>(
785        &self,
786        path: P,
787        predicate: &str,
788        limit: usize,
789        writer: W,
790    ) -> std::result::Result<usize, JetroEngineError>
791    where
792        P: AsRef<std::path::Path>,
793        W: std::io::Write,
794    {
795        io::run_ndjson_rev_matches(self, path, predicate, limit, writer)
796    }
797
798    /// Like [`JetroEngine::run_ndjson_rev_matches`] with explicit NDJSON reader options.
799    pub fn run_ndjson_rev_matches_with_options<P, W>(
800        &self,
801        path: P,
802        predicate: &str,
803        limit: usize,
804        writer: W,
805        options: io::NdjsonOptions,
806    ) -> std::result::Result<usize, JetroEngineError>
807    where
808        P: AsRef<std::path::Path>,
809        W: std::io::Write,
810    {
811        io::run_ndjson_rev_matches_with_options(self, path, predicate, limit, writer, options)
812    }
813
814    /// Evaluate `query` independently for every non-empty NDJSON row and collect
815    /// the per-row results.
816    pub fn collect_ndjson<R>(
817        &self,
818        reader: R,
819        query: &str,
820    ) -> std::result::Result<Vec<Value>, JetroEngineError>
821    where
822        R: std::io::BufRead,
823    {
824        io::collect_ndjson(self, reader, query)
825    }
826
827    /// Open an NDJSON file and collect per-row query results.
828    pub fn collect_ndjson_file<P>(
829        &self,
830        path: P,
831        query: &str,
832    ) -> std::result::Result<Vec<Value>, JetroEngineError>
833    where
834        P: AsRef<std::path::Path>,
835    {
836        io::collect_ndjson_file(self, path, query)
837    }
838
839    /// Like [`JetroEngine::collect_ndjson_file`] with explicit NDJSON reader options.
840    pub fn collect_ndjson_file_with_options<P>(
841        &self,
842        path: P,
843        query: &str,
844        options: io::NdjsonOptions,
845    ) -> std::result::Result<Vec<Value>, JetroEngineError>
846    where
847        P: AsRef<std::path::Path>,
848    {
849        io::collect_ndjson_file_with_options(self, path, query, options)
850    }
851
852    /// Collect per-row query results from an [`io::NdjsonSource`].
853    pub fn collect_ndjson_source(
854        &self,
855        source: io::NdjsonSource,
856        query: &str,
857    ) -> std::result::Result<Vec<Value>, JetroEngineError> {
858        io::collect_ndjson_source(self, source, query)
859    }
860
861    /// Like [`JetroEngine::collect_ndjson_source`] with explicit NDJSON reader options.
862    pub fn collect_ndjson_source_with_options(
863        &self,
864        source: io::NdjsonSource,
865        query: &str,
866        options: io::NdjsonOptions,
867    ) -> std::result::Result<Vec<Value>, JetroEngineError> {
868        io::collect_ndjson_source_with_options(self, source, query, options)
869    }
870
871    /// Read an NDJSON file from tail to head and collect per-row query results.
872    pub fn collect_ndjson_rev<P>(
873        &self,
874        path: P,
875        query: &str,
876    ) -> std::result::Result<Vec<Value>, JetroEngineError>
877    where
878        P: AsRef<std::path::Path>,
879    {
880        io::collect_ndjson_rev(self, path, query)
881    }
882
883    /// Like [`JetroEngine::collect_ndjson_rev`] with explicit NDJSON reader options.
884    pub fn collect_ndjson_rev_with_options<P>(
885        &self,
886        path: P,
887        query: &str,
888        options: io::NdjsonOptions,
889    ) -> std::result::Result<Vec<Value>, JetroEngineError>
890    where
891        P: AsRef<std::path::Path>,
892    {
893        io::collect_ndjson_rev_with_options(self, path, query, options)
894    }
895
896    /// Read an NDJSON file from tail to head and call `f` with each query result
897    /// as it is produced.
898    pub fn for_each_ndjson_rev<P, F>(
899        &self,
900        path: P,
901        query: &str,
902        f: F,
903    ) -> std::result::Result<usize, JetroEngineError>
904    where
905        P: AsRef<std::path::Path>,
906        F: FnMut(Value),
907    {
908        io::for_each_ndjson_rev(self, path, query, f)
909    }
910
911    /// Read an NDJSON file from tail to head and call `f` until it returns
912    /// [`io::NdjsonControl::Stop`] or input is exhausted.
913    pub fn for_each_ndjson_rev_until<P, F>(
914        &self,
915        path: P,
916        query: &str,
917        f: F,
918    ) -> std::result::Result<usize, JetroEngineError>
919    where
920        P: AsRef<std::path::Path>,
921        F: FnMut(Value) -> std::result::Result<io::NdjsonControl, JetroEngineError>,
922    {
923        io::for_each_ndjson_rev_with_options(self, path, query, io::NdjsonOptions::default(), f)
924    }
925
926    /// Like [`JetroEngine::for_each_ndjson_rev_until`] with explicit NDJSON reader options.
927    pub fn for_each_ndjson_rev_until_with_options<P, F>(
928        &self,
929        path: P,
930        query: &str,
931        options: io::NdjsonOptions,
932        f: F,
933    ) -> std::result::Result<usize, JetroEngineError>
934    where
935        P: AsRef<std::path::Path>,
936        F: FnMut(Value) -> std::result::Result<io::NdjsonControl, JetroEngineError>,
937    {
938        io::for_each_ndjson_rev_with_options(self, path, query, options, f)
939    }
940
941    /// Like [`JetroEngine::for_each_ndjson_rev`] with explicit NDJSON reader options.
942    pub fn for_each_ndjson_rev_with_options<P, F>(
943        &self,
944        path: P,
945        query: &str,
946        options: io::NdjsonOptions,
947        mut f: F,
948    ) -> std::result::Result<usize, JetroEngineError>
949    where
950        P: AsRef<std::path::Path>,
951        F: FnMut(Value),
952    {
953        io::for_each_ndjson_rev_with_options(self, path, query, options, |value| {
954            f(value);
955            Ok(io::NdjsonControl::Continue)
956        })
957    }
958
959    /// Like [`JetroEngine::collect_ndjson`] with explicit NDJSON reader options.
960    pub fn collect_ndjson_with_options<R>(
961        &self,
962        reader: R,
963        query: &str,
964        options: io::NdjsonOptions,
965    ) -> std::result::Result<Vec<Value>, JetroEngineError>
966    where
967        R: std::io::BufRead,
968    {
969        io::collect_ndjson_with_options(self, reader, query, options)
970    }
971
972    /// Evaluate `predicate` for each NDJSON row, collect matching original
973    /// rows, and stop after `limit` matches.
974    pub fn collect_ndjson_matches<R>(
975        &self,
976        reader: R,
977        predicate: &str,
978        limit: usize,
979    ) -> std::result::Result<Vec<Value>, JetroEngineError>
980    where
981        R: std::io::BufRead,
982    {
983        io::collect_ndjson_matches(self, reader, predicate, limit)
984    }
985
986    /// Like [`JetroEngine::collect_ndjson_matches`] with explicit NDJSON reader options.
987    pub fn collect_ndjson_matches_with_options<R>(
988        &self,
989        reader: R,
990        predicate: &str,
991        limit: usize,
992        options: io::NdjsonOptions,
993    ) -> std::result::Result<Vec<Value>, JetroEngineError>
994    where
995        R: std::io::BufRead,
996    {
997        io::collect_ndjson_matches_with_options(self, reader, predicate, limit, options)
998    }
999
1000    /// Open an NDJSON file, collect matching original rows, and stop after `limit` matches.
1001    pub fn collect_ndjson_matches_file<P>(
1002        &self,
1003        path: P,
1004        predicate: &str,
1005        limit: usize,
1006    ) -> std::result::Result<Vec<Value>, JetroEngineError>
1007    where
1008        P: AsRef<std::path::Path>,
1009    {
1010        io::collect_ndjson_matches_file(self, path, predicate, limit)
1011    }
1012
1013    /// Like [`JetroEngine::collect_ndjson_matches_file`] with explicit NDJSON reader options.
1014    pub fn collect_ndjson_matches_file_with_options<P>(
1015        &self,
1016        path: P,
1017        predicate: &str,
1018        limit: usize,
1019        options: io::NdjsonOptions,
1020    ) -> std::result::Result<Vec<Value>, JetroEngineError>
1021    where
1022        P: AsRef<std::path::Path>,
1023    {
1024        io::collect_ndjson_matches_file_with_options(self, path, predicate, limit, options)
1025    }
1026
1027    /// Evaluate `predicate` against each row from an [`io::NdjsonSource`],
1028    /// collect matching original rows, and stop after `limit` matches.
1029    pub fn collect_ndjson_matches_source(
1030        &self,
1031        source: io::NdjsonSource,
1032        predicate: &str,
1033        limit: usize,
1034    ) -> std::result::Result<Vec<Value>, JetroEngineError> {
1035        io::collect_ndjson_matches_source(self, source, predicate, limit)
1036    }
1037
1038    /// Like [`JetroEngine::collect_ndjson_matches_source`] with explicit NDJSON reader options.
1039    pub fn collect_ndjson_matches_source_with_options(
1040        &self,
1041        source: io::NdjsonSource,
1042        predicate: &str,
1043        limit: usize,
1044        options: io::NdjsonOptions,
1045    ) -> std::result::Result<Vec<Value>, JetroEngineError> {
1046        io::collect_ndjson_matches_source_with_options(self, source, predicate, limit, options)
1047    }
1048
1049    /// Read an NDJSON file from tail to head, collect matching original rows,
1050    /// and stop after `limit` matches.
1051    pub fn collect_ndjson_rev_matches<P>(
1052        &self,
1053        path: P,
1054        predicate: &str,
1055        limit: usize,
1056    ) -> std::result::Result<Vec<Value>, JetroEngineError>
1057    where
1058        P: AsRef<std::path::Path>,
1059    {
1060        io::collect_ndjson_rev_matches(self, path, predicate, limit)
1061    }
1062
1063    /// Like [`JetroEngine::collect_ndjson_rev_matches`] with explicit NDJSON reader options.
1064    pub fn collect_ndjson_rev_matches_with_options<P>(
1065        &self,
1066        path: P,
1067        predicate: &str,
1068        limit: usize,
1069        options: io::NdjsonOptions,
1070    ) -> std::result::Result<Vec<Value>, JetroEngineError>
1071    where
1072        P: AsRef<std::path::Path>,
1073    {
1074        io::collect_ndjson_rev_matches_with_options(self, path, predicate, limit, options)
1075    }
1076
1077    /// Evaluate `query` independently for every non-empty NDJSON row and call
1078    /// `f` with each result as it is produced.
1079    pub fn for_each_ndjson<R, F>(
1080        &self,
1081        reader: R,
1082        query: &str,
1083        f: F,
1084    ) -> std::result::Result<usize, JetroEngineError>
1085    where
1086        R: std::io::BufRead,
1087        F: FnMut(Value),
1088    {
1089        io::for_each_ndjson(self, reader, query, f)
1090    }
1091
1092    /// Evaluate `query` independently for every non-empty NDJSON row and call
1093    /// `f` until it returns [`io::NdjsonControl::Stop`] or input is exhausted.
1094    pub fn for_each_ndjson_until<R, F>(
1095        &self,
1096        reader: R,
1097        query: &str,
1098        f: F,
1099    ) -> std::result::Result<usize, JetroEngineError>
1100    where
1101        R: std::io::BufRead,
1102        F: FnMut(Value) -> std::result::Result<io::NdjsonControl, JetroEngineError>,
1103    {
1104        io::for_each_ndjson_until(self, reader, query, f)
1105    }
1106
1107    /// Evaluate `query` for every row from an [`io::NdjsonSource`] and call
1108    /// `f` with each result as it is produced.
1109    pub fn for_each_ndjson_source<F>(
1110        &self,
1111        source: io::NdjsonSource,
1112        query: &str,
1113        f: F,
1114    ) -> std::result::Result<usize, JetroEngineError>
1115    where
1116        F: FnMut(Value),
1117    {
1118        io::for_each_ndjson_source(self, source, query, f)
1119    }
1120
1121    /// Evaluate `query` for every row from an [`io::NdjsonSource`] and call
1122    /// `f` until it returns [`io::NdjsonControl::Stop`] or input is exhausted.
1123    pub fn for_each_ndjson_source_until<F>(
1124        &self,
1125        source: io::NdjsonSource,
1126        query: &str,
1127        f: F,
1128    ) -> std::result::Result<usize, JetroEngineError>
1129    where
1130        F: FnMut(Value) -> std::result::Result<io::NdjsonControl, JetroEngineError>,
1131    {
1132        io::for_each_ndjson_source_until(self, source, query, f)
1133    }
1134
1135    /// Like [`JetroEngine::for_each_ndjson_source_until`] with explicit NDJSON reader options.
1136    pub fn for_each_ndjson_source_until_with_options<F>(
1137        &self,
1138        source: io::NdjsonSource,
1139        query: &str,
1140        options: io::NdjsonOptions,
1141        f: F,
1142    ) -> std::result::Result<usize, JetroEngineError>
1143    where
1144        F: FnMut(Value) -> std::result::Result<io::NdjsonControl, JetroEngineError>,
1145    {
1146        io::for_each_ndjson_source_until_with_options(self, source, query, options, f)
1147    }
1148
1149    /// Like [`JetroEngine::for_each_ndjson_source`] with explicit NDJSON reader options.
1150    pub fn for_each_ndjson_source_with_options<F>(
1151        &self,
1152        source: io::NdjsonSource,
1153        query: &str,
1154        options: io::NdjsonOptions,
1155        f: F,
1156    ) -> std::result::Result<usize, JetroEngineError>
1157    where
1158        F: FnMut(Value),
1159    {
1160        io::for_each_ndjson_source_with_options(self, source, query, options, f)
1161    }
1162
1163    /// Like [`JetroEngine::for_each_ndjson`] with explicit NDJSON reader options.
1164    pub fn for_each_ndjson_with_options<R, F>(
1165        &self,
1166        reader: R,
1167        query: &str,
1168        options: io::NdjsonOptions,
1169        f: F,
1170    ) -> std::result::Result<usize, JetroEngineError>
1171    where
1172        R: std::io::BufRead,
1173        F: FnMut(Value),
1174    {
1175        io::for_each_ndjson_with_options(self, reader, query, options, f)
1176    }
1177
1178    /// Like [`JetroEngine::for_each_ndjson_until`] with explicit NDJSON reader options.
1179    pub fn for_each_ndjson_until_with_options<R, F>(
1180        &self,
1181        reader: R,
1182        query: &str,
1183        options: io::NdjsonOptions,
1184        f: F,
1185    ) -> std::result::Result<usize, JetroEngineError>
1186    where
1187        R: std::io::BufRead,
1188        F: FnMut(Value) -> std::result::Result<io::NdjsonControl, JetroEngineError>,
1189    {
1190        io::for_each_ndjson_until_with_options(self, reader, query, options, f)
1191    }
1192
1193    /// Look up a compiled `QueryPlan` by expression string and planning context,
1194    /// compiling and inserting it if not already cached; evicts the whole cache if full.
1195    pub(crate) fn cached_plan(
1196        &self,
1197        expr: &str,
1198        context: plan::physical::PlanningContext,
1199    ) -> ir::physical::QueryPlan {
1200        let mut cache = self.plan_cache.lock().expect("plan cache poisoned");
1201        let cache_key = format!("{}\0{}", context.cache_key(), expr);
1202        if let Some(plan) = cache.get(&cache_key) {
1203            return plan.clone();
1204        }
1205
1206        let plan = plan::physical::plan_query_with_context(expr, context);
1207        if self.plan_cache_limit > 0 {
1208            if cache.len() >= self.plan_cache_limit {
1209                cache.clear();
1210            }
1211            cache.insert(cache_key, plan.clone());
1212        }
1213        plan
1214    }
1215}
1216
1217impl exec::pipeline::PipelineData for Jetro {
1218    fn promote_objvec(&self, arr: &Arc<Vec<Val>>) -> Option<Arc<crate::data::value::ObjVecData>> {
1219        self.get_or_promote_objvec(arr)
1220    }
1221}
1222
1223impl Jetro {
1224    /// Return a reference to the lazily parsed simd-json `TapeData`, parsing raw bytes
1225    /// on first access. Returns `Ok(None)` when no raw bytes are stored.
1226    #[cfg(feature = "simd-json")]
1227    pub(crate) fn lazy_tape(
1228        &self,
1229    ) -> std::result::Result<Option<&Arc<crate::data::tape::TapeData>>, EvalError> {
1230        if let Some(result) = self.tape.get() {
1231            return result
1232                .as_ref()
1233                .map(Some)
1234                .map_err(|err| EvalError(format!("Invalid JSON: {err}")));
1235        }
1236        let Some(raw) = self.raw_bytes.as_ref() else {
1237            return Ok(None);
1238        };
1239        let bytes: Vec<u8> = (**raw).to_vec();
1240        let parsed = crate::data::tape::TapeData::parse(bytes).map_err(|err| err.to_string());
1241        let _ = self.tape.set(parsed);
1242        self.tape
1243            .get()
1244            .expect("tape cache initialized")
1245            .as_ref()
1246            .map(Some)
1247            .map_err(|err| EvalError(format!("Invalid JSON: {err}")))
1248    }
1249
1250    /// Look up or build an `ObjVecData` columnar representation for the given
1251    /// `Arc<Vec<Val>>` array, caching the result by pointer address.
1252    pub(crate) fn get_or_promote_objvec(
1253        &self,
1254        arr: &Arc<Vec<Val>>,
1255    ) -> Option<Arc<crate::data::value::ObjVecData>> {
1256        let key = Arc::as_ptr(arr) as usize;
1257        if let Ok(cache) = self.objvec_cache.lock() {
1258            if let Some(d) = cache.get(&key) {
1259                return Some(Arc::clone(d));
1260            }
1261        }
1262        let promoted = exec::pipeline::Pipeline::try_promote_objvec_arr(arr)?;
1263        if let Ok(mut cache) = self.objvec_cache.lock() {
1264            cache.entry(key).or_insert_with(|| Arc::clone(&promoted));
1265        }
1266        Some(promoted)
1267    }
1268
1269    /// Internal constructor that wraps a `serde_json::Value` without raw bytes.
1270    pub(crate) fn new(document: Value) -> Self {
1271        Self {
1272            document,
1273            root_val: OnceCell::new(),
1274            objvec_cache: Default::default(),
1275            raw_bytes: None,
1276            tape: OnceCell::new(),
1277            structural_index: OnceCell::new(),
1278            vm: RefCell::new(VM::new()),
1279        }
1280    }
1281
1282    /// Build a `Jetro` whose `root_val` is pre-cached with `root` (constructed by the
1283    /// caller, typically via [`Val::from_value_with`] using an engine-owned key cache).
1284    /// `document` is retained for back-compat with non-`simd-json` callers and tests
1285    /// that read the original `serde_json::Value`.
1286    pub(crate) fn from_val_and_value(root: Val, document: Value) -> Self {
1287        let root_val = OnceCell::new();
1288        let _ = root_val.set(root);
1289        Self {
1290            document,
1291            root_val,
1292            objvec_cache: Default::default(),
1293            raw_bytes: None,
1294            tape: OnceCell::new(),
1295            structural_index: OnceCell::new(),
1296            vm: RefCell::new(VM::new()),
1297        }
1298    }
1299
1300    /// Like [`Jetro::root_val`] but interns object keys through `keys` instead of the
1301    /// process-wide default. Used by [`JetroEngine::parse_bytes`] to materialise the
1302    /// `Val` tree once at parse time so subsequent `collect` calls find a populated
1303    /// `root_val` cache and skip re-interning.
1304    pub(crate) fn root_val_with(
1305        &self,
1306        keys: &crate::data::intern::KeyCache,
1307    ) -> std::result::Result<Val, EvalError> {
1308        if let Some(root) = self.root_val.get() {
1309            return Ok(root.clone());
1310        }
1311        let root = {
1312            #[cfg(feature = "simd-json")]
1313            {
1314                if let Some(tape) = self.lazy_tape()? {
1315                    Val::from_tape_data_with(keys, tape)
1316                } else {
1317                    Val::from_value_with(keys, &self.document)
1318                }
1319            }
1320            #[cfg(not(feature = "simd-json"))]
1321            {
1322                Val::from_value_with(keys, &self.document)
1323            }
1324        };
1325        let _ = self.root_val.set(root);
1326        Ok(self.root_val.get().expect("root val initialized").clone())
1327    }
1328
1329    /// Parse raw JSON bytes and build a `Jetro` query handle.
1330    /// When the `simd-json` feature is enabled the bytes are not parsed eagerly;
1331    /// the tape is built lazily on the first query that needs it.
1332    pub fn from_bytes(bytes: Vec<u8>) -> std::result::Result<Self, serde_json::Error> {
1333        #[cfg(feature = "simd-json")]
1334        {
1335            return Ok(Self {
1336                document: Value::Null,
1337                root_val: OnceCell::new(),
1338                objvec_cache: Default::default(),
1339                raw_bytes: Some(Arc::from(bytes.into_boxed_slice())),
1340                tape: OnceCell::new(),
1341                structural_index: OnceCell::new(),
1342                vm: RefCell::new(VM::new()),
1343            });
1344        }
1345        #[allow(unreachable_code)]
1346        {
1347            let document: Value = serde_json::from_slice(&bytes)?;
1348            Ok(Self {
1349                document,
1350                root_val: OnceCell::new(),
1351                objvec_cache: Default::default(),
1352                raw_bytes: Some(Arc::from(bytes.into_boxed_slice())),
1353                tape: OnceCell::new(),
1354                structural_index: OnceCell::new(),
1355                vm: RefCell::new(VM::new()),
1356            })
1357        }
1358    }
1359
1360    /// Borrow this document's VM cache, falling back to a temporary VM on re-entrant use.
1361    pub(crate) fn with_vm<F, R>(&self, f: F) -> R
1362    where
1363        F: FnOnce(&mut VM) -> R,
1364    {
1365        match self.vm.try_borrow_mut() {
1366            Ok(mut vm) => f(&mut vm),
1367            Err(_) => {
1368                let mut vm = VM::new();
1369                f(&mut vm)
1370            }
1371        }
1372    }
1373
1374    /// Return the raw JSON byte slice if this handle was constructed from bytes,
1375    /// or `None` if it was constructed from a `serde_json::Value`.
1376    pub(crate) fn raw_bytes(&self) -> Option<&[u8]> {
1377        self.raw_bytes.as_deref()
1378    }
1379
1380    /// Return a reference to the lazily built `StructuralIndex` for key-presence
1381    /// queries, constructing it from raw bytes on first access if available.
1382    pub(crate) fn lazy_structural_index(
1383        &self,
1384    ) -> std::result::Result<Option<&Arc<jetro_experimental::StructuralIndex>>, EvalError> {
1385        if let Some(result) = self.structural_index.get() {
1386            return result
1387                .as_ref()
1388                .map(Some)
1389                .map_err(|err| EvalError(format!("Invalid JSON: {err}")));
1390        }
1391        let Some(raw) = self.raw_bytes.as_ref() else {
1392            return Ok(None);
1393        };
1394        let built = jetro_experimental::from_bytes_with(
1395            raw.as_ref(),
1396            jetro_experimental::BuildOptions::keys_only(),
1397        )
1398        .map(Arc::new)
1399        .map_err(|err| err.to_string());
1400        let _ = self.structural_index.set(built);
1401        self.structural_index
1402            .get()
1403            .expect("structural index cache initialized")
1404            .as_ref()
1405            .map(Some)
1406            .map_err(|err| EvalError(format!("Invalid JSON: {err}")))
1407    }
1408
1409    /// Return the root `Val` for the document, building and caching it from the
1410    /// tape (simd-json) or from the `serde_json::Value` on first access.
1411    pub(crate) fn root_val(&self) -> std::result::Result<Val, EvalError> {
1412        if let Some(root) = self.root_val.get() {
1413            return Ok(root.clone());
1414        }
1415        let root = {
1416            #[cfg(feature = "simd-json")]
1417            {
1418                if let Some(tape) = self.lazy_tape()? {
1419                    Val::from_tape_data(tape)
1420                } else {
1421                    Val::from(&self.document)
1422                }
1423            }
1424            #[cfg(not(feature = "simd-json"))]
1425            {
1426                Val::from(&self.document)
1427            }
1428        };
1429        let _ = self.root_val.set(root);
1430        Ok(self.root_val.get().expect("root val initialized").clone())
1431    }
1432
1433    /// Return `true` if the `Val` tree has already been materialised; used in
1434    /// tests to assert that lazy evaluation is working correctly.
1435    #[cfg(test)]
1436    pub(crate) fn root_val_is_materialized(&self) -> bool {
1437        self.root_val.get().is_some()
1438    }
1439
1440    #[cfg(test)]
1441    pub(crate) fn structural_index_is_built(&self) -> bool {
1442        self.structural_index.get().is_some()
1443    }
1444
1445    #[cfg(all(test, feature = "simd-json"))]
1446    pub(crate) fn tape_is_built(&self) -> bool {
1447        self.tape.get().is_some()
1448    }
1449
1450    #[cfg(all(test, feature = "simd-json"))]
1451    pub(crate) fn reset_tape_materialized_subtrees(&self) {
1452        if let Ok(Some(tape)) = self.lazy_tape() {
1453            tape.reset_materialized_subtrees();
1454        }
1455    }
1456
1457    #[cfg(all(test, feature = "simd-json"))]
1458    pub(crate) fn tape_materialized_subtrees(&self) -> usize {
1459        self.lazy_tape()
1460            .ok()
1461            .flatten()
1462            .map(|tape| tape.materialized_subtrees())
1463            .unwrap_or(0)
1464    }
1465
1466    /// Evaluate a Jetro expression against this document and return the result
1467    /// as a `serde_json::Value`. Uses this document's VM with compile and
1468    /// path-resolution caches for repeated calls.
1469    pub fn collect<S: AsRef<str>>(&self, expr: S) -> std::result::Result<Value, EvalError> {
1470        exec::router::collect_json(self, expr.as_ref())
1471    }
1472}
1473
1474/// Wrap an existing `serde_json::Value` in a `Jetro` handle without raw bytes.
1475/// Prefer `Jetro::from_bytes` when you have the original JSON source, as it
1476/// enables the tape and structural-index lazy backends.
1477impl From<Value> for Jetro {
1478    /// Convert a `serde_json::Value` into a `Jetro` query handle.
1479    fn from(v: Value) -> Self {
1480        Self::new(v)
1481    }
1482}