1#[cfg_attr(not(test), allow(dead_code))]
25pub(crate) mod analysis;
26pub(crate) mod ast;
27pub(crate) mod builtin_helpers;
28pub(crate) mod builtin_registry;
29pub(crate) mod builtins;
30pub(crate) mod chain_ir;
31pub(crate) mod composed;
32pub(crate) mod context;
33pub(crate) mod executor;
34pub(crate) mod parser;
35pub(crate) mod physical;
36pub(crate) mod physical_eval;
37pub(crate) mod pipeline;
38pub(crate) mod planner;
39pub(crate) mod runtime;
40pub(crate) mod strref;
41pub(crate) mod structural;
42pub(crate) mod util;
43pub(crate) mod value;
44#[cfg_attr(not(test), allow(dead_code))]
45pub(crate) mod value_view;
46pub(crate) mod view_pipeline;
47pub(crate) mod compiler;
48pub(crate) mod compiler_passes;
49pub(crate) mod vm;
50pub(crate) mod logical_plan;
51pub(crate) mod logical_planner;
52pub(crate) mod optimizer;
53
54#[cfg(test)]
55mod examples;
56#[cfg(test)]
57mod tests;
58
59use serde_json::Value;
60use std::cell::{OnceCell, RefCell};
61use std::collections::HashMap;
62use std::sync::Arc;
63use std::sync::Mutex;
64use value::Val;
65
66pub use context::EvalError;
67#[cfg(test)]
68use parser::ParseError;
69use vm::VM;
70
71
/// Test-only error type that wraps either a parse failure or an evaluation
/// failure so test helpers can return one error type for both stages.
#[cfg(test)]
#[derive(Debug)]
pub(crate) enum Error {
    /// The expression could not be parsed.
    Parse(ParseError),
    /// The expression failed while being evaluated.
    Eval(EvalError),
}
78
/// Formats the error by delegating to the wrapped error's own `Display`.
#[cfg(test)]
impl std::fmt::Display for Error {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Pick the wrapped error first so there is a single write site.
        let inner: &dyn std::fmt::Display = match self {
            Self::Parse(parse_err) => parse_err,
            Self::Eval(eval_err) => eval_err,
        };
        write!(f, "{inner}")
    }
}
#[cfg(test)]
impl std::error::Error for Error {
    /// Returns the wrapped parse error as the underlying cause; evaluation
    /// errors report no further source.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::Parse(parse_err) => Some(parse_err),
            Self::Eval(_) => None,
        }
    }
}
97
/// Lets `?` turn a `ParseError` into the test-only `Error`.
#[cfg(test)]
impl From<ParseError> for Error {
    fn from(parse_err: ParseError) -> Self {
        Self::Parse(parse_err)
    }
}
/// Lets `?` turn an `EvalError` into the test-only `Error`.
#[cfg(test)]
impl From<EvalError> for Error {
    fn from(eval_err: EvalError) -> Self {
        Self::Eval(eval_err)
    }
}
110
111
thread_local! {
    /// Per-thread `VM` slot. It starts empty and is filled on first use by
    /// `with_vm`; the `RefCell` lets callers borrow the VM mutably.
    static THREAD_VM: OnceCell<RefCell<VM>> = const { OnceCell::new() };
}
117
118fn with_vm<F, R>(f: F) -> R
122where
123 F: FnOnce(&RefCell<VM>) -> R,
124{
125 THREAD_VM.with(|cell| {
126 let inner = cell.get_or_init(|| RefCell::new(VM::new()));
127 f(inner)
128 })
129}
130
131
/// A JSON document together with lazily built, cached evaluation state.
///
/// Instances are created from an already parsed `Value` (`Jetro::new`,
/// `From<Value>`) or from raw bytes (`Jetro::from_bytes`).
pub struct Jetro {
    /// Parsed JSON tree. `from_bytes` stores `Value::Null` here when the
    /// `simd-json` feature is enabled and the bytes are kept unparsed.
    document: Value,
    /// Root `Val`, filled by the first successful `root_val` call.
    root_val: OnceCell<Val>,
    /// Source bytes; `Some` only when the instance came from `from_bytes`.
    raw_bytes: Option<Arc<[u8]>>,

