1pub(crate) mod builtins;
25pub(crate) mod compile;
26pub(crate) mod data;
27pub(crate) mod exec;
28pub(crate) mod ir;
29pub(crate) mod parse;
30pub(crate) mod plan;
31pub(crate) mod util;
32pub(crate) mod vm;
33
34#[cfg(test)]
35mod tests;
36
37use serde_json::Value;
38use std::cell::{OnceCell, RefCell};
39use std::collections::HashMap;
40use std::sync::Arc;
41use std::sync::Mutex;
42use data::value::Val;
43
44pub use data::context::EvalError;
45#[cfg(test)]
46use parse::parser::ParseError;
47use vm::VM;
48
/// Re-exports the parser entry point, its error type, and the query planner.
///
/// Compiled only when the `fuzz_internal` feature is enabled.
#[cfg(feature = "fuzz_internal")]
pub mod __fuzz_internal {
    pub use crate::parse::parser::{parse, ParseError};
    pub use crate::plan::physical::plan_query;
}
57
58
/// Test-only error type that carries either a parse error or an evaluation error.
#[cfg(test)]
#[derive(Debug)]
pub(crate) enum Error {
    /// Wraps a [`ParseError`].
    Parse(ParseError),
    /// Wraps an [`EvalError`].
    Eval(EvalError),
}
65
#[cfg(test)]
impl std::fmt::Display for Error {
    /// Renders the wrapped error using its own `Display` output, with no added prefix.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Pick the inner error once, then format it through a single `write!`.
        let inner: &dyn std::fmt::Display = match self {
            Self::Parse(parse_err) => parse_err,
            Self::Eval(eval_err) => eval_err,
        };
        write!(f, "{}", inner)
    }
}
#[cfg(test)]
impl std::error::Error for Error {
    /// Exposes the wrapped parse error as the source; evaluation errors report no source.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::Eval(_) => None,
            Self::Parse(cause) => Some(cause),
        }
    }
}
84
#[cfg(test)]
impl From<ParseError> for Error {
    /// Wraps a parse error so `?` can convert it into [`Error`].
    fn from(parse_err: ParseError) -> Self {
        Self::Parse(parse_err)
    }
}
#[cfg(test)]
impl From<EvalError> for Error {
    /// Wraps an evaluation error so `?` can convert it into [`Error`].
    fn from(eval_err: EvalError) -> Self {
        Self::Eval(eval_err)
    }
}
97
98
thread_local! {
    /// Per-thread slot for a [`VM`]; starts empty and is filled on first use by `with_vm`.
    static THREAD_VM: OnceCell<RefCell<VM>> = const { OnceCell::new() };
}
104
105fn with_vm<F, R>(f: F) -> R
109where
110 F: FnOnce(&RefCell<VM>) -> R,
111{
112 THREAD_VM.with(|cell| {
113 let inner = cell.get_or_init(|| RefCell::new(VM::new()));
114 f(inner)
115 })
116}
117
118
119pub struct Jetro {
124 document: Value,
127 root_val: OnceCell<Val>,
129 raw_bytes: Option<Arc<[u8]>>,
131
132 #[cfg(feature = "simd-json")]
134 tape: OnceCell<std::result::Result<Arc<crate::data::tape::TapeData>, String>>,
135 #[cfg(not(feature = "simd-json"))]
137 #[allow(dead_code)]
138 tape: OnceCell<()>,
139
140 structural_index:
142 OnceCell<std::result::Result<Arc<jetro_experimental::StructuralIndex>, String>>,
143
144 pub(crate) objvec_cache:
147 std::sync::Mutex<std::collections::HashMap<usize, Arc<crate::data::value::ObjVecData>>>,
148}
149
150
/// Query engine that shares a plan cache, a VM, and a key cache across documents.
pub struct JetroEngine {
    /// Compiled plans keyed by `<planning-context key>\0<expression>`.
    plan_cache: Mutex<HashMap<String, ir::physical::QueryPlan>>,
    /// Maximum number of cached plans; `0` disables caching. When the cache is
    /// full it is cleared entirely before the next plan is inserted.
    plan_cache_limit: usize,
    /// VM reused for every `collect` call; locked for the duration of the evaluation.
    vm: Mutex<VM>,
    /// Key cache passed to the `Val` builders when documents are parsed through the engine.
    keys: Arc<crate::data::intern::KeyCache>,
}
169
/// Error returned by the byte-oriented [`JetroEngine`] entry points.
#[derive(Debug)]
pub enum JetroEngineError {
    /// A `serde_json` error, as returned by `Jetro::from_bytes`.
    Json(serde_json::Error),
    /// An evaluation error, raised while materializing the document or running a query.
    Eval(EvalError),
}
179
180impl std::fmt::Display for JetroEngineError {
181 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
182 match self {
183 Self::Json(err) => write!(f, "{}", err),
184 Self::Eval(err) => write!(f, "{}", err),
185 }
186 }
187}
188
189impl std::error::Error for JetroEngineError {
190 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
191 match self {
192 Self::Json(err) => Some(err),
193 Self::Eval(_) => None,
194 }
195 }
196}
197
198impl From<serde_json::Error> for JetroEngineError {
199 fn from(err: serde_json::Error) -> Self {
200 Self::Json(err)
201 }
202}
203
204impl From<EvalError> for JetroEngineError {
205 fn from(err: EvalError) -> Self {
206 Self::Eval(err)
207 }
208}
209
210impl Default for JetroEngine {
211 fn default() -> Self {
212 Self::new()
213 }
214}
215
216impl JetroEngine {
217 const DEFAULT_PLAN_CACHE_LIMIT: usize = 256;
219
220 pub fn new() -> Self {
222 Self::with_plan_cache_limit(Self::DEFAULT_PLAN_CACHE_LIMIT)
223 }
224
225 pub fn with_plan_cache_limit(plan_cache_limit: usize) -> Self {
228 Self {
229 plan_cache: Mutex::new(HashMap::new()),
230 plan_cache_limit,
231 vm: Mutex::new(VM::new()),
232 keys: crate::data::intern::KeyCache::new(),
233 }
234 }
235
236 pub fn keys(&self) -> &Arc<crate::data::intern::KeyCache> {
238 &self.keys
239 }
240
241 pub fn clear_cache(&self) {
244 self.plan_cache.lock().expect("plan cache poisoned").clear();
245 self.keys.clear();
246 }
247
248 pub fn parse_value(&self, document: Value) -> Jetro {
253 let root = Val::from_value_with(&self.keys, &document);
254 Jetro::from_val_and_value(root, document)
255 }
256
257 pub fn parse_bytes(
262 &self,
263 bytes: Vec<u8>,
264 ) -> std::result::Result<Jetro, JetroEngineError> {
265 let document = Jetro::from_bytes(bytes)?;
266 let _ = document.root_val_with(&self.keys)?;
270 Ok(document)
271 }
272
273 pub fn collect<S: AsRef<str>>(
276 &self,
277 document: &Jetro,
278 expr: S,
279 ) -> std::result::Result<Value, EvalError> {
280 let plan = self.cached_plan(expr.as_ref(), exec::router::planning_context(document));
281 let mut vm = self.vm.lock().expect("vm cache poisoned");
282 exec::router::collect_plan_json_with_vm(document, &plan, &mut vm)
283 }
284
285 pub fn collect_value<S: AsRef<str>>(
289 &self,
290 document: Value,
291 expr: S,
292 ) -> std::result::Result<Value, EvalError> {
293 let document = self.parse_value(document);
294 self.collect(&document, expr)
295 }
296
297 pub fn collect_bytes<S: AsRef<str>>(
302 &self,
303 bytes: Vec<u8>,
304 expr: S,
305 ) -> std::result::Result<Value, JetroEngineError> {
306 let document = self.parse_bytes(bytes)?;
307 Ok(self.collect(&document, expr)?)
308 }
309
310 fn cached_plan(&self, expr: &str, context: plan::physical::PlanningContext) -> ir::physical::QueryPlan {
313 let mut cache = self.plan_cache.lock().expect("plan cache poisoned");
314 let cache_key = format!("{}\0{}", context.cache_key(), expr);
315 if let Some(plan) = cache.get(&cache_key) {
316 return plan.clone();
317 }
318
319 let plan = plan::physical::plan_query_with_context(expr, context);
320 if self.plan_cache_limit > 0 {
321 if cache.len() >= self.plan_cache_limit {
322 cache.clear();
323 }
324 cache.insert(cache_key, plan.clone());
325 }
326 plan
327 }
328}
329
330impl exec::pipeline::PipelineData for Jetro {
331 fn promote_objvec(&self, arr: &Arc<Vec<Val>>) -> Option<Arc<crate::data::value::ObjVecData>> {
332 self.get_or_promote_objvec(arr)
333 }
334}
335
336impl Jetro {
337 #[cfg(feature = "simd-json")]
340 pub(crate) fn lazy_tape(
341 &self,
342 ) -> std::result::Result<Option<&Arc<crate::data::tape::TapeData>>, EvalError> {
343 if let Some(result) = self.tape.get() {
344 return result
345 .as_ref()
346 .map(Some)
347 .map_err(|err| EvalError(format!("Invalid JSON: {err}")));
348 }
349 let Some(raw) = self.raw_bytes.as_ref() else {
350 return Ok(None);
351 };
352 let bytes: Vec<u8> = (**raw).to_vec();
353 let parsed = crate::data::tape::TapeData::parse(bytes).map_err(|err| err.to_string());
354 let _ = self.tape.set(parsed);
355 self.tape
356 .get()
357 .expect("tape cache initialized")
358 .as_ref()
359 .map(Some)
360 .map_err(|err| EvalError(format!("Invalid JSON: {err}")))
361 }
362
363 pub(crate) fn get_or_promote_objvec(
366 &self,
367 arr: &Arc<Vec<Val>>,
368 ) -> Option<Arc<crate::data::value::ObjVecData>> {
369 let key = Arc::as_ptr(arr) as usize;
370 if let Ok(cache) = self.objvec_cache.lock() {
371 if let Some(d) = cache.get(&key) {
372 return Some(Arc::clone(d));
373 }
374 }
375 let promoted = exec::pipeline::Pipeline::try_promote_objvec_arr(arr)?;
376 if let Ok(mut cache) = self.objvec_cache.lock() {
377 cache.entry(key).or_insert_with(|| Arc::clone(&promoted));
378 }
379 Some(promoted)
380 }
381
382 pub(crate) fn new(document: Value) -> Self {
384 Self {
385 document,
386 root_val: OnceCell::new(),
387 objvec_cache: Default::default(),
388 raw_bytes: None,
389 tape: OnceCell::new(),
390 structural_index: OnceCell::new(),
391 }
392 }
393
394 pub(crate) fn from_val_and_value(root: Val, document: Value) -> Self {
399 let root_val = OnceCell::new();
400 let _ = root_val.set(root);
401 Self {
402 document,
403 root_val,
404 objvec_cache: Default::default(),
405 raw_bytes: None,
406 tape: OnceCell::new(),
407 structural_index: OnceCell::new(),
408 }
409 }
410
411 pub(crate) fn root_val_with(
416 &self,
417 keys: &crate::data::intern::KeyCache,
418 ) -> std::result::Result<Val, EvalError> {
419 if let Some(root) = self.root_val.get() {
420 return Ok(root.clone());
421 }
422 let root = {
423 #[cfg(feature = "simd-json")]
424 {
425 if let Some(tape) = self.lazy_tape()? {
426 Val::from_tape_data_with(keys, tape)
427 } else {
428 Val::from_value_with(keys, &self.document)
429 }
430 }
431 #[cfg(not(feature = "simd-json"))]
432 {
433 Val::from_value_with(keys, &self.document)
434 }
435 };
436 let _ = self.root_val.set(root);
437 Ok(self.root_val.get().expect("root val initialized").clone())
438 }
439
440 pub fn from_bytes(bytes: Vec<u8>) -> std::result::Result<Self, serde_json::Error> {
444
445
446 #[cfg(feature = "simd-json")]
447 {
448 return Ok(Self {
449 document: Value::Null,
450 root_val: OnceCell::new(),
451 objvec_cache: Default::default(),
452 raw_bytes: Some(Arc::from(bytes.into_boxed_slice())),
453 tape: OnceCell::new(),
454 structural_index: OnceCell::new(),
455 });
456 }
457 #[allow(unreachable_code)]
458 {
459 let document: Value = serde_json::from_slice(&bytes)?;
460 Ok(Self {
461 document,
462 root_val: OnceCell::new(),
463 objvec_cache: Default::default(),
464 raw_bytes: Some(Arc::from(bytes.into_boxed_slice())),
465 tape: OnceCell::new(),
466 structural_index: OnceCell::new(),
467 })
468 }
469 }
470
471 pub(crate) fn raw_bytes(&self) -> Option<&[u8]> {
474 self.raw_bytes.as_deref()
475 }
476
477 pub(crate) fn lazy_structural_index(
480 &self,
481 ) -> std::result::Result<Option<&Arc<jetro_experimental::StructuralIndex>>, EvalError> {
482 if let Some(result) = self.structural_index.get() {
483 return result
484 .as_ref()
485 .map(Some)
486 .map_err(|err| EvalError(format!("Invalid JSON: {err}")));
487 }
488 let Some(raw) = self.raw_bytes.as_ref() else {
489 return Ok(None);
490 };
491 let built = jetro_experimental::from_bytes_with(
492 raw.as_ref(),
493 jetro_experimental::BuildOptions::keys_only(),
494 )
495 .map(Arc::new)
496 .map_err(|err| err.to_string());
497 let _ = self.structural_index.set(built);
498 self.structural_index
499 .get()
500 .expect("structural index cache initialized")
501 .as_ref()
502 .map(Some)
503 .map_err(|err| EvalError(format!("Invalid JSON: {err}")))
504 }
505
506 pub(crate) fn root_val(&self) -> std::result::Result<Val, EvalError> {
509 if let Some(root) = self.root_val.get() {
510 return Ok(root.clone());
511 }
512 let root = {
513 #[cfg(feature = "simd-json")]
514 {
515 if let Some(tape) = self.lazy_tape()? {
516 Val::from_tape_data(tape)
517 } else {
518 Val::from(&self.document)
519 }
520 }
521 #[cfg(not(feature = "simd-json"))]
522 {
523 Val::from(&self.document)
524 }
525 };
526 let _ = self.root_val.set(root);
527 Ok(self.root_val.get().expect("root val initialized").clone())
528 }
529
530 #[cfg(test)]
533 pub(crate) fn root_val_is_materialized(&self) -> bool {
534 self.root_val.get().is_some()
535 }
536
537 #[cfg(test)]
538 pub(crate) fn structural_index_is_built(&self) -> bool {
539 self.structural_index.get().is_some()
540 }
541
542 #[cfg(all(test, feature = "simd-json"))]
543 pub(crate) fn tape_is_built(&self) -> bool {
544 self.tape.get().is_some()
545 }
546
547 #[cfg(all(test, feature = "simd-json"))]
548 pub(crate) fn reset_tape_materialized_subtrees(&self) {
549 if let Ok(Some(tape)) = self.lazy_tape() {
550 tape.reset_materialized_subtrees();
551 }
552 }
553
554 #[cfg(all(test, feature = "simd-json"))]
555 pub(crate) fn tape_materialized_subtrees(&self) -> usize {
556 self.lazy_tape()
557 .ok()
558 .flatten()
559 .map(|tape| tape.materialized_subtrees())
560 .unwrap_or(0)
561 }
562
563 pub fn collect<S: AsRef<str>>(&self, expr: S) -> std::result::Result<Value, EvalError> {
567 exec::router::collect_json(self, expr.as_ref())
568 }
569}
570
571impl From<Value> for Jetro {
575 fn from(v: Value) -> Self {
577 Self::new(v)
578 }
579}