1pub(crate) mod builtins;
25pub(crate) mod compile;
26pub(crate) mod data;
27pub(crate) mod exec;
28pub(crate) mod ir;
29pub(crate) mod parse;
30pub(crate) mod plan;
31pub(crate) mod util;
32pub(crate) mod vm;
33
34#[cfg(test)]
35mod tests;
36
37use serde_json::Value;
38use std::cell::{OnceCell, RefCell};
39use std::collections::HashMap;
40use std::sync::Arc;
41use std::sync::Mutex;
42use data::value::Val;
43
44pub use data::context::EvalError;
45#[cfg(test)]
46use parse::parser::ParseError;
47use vm::VM;
48
/// Re-exports of the parser (`parse`, `ParseError`) and the planner entry
/// point (`plan_query`), compiled only when the `fuzz_internal` feature is on.
///
/// NOTE(review): presumably meant for fuzz harnesses only, not as stable public
/// API — confirm before depending on it.
#[cfg(feature = "fuzz_internal")]
pub mod __fuzz_internal {
    pub use crate::parse::parser::{parse, ParseError};
    pub use crate::plan::physical::plan_query;
}
57
58
/// Test-only error type that holds either a parser error or an evaluation
/// error, so test code can use `?` on both kinds of result.
#[cfg(test)]
#[derive(Debug)]
pub(crate) enum Error {
    /// Wraps a [`ParseError`].
    Parse(ParseError),
    /// Wraps an [`EvalError`].
    Eval(EvalError),
}
65
#[cfg(test)]
impl std::fmt::Display for Error {
    /// Prints the wrapped error's own `Display` text with nothing added.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Pick the inner error first so there is a single formatting call.
        let inner: &dyn std::fmt::Display = match self {
            Error::Parse(parse_err) => parse_err,
            Error::Eval(eval_err) => eval_err,
        };
        write!(f, "{}", inner)
    }
}
#[cfg(test)]
impl std::error::Error for Error {
    /// Reports the wrapped `ParseError` as the source; the `Eval` variant
    /// reports no source.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::Parse(parse_err) => Some(parse_err),
            Self::Eval(_) => None,
        }
    }
}
84
#[cfg(test)]
impl From<ParseError> for Error {
    /// Wraps a parser error in [`Error::Parse`].
    fn from(parse_err: ParseError) -> Self {
        Self::Parse(parse_err)
    }
}
#[cfg(test)]
impl From<EvalError> for Error {
    /// Wraps an evaluation error in [`Error::Eval`].
    fn from(eval_err: EvalError) -> Self {
        Self::Eval(eval_err)
    }
}
97
98
thread_local! {
    /// Per-thread slot for a `VM` wrapped in a `RefCell`. It starts empty;
    /// `with_vm` fills it on the first call made from a given thread.
    static THREAD_VM: OnceCell<RefCell<VM>> = const { OnceCell::new() };
}
104
105fn with_vm<F, R>(f: F) -> R
109where
110 F: FnOnce(&RefCell<VM>) -> R,
111{
112 THREAD_VM.with(|cell| {
113 let inner = cell.get_or_init(|| RefCell::new(VM::new()));
114 f(inner)
115 })
116}
117
118
119pub struct Jetro {
124 document: Value,
127 root_val: OnceCell<Val>,
129 raw_bytes: Option<Arc<[u8]>>,
131
132 #[cfg(feature = "simd-json")]
134 tape: OnceCell<std::result::Result<Arc<crate::data::tape::TapeData>, String>>,
135 #[cfg(not(feature = "simd-json"))]
137 #[allow(dead_code)]
138 tape: OnceCell<()>,
139
140 structural_index:
142 OnceCell<std::result::Result<Arc<jetro_experimental::StructuralIndex>, String>>,
143
144 pub(crate) objvec_cache:
147 std::sync::Mutex<std::collections::HashMap<usize, Arc<crate::data::value::ObjVecData>>>,
148}
149
150
/// Query engine that keeps a cache of query plans and a single `VM`, each
/// behind its own mutex, for use across many `collect*` calls.
pub struct JetroEngine {
    /// Cached plans. The key is the planning context's cache key, a NUL byte,
    /// then the expression text (see `cached_plan`).
    plan_cache: Mutex<HashMap<String, ir::physical::QueryPlan>>,
    /// Number of entries at which the cache is cleared before a new plan is
    /// inserted. `0` means plans are never stored.
    plan_cache_limit: usize,
    /// The VM handed to the executor by `collect`.
    vm: Mutex<VM>,
}
163
/// Error returned by [`JetroEngine::collect_bytes`]: either the input bytes
/// could not be turned into a document, or evaluation failed.
#[derive(Debug)]
pub enum JetroEngineError {
    /// A `serde_json` error (produced by `Jetro::from_bytes`).
    Json(serde_json::Error),
    /// An evaluation error.
    Eval(EvalError),
}
173
174impl std::fmt::Display for JetroEngineError {
175 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
176 match self {
177 Self::Json(err) => write!(f, "{}", err),
178 Self::Eval(err) => write!(f, "{}", err),
179 }
180 }
181}
182
183impl std::error::Error for JetroEngineError {
184 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
185 match self {
186 Self::Json(err) => Some(err),
187 Self::Eval(_) => None,
188 }
189 }
190}
191
192impl From<serde_json::Error> for JetroEngineError {
193 fn from(err: serde_json::Error) -> Self {
194 Self::Json(err)
195 }
196}
197
198impl From<EvalError> for JetroEngineError {
199 fn from(err: EvalError) -> Self {
200 Self::Eval(err)
201 }
202}
203
204impl Default for JetroEngine {
205 fn default() -> Self {
206 Self::new()
207 }
208}
209
210impl JetroEngine {
211 const DEFAULT_PLAN_CACHE_LIMIT: usize = 256;
213
214 pub fn new() -> Self {
216 Self::with_plan_cache_limit(Self::DEFAULT_PLAN_CACHE_LIMIT)
217 }
218
219 pub fn with_plan_cache_limit(plan_cache_limit: usize) -> Self {
222 Self {
223 plan_cache: Mutex::new(HashMap::new()),
224 plan_cache_limit,
225 vm: Mutex::new(VM::new()),
226 }
227 }
228
229 pub fn clear_cache(&self) {
231 self.plan_cache.lock().expect("plan cache poisoned").clear();
232 }
233
234 pub fn collect<S: AsRef<str>>(
237 &self,
238 document: &Jetro,
239 expr: S,
240 ) -> std::result::Result<Value, EvalError> {
241 let plan = self.cached_plan(expr.as_ref(), exec::router::planning_context(document));
242 let mut vm = self.vm.lock().expect("vm cache poisoned");
243 exec::router::collect_plan_json_with_vm(document, &plan, &mut vm)
244 }
245
246 pub fn collect_value<S: AsRef<str>>(
248 &self,
249 document: Value,
250 expr: S,
251 ) -> std::result::Result<Value, EvalError> {
252 let document = Jetro::from(document);
253 self.collect(&document, expr)
254 }
255
256 pub fn collect_bytes<S: AsRef<str>>(
259 &self,
260 bytes: Vec<u8>,
261 expr: S,
262 ) -> std::result::Result<Value, JetroEngineError> {
263 let document = Jetro::from_bytes(bytes)?;
264 Ok(self.collect(&document, expr)?)
265 }
266
267 fn cached_plan(&self, expr: &str, context: plan::physical::PlanningContext) -> ir::physical::QueryPlan {
270 let mut cache = self.plan_cache.lock().expect("plan cache poisoned");
271 let cache_key = format!("{}\0{}", context.cache_key(), expr);
272 if let Some(plan) = cache.get(&cache_key) {
273 return plan.clone();
274 }
275
276 let plan = plan::physical::plan_query_with_context(expr, context);
277 if self.plan_cache_limit > 0 {
278 if cache.len() >= self.plan_cache_limit {
279 cache.clear();
280 }
281 cache.insert(cache_key, plan.clone());
282 }
283 plan
284 }
285}
286
287impl exec::pipeline::PipelineData for Jetro {
288 fn promote_objvec(&self, arr: &Arc<Vec<Val>>) -> Option<Arc<crate::data::value::ObjVecData>> {
289 self.get_or_promote_objvec(arr)
290 }
291}
292
293impl Jetro {
294 #[cfg(feature = "simd-json")]
297 pub(crate) fn lazy_tape(
298 &self,
299 ) -> std::result::Result<Option<&Arc<crate::data::tape::TapeData>>, EvalError> {
300 if let Some(result) = self.tape.get() {
301 return result
302 .as_ref()
303 .map(Some)
304 .map_err(|err| EvalError(format!("Invalid JSON: {err}")));
305 }
306 let Some(raw) = self.raw_bytes.as_ref() else {
307 return Ok(None);
308 };
309 let bytes: Vec<u8> = (**raw).to_vec();
310 let parsed = crate::data::tape::TapeData::parse(bytes).map_err(|err| err.to_string());
311 let _ = self.tape.set(parsed);
312 self.tape
313 .get()
314 .expect("tape cache initialized")
315 .as_ref()
316 .map(Some)
317 .map_err(|err| EvalError(format!("Invalid JSON: {err}")))
318 }
319
320 pub(crate) fn get_or_promote_objvec(
323 &self,
324 arr: &Arc<Vec<Val>>,
325 ) -> Option<Arc<crate::data::value::ObjVecData>> {
326 let key = Arc::as_ptr(arr) as usize;
327 if let Ok(cache) = self.objvec_cache.lock() {
328 if let Some(d) = cache.get(&key) {
329 return Some(Arc::clone(d));
330 }
331 }
332 let promoted = exec::pipeline::Pipeline::try_promote_objvec_arr(arr)?;
333 if let Ok(mut cache) = self.objvec_cache.lock() {
334 cache.entry(key).or_insert_with(|| Arc::clone(&promoted));
335 }
336 Some(promoted)
337 }
338
339 pub(crate) fn new(document: Value) -> Self {
341 Self {
342 document,
343 root_val: OnceCell::new(),
344 objvec_cache: Default::default(),
345 raw_bytes: None,
346 tape: OnceCell::new(),
347 structural_index: OnceCell::new(),
348 }
349 }
350
351 pub fn from_bytes(bytes: Vec<u8>) -> std::result::Result<Self, serde_json::Error> {
355
356
357 #[cfg(feature = "simd-json")]
358 {
359 return Ok(Self {
360 document: Value::Null,
361 root_val: OnceCell::new(),
362 objvec_cache: Default::default(),
363 raw_bytes: Some(Arc::from(bytes.into_boxed_slice())),
364 tape: OnceCell::new(),
365 structural_index: OnceCell::new(),
366 });
367 }
368 #[allow(unreachable_code)]
369 {
370 let document: Value = serde_json::from_slice(&bytes)?;
371 Ok(Self {
372 document,
373 root_val: OnceCell::new(),
374 objvec_cache: Default::default(),
375 raw_bytes: Some(Arc::from(bytes.into_boxed_slice())),
376 tape: OnceCell::new(),
377 structural_index: OnceCell::new(),
378 })
379 }
380 }
381
382 pub(crate) fn raw_bytes(&self) -> Option<&[u8]> {
385 self.raw_bytes.as_deref()
386 }
387
388 pub(crate) fn lazy_structural_index(
391 &self,
392 ) -> std::result::Result<Option<&Arc<jetro_experimental::StructuralIndex>>, EvalError> {
393 if let Some(result) = self.structural_index.get() {
394 return result
395 .as_ref()
396 .map(Some)
397 .map_err(|err| EvalError(format!("Invalid JSON: {err}")));
398 }
399 let Some(raw) = self.raw_bytes.as_ref() else {
400 return Ok(None);
401 };
402 let built = jetro_experimental::from_bytes_with(
403 raw.as_ref(),
404 jetro_experimental::BuildOptions::keys_only(),
405 )
406 .map(Arc::new)
407 .map_err(|err| err.to_string());
408 let _ = self.structural_index.set(built);
409 self.structural_index
410 .get()
411 .expect("structural index cache initialized")
412 .as_ref()
413 .map(Some)
414 .map_err(|err| EvalError(format!("Invalid JSON: {err}")))
415 }
416
417 pub(crate) fn root_val(&self) -> std::result::Result<Val, EvalError> {
420 if let Some(root) = self.root_val.get() {
421 return Ok(root.clone());
422 }
423 let root = {
424 #[cfg(feature = "simd-json")]
425 {
426 if let Some(tape) = self.lazy_tape()? {
427 Val::from_tape_data(tape)
428 } else {
429 Val::from(&self.document)
430 }
431 }
432 #[cfg(not(feature = "simd-json"))]
433 {
434 Val::from(&self.document)
435 }
436 };
437 let _ = self.root_val.set(root);
438 Ok(self.root_val.get().expect("root val initialized").clone())
439 }
440
441 #[cfg(test)]
444 pub(crate) fn root_val_is_materialized(&self) -> bool {
445 self.root_val.get().is_some()
446 }
447
448 #[cfg(test)]
449 pub(crate) fn structural_index_is_built(&self) -> bool {
450 self.structural_index.get().is_some()
451 }
452
453 #[cfg(all(test, feature = "simd-json"))]
454 pub(crate) fn tape_is_built(&self) -> bool {
455 self.tape.get().is_some()
456 }
457
458 #[cfg(all(test, feature = "simd-json"))]
459 pub(crate) fn reset_tape_materialized_subtrees(&self) {
460 if let Ok(Some(tape)) = self.lazy_tape() {
461 tape.reset_materialized_subtrees();
462 }
463 }
464
465 #[cfg(all(test, feature = "simd-json"))]
466 pub(crate) fn tape_materialized_subtrees(&self) -> usize {
467 self.lazy_tape()
468 .ok()
469 .flatten()
470 .map(|tape| tape.materialized_subtrees())
471 .unwrap_or(0)
472 }
473
474 pub fn collect<S: AsRef<str>>(&self, expr: S) -> std::result::Result<Value, EvalError> {
478 exec::router::collect_json(self, expr.as_ref())
479 }
480}
481
482impl From<Value> for Jetro {
486 fn from(v: Value) -> Self {
488 Self::new(v)
489 }
490}