    /// Outcome of parsing `raw_bytes` into tape form, cached by `lazy_tape`.
    /// A parse failure is cached as its message string.
    #[cfg(feature = "simd-json")]
    tape: OnceCell<std::result::Result<Arc<crate::strref::TapeData>, String>>,
    /// Placeholder with the same field name so constructors can initialise
    /// `tape` identically whether or not `simd-json` is enabled.
    #[cfg(not(feature = "simd-json"))]
    #[allow(dead_code)]
    tape: OnceCell<()>,

    /// Outcome of building the structural index over `raw_bytes`, cached by
    /// `lazy_structural_index`. A build failure is cached as its message.
    structural_index:
        OnceCell<std::result::Result<Arc<jetro_experimental::StructuralIndex>, String>>,

    /// Promoted object-vector data, keyed by the address of the source
    /// array's `Arc` allocation (see `get_or_promote_objvec`).
    pub(crate) objvec_cache:
        std::sync::Mutex<std::collections::HashMap<usize, Arc<crate::value::ObjVecData>>>,
}
162
163
/// Reusable query engine that owns a plan cache and a `VM`, both behind
/// mutexes so the engine can be used through a shared reference.
pub struct JetroEngine {
    /// Query plans keyed by the planning-context key plus the expression text.
    plan_cache: Mutex<HashMap<String, physical::QueryPlan>>,
    /// Entry count at which the cache is flushed before the next insert;
    /// `0` disables caching.
    plan_cache_limit: usize,
    /// VM used by every `collect` call made through this engine.
    vm: Mutex<VM>,
}
176
/// Error returned by `JetroEngine::collect_bytes`, which can fail either
/// while building the document from bytes or while evaluating the expression.
#[derive(Debug)]
pub enum JetroEngineError {
    /// Building the document from bytes failed with a `serde_json` error.
    Json(serde_json::Error),
    /// Evaluating the expression failed.
    Eval(EvalError),
}
186
187impl std::fmt::Display for JetroEngineError {
188 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
189 match self {
190 Self::Json(err) => write!(f, "{}", err),
191 Self::Eval(err) => write!(f, "{}", err),
192 }
193 }
194}
195
196impl std::error::Error for JetroEngineError {
197 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
198 match self {
199 Self::Json(err) => Some(err),
200 Self::Eval(_) => None,
201 }
202 }
203}
204
205impl From<serde_json::Error> for JetroEngineError {
206 fn from(err: serde_json::Error) -> Self {
207 Self::Json(err)
208 }
209}
210
211impl From<EvalError> for JetroEngineError {
212 fn from(err: EvalError) -> Self {
213 Self::Eval(err)
214 }
215}
216
217impl Default for JetroEngine {
218 fn default() -> Self {
219 Self::new()
220 }
221}
222
223impl JetroEngine {
224 const DEFAULT_PLAN_CACHE_LIMIT: usize = 256;
226
227 pub fn new() -> Self {
229 Self::with_plan_cache_limit(Self::DEFAULT_PLAN_CACHE_LIMIT)
230 }
231
232 pub fn with_plan_cache_limit(plan_cache_limit: usize) -> Self {
235 Self {
236 plan_cache: Mutex::new(HashMap::new()),
237 plan_cache_limit,
238 vm: Mutex::new(VM::new()),
239 }
240 }
241
242 pub fn clear_cache(&self) {
244 self.plan_cache.lock().expect("plan cache poisoned").clear();
245 }
246
247 pub fn collect<S: AsRef<str>>(
250 &self,
251 document: &Jetro,
252 expr: S,
253 ) -> std::result::Result<Value, EvalError> {
254 let plan = self.cached_plan(expr.as_ref(), executor::planning_context(document));
255 let mut vm = self.vm.lock().expect("vm cache poisoned");
256 executor::collect_plan_json_with_vm(document, &plan, &mut vm)
257 }
258
259 pub fn collect_value<S: AsRef<str>>(
261 &self,
262 document: Value,
263 expr: S,
264 ) -> std::result::Result<Value, EvalError> {
265 let document = Jetro::from(document);
266 self.collect(&document, expr)
267 }
268
269 pub fn collect_bytes<S: AsRef<str>>(
272 &self,
273 bytes: Vec<u8>,
274 expr: S,
275 ) -> std::result::Result<Value, JetroEngineError> {
276 let document = Jetro::from_bytes(bytes)?;
277 Ok(self.collect(&document, expr)?)
278 }
279
280 fn cached_plan(&self, expr: &str, context: planner::PlanningContext) -> physical::QueryPlan {
283 let mut cache = self.plan_cache.lock().expect("plan cache poisoned");
284 let cache_key = format!("{}\0{}", context.cache_key(), expr);
285 if let Some(plan) = cache.get(&cache_key) {
286 return plan.clone();
287 }
288
289 let plan = planner::plan_query_with_context(expr, context);
290 if self.plan_cache_limit > 0 {
291 if cache.len() >= self.plan_cache_limit {
292 cache.clear();
293 }
294 cache.insert(cache_key, plan.clone());
295 }
296 plan
297 }
298}
299
300impl pipeline::PipelineData for Jetro {
301 fn promote_objvec(&self, arr: &Arc<Vec<Val>>) -> Option<Arc<crate::value::ObjVecData>> {
302 self.get_or_promote_objvec(arr)
303 }
304}
305
306impl Jetro {
307 #[cfg(feature = "simd-json")]
310 pub(crate) fn lazy_tape(
311 &self,
312 ) -> std::result::Result<Option<&Arc<crate::strref::TapeData>>, EvalError> {
313 if let Some(result) = self.tape.get() {
314 return result
315 .as_ref()
316 .map(Some)
317 .map_err(|err| EvalError(format!("Invalid JSON: {err}")));
318 }
319 let Some(raw) = self.raw_bytes.as_ref() else {
320 return Ok(None);
321 };
322 let bytes: Vec<u8> = (**raw).to_vec();
323 let parsed = crate::strref::TapeData::parse(bytes).map_err(|err| err.to_string());
324 let _ = self.tape.set(parsed);
325 self.tape
326 .get()
327 .expect("tape cache initialized")
328 .as_ref()
329 .map(Some)
330 .map_err(|err| EvalError(format!("Invalid JSON: {err}")))
331 }
332
333 pub(crate) fn get_or_promote_objvec(
336 &self,
337 arr: &Arc<Vec<Val>>,
338 ) -> Option<Arc<crate::value::ObjVecData>> {
339 let key = Arc::as_ptr(arr) as usize;
340 if let Ok(cache) = self.objvec_cache.lock() {
341 if let Some(d) = cache.get(&key) {
342 return Some(Arc::clone(d));
343 }
344 }
345 let promoted = pipeline::Pipeline::try_promote_objvec_arr(arr)?;
346 if let Ok(mut cache) = self.objvec_cache.lock() {
347 cache.entry(key).or_insert_with(|| Arc::clone(&promoted));
348 }
349 Some(promoted)
350 }
351
352 pub(crate) fn new(document: Value) -> Self {
354 Self {
355 document,
356 root_val: OnceCell::new(),
357 objvec_cache: Default::default(),
358 raw_bytes: None,
359 tape: OnceCell::new(),
360 structural_index: OnceCell::new(),
361 }
362 }
363
364 pub fn from_bytes(bytes: Vec<u8>) -> std::result::Result<Self, serde_json::Error> {
368
369
370 #[cfg(feature = "simd-json")]
371 {
372 return Ok(Self {
373 document: Value::Null,
374 root_val: OnceCell::new(),
375 objvec_cache: Default::default(),
376 raw_bytes: Some(Arc::from(bytes.into_boxed_slice())),
377 tape: OnceCell::new(),
378 structural_index: OnceCell::new(),
379 });
380 }
381 #[allow(unreachable_code)]
382 {
383 let document: Value = serde_json::from_slice(&bytes)?;
384 Ok(Self {
385 document,
386 root_val: OnceCell::new(),
387 objvec_cache: Default::default(),
388 raw_bytes: Some(Arc::from(bytes.into_boxed_slice())),
389 tape: OnceCell::new(),
390 structural_index: OnceCell::new(),
391 })
392 }
393 }
394
395 pub(crate) fn raw_bytes(&self) -> Option<&[u8]> {
398 self.raw_bytes.as_deref()
399 }
400
401 pub(crate) fn lazy_structural_index(
404 &self,
405 ) -> std::result::Result<Option<&Arc<jetro_experimental::StructuralIndex>>, EvalError> {
406 if let Some(result) = self.structural_index.get() {
407 return result
408 .as_ref()
409 .map(Some)
410 .map_err(|err| EvalError(format!("Invalid JSON: {err}")));
411 }
412 let Some(raw) = self.raw_bytes.as_ref() else {
413 return Ok(None);
414 };
415 let built = jetro_experimental::from_bytes_with(
416 raw.as_ref(),
417 jetro_experimental::BuildOptions::keys_only(),
418 )
419 .map(Arc::new)
420 .map_err(|err| err.to_string());
421 let _ = self.structural_index.set(built);
422 self.structural_index
423 .get()
424 .expect("structural index cache initialized")
425 .as_ref()
426 .map(Some)
427 .map_err(|err| EvalError(format!("Invalid JSON: {err}")))
428 }
429
430 pub(crate) fn root_val(&self) -> std::result::Result<Val, EvalError> {
433 if let Some(root) = self.root_val.get() {
434 return Ok(root.clone());
435 }
436 let root = {
437 #[cfg(feature = "simd-json")]
438 {
439 if let Some(tape) = self.lazy_tape()? {
440 Val::from_tape_data(tape)
441 } else {
442 Val::from(&self.document)
443 }
444 }
445 #[cfg(not(feature = "simd-json"))]
446 {
447 Val::from(&self.document)
448 }
449 };
450 let _ = self.root_val.set(root);
451 Ok(self.root_val.get().expect("root val initialized").clone())
452 }
453
454 #[cfg(test)]
457 pub(crate) fn root_val_is_materialized(&self) -> bool {
458 self.root_val.get().is_some()
459 }
460
461 #[cfg(test)]
462 pub(crate) fn structural_index_is_built(&self) -> bool {
463 self.structural_index.get().is_some()
464 }
465
466 #[cfg(all(test, feature = "simd-json"))]
467 pub(crate) fn tape_is_built(&self) -> bool {
468 self.tape.get().is_some()
469 }
470
471 #[cfg(all(test, feature = "simd-json"))]
472 pub(crate) fn reset_tape_materialized_subtrees(&self) {
473 if let Ok(Some(tape)) = self.lazy_tape() {
474 tape.reset_materialized_subtrees();
475 }
476 }
477
478 #[cfg(all(test, feature = "simd-json"))]
479 pub(crate) fn tape_materialized_subtrees(&self) -> usize {
480 self.lazy_tape()
481 .ok()
482 .flatten()
483 .map(|tape| tape.materialized_subtrees())
484 .unwrap_or(0)
485 }
486
487 pub fn collect<S: AsRef<str>>(&self, expr: S) -> std::result::Result<Value, EvalError> {
491 executor::collect_json(self, expr.as_ref())
492 }
493}
494
495impl From<Value> for Jetro {
499 fn from(v: Value) -> Self {
501 Self::new(v)
502 }
503}