Skip to main content

jetro_core/
lib.rs

//! Jetro core — parser, compiler, and VM for the Jetro JSON query language.
//!
//! # Execution path
//!
//! ```text
//! source text
//!   │  parse::parser::parse() → Expr AST
//!   │  plan::physical::plan_query() → QueryPlan (physical IR)
//!   │  exec::router::collect_*() → dispatches to:
//!   │    StructuralIndex backend  (jetro-experimental bitmap)
//!   │    ViewPipeline backend     (borrowed tape/Val navigation)
//!   │    Pipeline backend         (pull-based composed stages)
//!   └─  VM fallback               (bytecode stack machine)
//! ```
//!
//! # Quick start
//!
//! ```rust
//! use jetro_core::Jetro;
//! let j = Jetro::from_bytes(br#"{"books":[{"price":12}]}"#.to_vec()).unwrap();
//! assert_eq!(j.collect("$.books.len()").unwrap(), serde_json::json!(1));
//! ```
23
24pub(crate) mod builtins;
25pub(crate) mod compile;
26pub(crate) mod data;
27pub(crate) mod exec;
28pub(crate) mod ir;
29pub(crate) mod parse;
30pub(crate) mod plan;
31pub(crate) mod util;
32pub(crate) mod vm;
33
34#[cfg(test)]
35mod tests;
36
37use serde_json::Value;
38use std::cell::{OnceCell, RefCell};
39use std::collections::HashMap;
40use std::sync::Arc;
41use std::sync::Mutex;
42use data::value::Val;
43
44pub use data::context::EvalError;
45#[cfg(test)]
46use parse::parser::ParseError;
47use vm::VM;
48
/// Internal parser and planner entry points for the `cargo-fuzz` harness.
///
/// Compiled only with the `fuzz_internal` feature, so fuzz targets can drive
/// the PEG parser and the physical planner directly instead of going through
/// `Jetro::collect`. This is NOT a stable public API.
#[cfg(feature = "fuzz_internal")]
pub mod __fuzz_internal {
    pub use crate::parse::parser::parse;
    pub use crate::parse::parser::ParseError;
    pub use crate::plan::physical::plan_query;
}
57
58
/// Test-only error type combining a parse failure and an evaluation failure.
#[cfg(test)]
#[derive(Debug)]
pub(crate) enum Error {
    /// The expression text was rejected by the parser.
    Parse(ParseError),
    /// Evaluation failed.
    Eval(EvalError),
}

#[cfg(test)]
impl std::fmt::Display for Error {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Both variants print the wrapped error verbatim.
        let inner: &dyn std::fmt::Display = match self {
            Self::Parse(err) => err,
            Self::Eval(err) => err,
        };
        write!(f, "{inner}")
    }
}

#[cfg(test)]
impl std::error::Error for Error {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        // Only the parser error is chained as a source.
        match self {
            Self::Eval(_) => None,
            Self::Parse(err) => Some(err),
        }
    }
}

#[cfg(test)]
impl From<ParseError> for Error {
    fn from(err: ParseError) -> Self {
        Self::Parse(err)
    }
}

#[cfg(test)]
impl From<EvalError> for Error {
    fn from(err: EvalError) -> Self {
        Self::Eval(err)
    }
}
97
98
99// Thread-local VM, constructed lazily on first `collect()` call.
100// Thread-local avoids a Mutex and lets compile/path caches accumulate.
101thread_local! {
102    static THREAD_VM: OnceCell<RefCell<VM>> = const { OnceCell::new() };
103}
104
105/// Borrow the thread-local `VM`, constructing it on first access.
106/// All `Jetro::collect` calls on the same thread share one `VM` so that
107/// compile and path-resolution caches accumulate across queries.
108fn with_vm<F, R>(f: F) -> R
109where
110    F: FnOnce(&RefCell<VM>) -> R,
111{
112    THREAD_VM.with(|cell| {
113        let inner = cell.get_or_init(|| RefCell::new(VM::new()));
114        f(inner)
115    })
116}
117
118
119/// Primary entry point. Holds a JSON document and evaluates expressions against
120/// it. Lazy fields (`root_val`, `tape`, `structural_index`, `objvec_cache`)
121/// are populated on first use so callers only pay for the representations a
122/// particular query actually needs.
123pub struct Jetro {
124    /// The `serde_json::Value` root document; unused when `simd-json` is enabled
125    /// (the tape is the authoritative source in that case).
126    document: Value,
127    /// Cached `Val` tree — built once and reused across `collect()` calls.
128    root_val: OnceCell<Val>,
129    /// Retained raw bytes for lazy tape and structural-index materialisation.
130    raw_bytes: Option<Arc<[u8]>>,
131
132    /// Lazily parsed simd-json tape; `Err` is cached to avoid re-parsing after failure.
133    #[cfg(feature = "simd-json")]
134    tape: OnceCell<std::result::Result<Arc<crate::data::tape::TapeData>, String>>,
135    /// Unused placeholder so the field name is consistent regardless of features.
136    #[cfg(not(feature = "simd-json"))]
137    #[allow(dead_code)]
138    tape: OnceCell<()>,
139
140    /// Lazily built bitmap structural index for accelerated key-presence queries.
141    structural_index:
142        OnceCell<std::result::Result<Arc<jetro_experimental::StructuralIndex>, String>>,
143
144    /// Per-document cache from `Arc<Vec<Val>>` pointer addresses to promoted
145    /// `ObjVecData` columnar representations; keyed by pointer to avoid re-promotion.
146    pub(crate) objvec_cache:
147        std::sync::Mutex<std::collections::HashMap<usize, Arc<crate::data::value::ObjVecData>>>,
148}
149
150
151/// Long-lived multi-document query engine with an explicit plan cache.
152/// Use when the same process evaluates many expressions over many documents —
153/// parse/lower/compile work is amortised by this object, not hidden in
154/// thread-local state.
155pub struct JetroEngine {
156    /// Maps `"<context_key>\0<expr>"` to compiled `QueryPlan`; evicted wholesale when full.
157    plan_cache: Mutex<HashMap<String, ir::physical::QueryPlan>>,
158    /// Maximum number of entries before the cache is cleared; 0 disables caching.
159    plan_cache_limit: usize,
160    /// The shared `VM` used by all `collect*` calls on this engine instance.
161    vm: Mutex<VM>,
162}
163
164/// Error returned by `JetroEngine::collect_bytes` and similar methods that
165/// may fail during JSON parsing or during expression evaluation.
166#[derive(Debug)]
167pub enum JetroEngineError {
168    /// JSON parsing failed before evaluation could begin.
169    Json(serde_json::Error),
170    /// Expression evaluation failed (the JSON was valid but the query errored).
171    Eval(EvalError),
172}
173
174impl std::fmt::Display for JetroEngineError {
175    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
176        match self {
177            Self::Json(err) => write!(f, "{}", err),
178            Self::Eval(err) => write!(f, "{}", err),
179        }
180    }
181}
182
183impl std::error::Error for JetroEngineError {
184    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
185        match self {
186            Self::Json(err) => Some(err),
187            Self::Eval(_) => None,
188        }
189    }
190}
191
192impl From<serde_json::Error> for JetroEngineError {
193    fn from(err: serde_json::Error) -> Self {
194        Self::Json(err)
195    }
196}
197
198impl From<EvalError> for JetroEngineError {
199    fn from(err: EvalError) -> Self {
200        Self::Eval(err)
201    }
202}
203
204impl Default for JetroEngine {
205    fn default() -> Self {
206        Self::new()
207    }
208}
209
210impl JetroEngine {
211    /// Default maximum plan-cache size; the cache is cleared wholesale when reached.
212    const DEFAULT_PLAN_CACHE_LIMIT: usize = 256;
213
214    /// Create a `JetroEngine` with the default plan-cache limit of 256 entries.
215    pub fn new() -> Self {
216        Self::with_plan_cache_limit(Self::DEFAULT_PLAN_CACHE_LIMIT)
217    }
218
219    /// Create a `JetroEngine` with an explicit plan-cache capacity.
220    /// Set `plan_cache_limit` to 0 to disable caching entirely.
221    pub fn with_plan_cache_limit(plan_cache_limit: usize) -> Self {
222        Self {
223            plan_cache: Mutex::new(HashMap::new()),
224            plan_cache_limit,
225            vm: Mutex::new(VM::new()),
226        }
227    }
228
229    /// Discard all cached query plans, forcing re-compilation on the next call.
230    pub fn clear_cache(&self) {
231        self.plan_cache.lock().expect("plan cache poisoned").clear();
232    }
233
234    /// Evaluate a Jetro expression against an already-constructed `Jetro` document,
235    /// using the engine's shared plan cache and `VM`.
236    pub fn collect<S: AsRef<str>>(
237        &self,
238        document: &Jetro,
239        expr: S,
240    ) -> std::result::Result<Value, EvalError> {
241        let plan = self.cached_plan(expr.as_ref(), exec::router::planning_context(document));
242        let mut vm = self.vm.lock().expect("vm cache poisoned");
243        exec::router::collect_plan_json_with_vm(document, &plan, &mut vm)
244    }
245
246    /// Convenience wrapper: wrap a `serde_json::Value` in a `Jetro` and evaluate `expr`.
247    pub fn collect_value<S: AsRef<str>>(
248        &self,
249        document: Value,
250        expr: S,
251    ) -> std::result::Result<Value, EvalError> {
252        let document = Jetro::from(document);
253        self.collect(&document, expr)
254    }
255
256    /// Parse raw JSON bytes into a `Jetro` document and evaluate `expr`,
257    /// returning a `JetroEngineError` on either parse or evaluation failure.
258    pub fn collect_bytes<S: AsRef<str>>(
259        &self,
260        bytes: Vec<u8>,
261        expr: S,
262    ) -> std::result::Result<Value, JetroEngineError> {
263        let document = Jetro::from_bytes(bytes)?;
264        Ok(self.collect(&document, expr)?)
265    }
266
267    /// Look up a compiled `QueryPlan` by expression string and planning context,
268    /// compiling and inserting it if not already cached; evicts the whole cache if full.
269    fn cached_plan(&self, expr: &str, context: plan::physical::PlanningContext) -> ir::physical::QueryPlan {
270        let mut cache = self.plan_cache.lock().expect("plan cache poisoned");
271        let cache_key = format!("{}\0{}", context.cache_key(), expr);
272        if let Some(plan) = cache.get(&cache_key) {
273            return plan.clone();
274        }
275
276        let plan = plan::physical::plan_query_with_context(expr, context);
277        if self.plan_cache_limit > 0 {
278            if cache.len() >= self.plan_cache_limit {
279                cache.clear();
280            }
281            cache.insert(cache_key, plan.clone());
282        }
283        plan
284    }
285}
286
287impl exec::pipeline::PipelineData for Jetro {
288    fn promote_objvec(&self, arr: &Arc<Vec<Val>>) -> Option<Arc<crate::data::value::ObjVecData>> {
289        self.get_or_promote_objvec(arr)
290    }
291}
292
293impl Jetro {
294    /// Return a reference to the lazily parsed simd-json `TapeData`, parsing raw bytes
295    /// on first access. Returns `Ok(None)` when no raw bytes are stored.
296    #[cfg(feature = "simd-json")]
297    pub(crate) fn lazy_tape(
298        &self,
299    ) -> std::result::Result<Option<&Arc<crate::data::tape::TapeData>>, EvalError> {
300        if let Some(result) = self.tape.get() {
301            return result
302                .as_ref()
303                .map(Some)
304                .map_err(|err| EvalError(format!("Invalid JSON: {err}")));
305        }
306        let Some(raw) = self.raw_bytes.as_ref() else {
307            return Ok(None);
308        };
309        let bytes: Vec<u8> = (**raw).to_vec();
310        let parsed = crate::data::tape::TapeData::parse(bytes).map_err(|err| err.to_string());
311        let _ = self.tape.set(parsed);
312        self.tape
313            .get()
314            .expect("tape cache initialized")
315            .as_ref()
316            .map(Some)
317            .map_err(|err| EvalError(format!("Invalid JSON: {err}")))
318    }
319
320    /// Look up or build an `ObjVecData` columnar representation for the given
321    /// `Arc<Vec<Val>>` array, caching the result by pointer address.
322    pub(crate) fn get_or_promote_objvec(
323        &self,
324        arr: &Arc<Vec<Val>>,
325    ) -> Option<Arc<crate::data::value::ObjVecData>> {
326        let key = Arc::as_ptr(arr) as usize;
327        if let Ok(cache) = self.objvec_cache.lock() {
328            if let Some(d) = cache.get(&key) {
329                return Some(Arc::clone(d));
330            }
331        }
332        let promoted = exec::pipeline::Pipeline::try_promote_objvec_arr(arr)?;
333        if let Ok(mut cache) = self.objvec_cache.lock() {
334            cache.entry(key).or_insert_with(|| Arc::clone(&promoted));
335        }
336        Some(promoted)
337    }
338
339    /// Internal constructor that wraps a `serde_json::Value` without raw bytes.
340    pub(crate) fn new(document: Value) -> Self {
341        Self {
342            document,
343            root_val: OnceCell::new(),
344            objvec_cache: Default::default(),
345            raw_bytes: None,
346            tape: OnceCell::new(),
347            structural_index: OnceCell::new(),
348        }
349    }
350
351    /// Parse raw JSON bytes and build a `Jetro` query handle.
352    /// When the `simd-json` feature is enabled the bytes are not parsed eagerly;
353    /// the tape is built lazily on the first query that needs it.
354    pub fn from_bytes(bytes: Vec<u8>) -> std::result::Result<Self, serde_json::Error> {
355        
356        
357        #[cfg(feature = "simd-json")]
358        {
359            return Ok(Self {
360                document: Value::Null,
361                root_val: OnceCell::new(),
362                objvec_cache: Default::default(),
363                raw_bytes: Some(Arc::from(bytes.into_boxed_slice())),
364                tape: OnceCell::new(),
365                structural_index: OnceCell::new(),
366            });
367        }
368        #[allow(unreachable_code)]
369        {
370            let document: Value = serde_json::from_slice(&bytes)?;
371            Ok(Self {
372                document,
373                root_val: OnceCell::new(),
374                objvec_cache: Default::default(),
375                raw_bytes: Some(Arc::from(bytes.into_boxed_slice())),
376                tape: OnceCell::new(),
377                structural_index: OnceCell::new(),
378            })
379        }
380    }
381
382    /// Return the raw JSON byte slice if this handle was constructed from bytes,
383    /// or `None` if it was constructed from a `serde_json::Value`.
384    pub(crate) fn raw_bytes(&self) -> Option<&[u8]> {
385        self.raw_bytes.as_deref()
386    }
387
388    /// Return a reference to the lazily built `StructuralIndex` for key-presence
389    /// queries, constructing it from raw bytes on first access if available.
390    pub(crate) fn lazy_structural_index(
391        &self,
392    ) -> std::result::Result<Option<&Arc<jetro_experimental::StructuralIndex>>, EvalError> {
393        if let Some(result) = self.structural_index.get() {
394            return result
395                .as_ref()
396                .map(Some)
397                .map_err(|err| EvalError(format!("Invalid JSON: {err}")));
398        }
399        let Some(raw) = self.raw_bytes.as_ref() else {
400            return Ok(None);
401        };
402        let built = jetro_experimental::from_bytes_with(
403            raw.as_ref(),
404            jetro_experimental::BuildOptions::keys_only(),
405        )
406        .map(Arc::new)
407        .map_err(|err| err.to_string());
408        let _ = self.structural_index.set(built);
409        self.structural_index
410            .get()
411            .expect("structural index cache initialized")
412            .as_ref()
413            .map(Some)
414            .map_err(|err| EvalError(format!("Invalid JSON: {err}")))
415    }
416
417    /// Return the root `Val` for the document, building and caching it from the
418    /// tape (simd-json) or from the `serde_json::Value` on first access.
419    pub(crate) fn root_val(&self) -> std::result::Result<Val, EvalError> {
420        if let Some(root) = self.root_val.get() {
421            return Ok(root.clone());
422        }
423        let root = {
424            #[cfg(feature = "simd-json")]
425            {
426                if let Some(tape) = self.lazy_tape()? {
427                    Val::from_tape_data(tape)
428                } else {
429                    Val::from(&self.document)
430                }
431            }
432            #[cfg(not(feature = "simd-json"))]
433            {
434                Val::from(&self.document)
435            }
436        };
437        let _ = self.root_val.set(root);
438        Ok(self.root_val.get().expect("root val initialized").clone())
439    }
440
441    /// Return `true` if the `Val` tree has already been materialised; used in
442    /// tests to assert that lazy evaluation is working correctly.
443    #[cfg(test)]
444    pub(crate) fn root_val_is_materialized(&self) -> bool {
445        self.root_val.get().is_some()
446    }
447
448    #[cfg(test)]
449    pub(crate) fn structural_index_is_built(&self) -> bool {
450        self.structural_index.get().is_some()
451    }
452
453    #[cfg(all(test, feature = "simd-json"))]
454    pub(crate) fn tape_is_built(&self) -> bool {
455        self.tape.get().is_some()
456    }
457
458    #[cfg(all(test, feature = "simd-json"))]
459    pub(crate) fn reset_tape_materialized_subtrees(&self) {
460        if let Ok(Some(tape)) = self.lazy_tape() {
461            tape.reset_materialized_subtrees();
462        }
463    }
464
465    #[cfg(all(test, feature = "simd-json"))]
466    pub(crate) fn tape_materialized_subtrees(&self) -> usize {
467        self.lazy_tape()
468            .ok()
469            .flatten()
470            .map(|tape| tape.materialized_subtrees())
471            .unwrap_or(0)
472    }
473
474    /// Evaluate a Jetro expression against this document and return the result
475    /// as a `serde_json::Value`. Uses the thread-local `VM` with compile and
476    /// path-resolution caches for repeated calls.
477    pub fn collect<S: AsRef<str>>(&self, expr: S) -> std::result::Result<Value, EvalError> {
478        exec::router::collect_json(self, expr.as_ref())
479    }
480}
481
482/// Wrap an existing `serde_json::Value` in a `Jetro` handle without raw bytes.
483/// Prefer `Jetro::from_bytes` when you have the original JSON source, as it
484/// enables the tape and structural-index lazy backends.
485impl From<Value> for Jetro {
486    /// Convert a `serde_json::Value` into a `Jetro` query handle.
487    fn from(v: Value) -> Self {
488        Self::new(v)
489    }
490}