pub(crate) mod builtins;
pub(crate) mod compile;
pub(crate) mod data;
pub(crate) mod exec;
pub(crate) mod ir;
pub(crate) mod parse;
pub(crate) mod plan;
pub(crate) mod util;
pub(crate) mod vm;
#[cfg(test)]
mod tests;
use serde_json::Value;
use std::cell::{OnceCell, RefCell};
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::Mutex;
use data::value::Val;
pub use data::context::EvalError;
#[cfg(test)]
use parse::parser::ParseError;
use vm::VM;
/// Re-exports of the parser and planner entry points.
///
/// Only compiled when the `fuzz_internal` feature is enabled.
#[cfg(feature = "fuzz_internal")]
pub mod __fuzz_internal {
    pub use crate::parse::parser::parse;
    pub use crate::parse::parser::ParseError;
    pub use crate::plan::physical::plan_query;
}
/// Test-only error type that wraps either a [`ParseError`] or an [`EvalError`].
#[cfg(test)]
#[derive(Debug)]
pub(crate) enum Error {
/// Wraps a [`ParseError`].
Parse(ParseError),
/// Wraps an [`EvalError`].
Eval(EvalError),
}
#[cfg(test)]
impl std::fmt::Display for Error {
    /// Writes the wrapped error's own `Display` text, with nothing added.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let inner: &dyn std::fmt::Display = match self {
            Self::Parse(parse_err) => parse_err,
            Self::Eval(eval_err) => eval_err,
        };
        write!(f, "{inner}")
    }
}
#[cfg(test)]
impl std::error::Error for Error {
    /// Reports the wrapped [`ParseError`] as the cause; the `Eval` variant
    /// reports no cause.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::Parse(cause) => Some(cause),
            Self::Eval(_) => None,
        }
    }
}
#[cfg(test)]
impl From<ParseError> for Error {
    /// Wraps a parse failure in [`Error::Parse`].
    fn from(parse_err: ParseError) -> Self {
        Self::Parse(parse_err)
    }
}
#[cfg(test)]
impl From<EvalError> for Error {
    /// Wraps an evaluation failure in [`Error::Eval`].
    fn from(eval_err: EvalError) -> Self {
        Self::Eval(eval_err)
    }
}
thread_local! {
// Per-thread slot for a `VM`. It starts empty; `with_vm` fills it with
// `VM::new()` the first time it runs on a given thread.
static THREAD_VM: OnceCell<RefCell<VM>> = const { OnceCell::new() };
}
/// Runs `f` against the calling thread's `VM`, creating that `VM` on first use.
///
/// The closure receives the `RefCell` itself, so borrowing (and the usual
/// `RefCell` borrow rules) is left to the caller.
fn with_vm<F, R>(f: F) -> R
where
    F: FnOnce(&RefCell<VM>) -> R,
{
    THREAD_VM.with(|slot| f(slot.get_or_init(|| RefCell::new(VM::new()))))
}
pub struct Jetro {
document: Value,
root_val: OnceCell<Val>,
raw_bytes: Option<Arc<[u8]>>,
#[cfg(feature = "simd-json")]
tape: OnceCell<std::result::Result<Arc<crate::data::tape::TapeData>, String>>,
#[cfg(not(feature = "simd-json"))]
#[allow(dead_code)]
tape: OnceCell<()>,
structural_index:
OnceCell<std::result::Result<Arc<jetro_experimental::StructuralIndex>, String>>,
pub(crate) objvec_cache:
std::sync::Mutex<std::collections::HashMap<usize, Arc<crate::data::value::ObjVecData>>>,
}
/// Query engine that keeps a plan cache, a `VM`, and a key cache behind
/// shared (`&self`) access so they can be reused across `collect` calls.
pub struct JetroEngine {
/// Cached plans, keyed by the planning context's cache key, a NUL byte, and
/// the expression text (see `cached_plan`).
plan_cache: Mutex<HashMap<String, ir::physical::QueryPlan>>,
/// Entry count at which `cached_plan` empties the cache before inserting;
/// `0` means plans are never stored.
plan_cache_limit: usize,
/// VM that is locked for the duration of each `collect` call.
vm: Mutex<VM>,
/// Key cache passed to `Val` construction by `parse_value` and `parse_bytes`.
keys: Arc<crate::data::intern::KeyCache>,
}
/// Error returned by the byte-oriented [`JetroEngine`] entry points
/// (`parse_bytes`, `collect_bytes`).
#[derive(Debug)]
pub enum JetroEngineError {
/// Wraps a `serde_json::Error`.
Json(serde_json::Error),
/// Wraps an [`EvalError`].
Eval(EvalError),
}
impl std::fmt::Display for JetroEngineError {
    /// Writes the wrapped error's own `Display` text, with nothing added.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let inner: &dyn std::fmt::Display = match self {
            Self::Json(json_err) => json_err,
            Self::Eval(eval_err) => eval_err,
        };
        write!(f, "{inner}")
    }
}
impl std::error::Error for JetroEngineError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Self::Json(err) => Some(err),
Self::Eval(_) => None,
}
}
}
impl From<serde_json::Error> for JetroEngineError {
fn from(err: serde_json::Error) -> Self {
Self::Json(err)
}
}
impl From<EvalError> for JetroEngineError {
fn from(err: EvalError) -> Self {
Self::Eval(err)
}
}
impl Default for JetroEngine {
fn default() -> Self {
Self::new()
}
}
impl JetroEngine {
/// Plan-cache limit used by [`JetroEngine::new`].
const DEFAULT_PLAN_CACHE_LIMIT: usize = 256;
/// Creates an engine that uses `DEFAULT_PLAN_CACHE_LIMIT` as its plan-cache
/// limit.
pub fn new() -> Self {
    let limit = Self::DEFAULT_PLAN_CACHE_LIMIT;
    Self::with_plan_cache_limit(limit)
}
/// Creates an engine with an explicit plan-cache limit.
///
/// The engine starts with an empty plan cache, a fresh `VM`, and a fresh key
/// cache. A limit of `0` means plans are never stored.
pub fn with_plan_cache_limit(plan_cache_limit: usize) -> Self {
    let plan_cache = Mutex::new(HashMap::new());
    let vm = Mutex::new(VM::new());
    let keys = crate::data::intern::KeyCache::new();
    Self {
        plan_cache,
        plan_cache_limit,
        vm,
        keys,
    }
}
pub fn keys(&self) -> &Arc<crate::data::intern::KeyCache> {
&self.keys
}
/// Empties the plan cache and then clears the key cache.
///
/// # Panics
///
/// Panics if the plan-cache mutex is poisoned.
pub fn clear_cache(&self) {
    {
        // Scoped so the plan-cache lock is released before the key cache is
        // touched.
        let mut plans = self.plan_cache.lock().expect("plan cache poisoned");
        plans.clear();
    }
    self.keys.clear();
}
/// Wraps an already-parsed JSON value as a [`Jetro`] document.
///
/// The root `Val` is built immediately using this engine's key cache and
/// stored in the returned document.
pub fn parse_value(&self, document: Value) -> Jetro {
    let root_val = Val::from_value_with(&self.keys, &document);
    Jetro::from_val_and_value(root_val, document)
}
/// Builds a [`Jetro`] document from raw JSON bytes and materializes its root
/// value right away using this engine's key cache.
///
/// # Errors
///
/// Returns [`JetroEngineError::Json`] if `Jetro::from_bytes` fails, or
/// [`JetroEngineError::Eval`] if building the root value fails.
pub fn parse_bytes(&self, bytes: Vec<u8>) -> std::result::Result<Jetro, JetroEngineError> {
    let jetro = Jetro::from_bytes(bytes)?;
    // Only the side effect (filling the root-value cache) is wanted here.
    jetro.root_val_with(&self.keys).map(drop)?;
    Ok(jetro)
}
/// Evaluates `expr` against `document` and returns the result as JSON.
///
/// The plan comes from the engine's plan cache (planned on a miss), and the
/// engine's `VM` is locked while the plan runs.
///
/// # Panics
///
/// Panics if the plan-cache or VM mutex is poisoned.
pub fn collect<S: AsRef<str>>(
    &self,
    document: &Jetro,
    expr: S,
) -> std::result::Result<Value, EvalError> {
    let query = expr.as_ref();
    let context = exec::router::planning_context(document);
    let plan = self.cached_plan(query, context);
    let mut vm_guard = self.vm.lock().expect("vm cache poisoned");
    exec::router::collect_plan_json_with_vm(document, &plan, &mut vm_guard)
}
/// Convenience wrapper: `parse_value` followed by `collect`.
pub fn collect_value<S: AsRef<str>>(
    &self,
    document: Value,
    expr: S,
) -> std::result::Result<Value, EvalError> {
    let jetro = self.parse_value(document);
    self.collect(&jetro, expr)
}
pub fn collect_bytes<S: AsRef<str>>(
&self,
bytes: Vec<u8>,
expr: S,
) -> std::result::Result<Value, JetroEngineError> {
let document = self.parse_bytes(bytes)?;
Ok(self.collect(&document, expr)?)
}
/// Returns the physical plan for `expr`, consulting the plan cache first.
///
/// Entries are keyed by the planning context's cache key and the expression
/// text, joined by a NUL byte. When the cache already holds
/// `plan_cache_limit` entries it is emptied before the new plan is stored; a
/// limit of `0` means nothing is ever stored.
///
/// The cache mutex is held only for the lookup and for the insert, never
/// while the planner runs. Concurrent callers therefore do not queue behind a
/// slow plan, and a panic inside the planner cannot poison the cache. Two
/// threads that miss on the same key at the same time may both plan it; the
/// later insert overwrites the earlier one.
///
/// # Panics
///
/// Panics if the plan-cache mutex is poisoned.
fn cached_plan(
    &self,
    expr: &str,
    context: plan::physical::PlanningContext,
) -> ir::physical::QueryPlan {
    // Build the key before locking so the lock covers only map operations.
    let cache_key = format!("{}\0{}", context.cache_key(), expr);
    {
        let cache = self.plan_cache.lock().expect("plan cache poisoned");
        if let Some(plan) = cache.get(&cache_key) {
            return plan.clone();
        }
    }
    // Lock released: planning runs without blocking other callers.
    let plan = plan::physical::plan_query_with_context(expr, context);
    if self.plan_cache_limit > 0 {
        let mut cache = self.plan_cache.lock().expect("plan cache poisoned");
        if cache.len() >= self.plan_cache_limit {
            cache.clear();
        }
        cache.insert(cache_key, plan.clone());
    }
    plan
}
}
impl exec::pipeline::PipelineData for Jetro {
fn promote_objvec(&self, arr: &Arc<Vec<Val>>) -> Option<Arc<crate::data::value::ObjVecData>> {
self.get_or_promote_objvec(arr)
}
}
impl Jetro {
/// Returns the tape parsed from this document's raw bytes, parsing on first
/// use and caching the outcome (success or failure).
///
/// Returns `Ok(None)` when the document has no raw bytes and nothing is
/// cached.
///
/// # Errors
///
/// Returns an `EvalError` whose message starts with `Invalid JSON: ` when the
/// parse failed, on the first call and on every later call.
#[cfg(feature = "simd-json")]
pub(crate) fn lazy_tape(
    &self,
) -> std::result::Result<Option<&Arc<crate::data::tape::TapeData>>, EvalError> {
    let outcome = match self.tape.get() {
        Some(cached) => cached,
        None => {
            let Some(raw) = self.raw_bytes.as_ref() else {
                return Ok(None);
            };
            // The parser takes an owned buffer, so the shared bytes are copied.
            self.tape.get_or_init(|| {
                crate::data::tape::TapeData::parse(raw.to_vec()).map_err(|err| err.to_string())
            })
        }
    };
    match outcome {
        Ok(tape) => Ok(Some(tape)),
        Err(err) => Err(EvalError(format!("Invalid JSON: {err}"))),
    }
}
/// Returns the promoted `ObjVecData` for `arr`, reusing a cached promotion
/// when one exists for the same `Arc` allocation.
///
/// Returns `None` when `try_promote_objvec_arr` declines to promote `arr`.
/// A poisoned cache mutex is tolerated: the lookup and the store are skipped
/// and the freshly promoted value is still returned.
///
/// NOTE(review): the cache key is the raw address of `arr`'s allocation and
/// the cache does not keep `arr` alive. If an array is freed and a different
/// one is later allocated at the same address, a stale entry could be
/// returned. Confirm callers only pass arrays that outlive this document.
pub(crate) fn get_or_promote_objvec(
    &self,
    arr: &Arc<Vec<Val>>,
) -> Option<Arc<crate::data::value::ObjVecData>> {
    let addr = Arc::as_ptr(arr) as usize;
    if let Ok(guard) = self.objvec_cache.lock() {
        if let Some(hit) = guard.get(&addr).cloned() {
            return Some(hit);
        }
    }
    // The lock is not held while promoting.
    let fresh = exec::pipeline::Pipeline::try_promote_objvec_arr(arr)?;
    if let Ok(mut guard) = self.objvec_cache.lock() {
        // Keep an entry that appeared in the meantime; the caller still gets
        // its own `fresh` value.
        guard.entry(addr).or_insert_with(|| fresh.clone());
    }
    Some(fresh)
}
pub(crate) fn new(document: Value) -> Self {
Self {
document,
root_val: OnceCell::new(),
objvec_cache: Default::default(),
raw_bytes: None,
tape: OnceCell::new(),
structural_index: OnceCell::new(),
}
}
pub(crate) fn from_val_and_value(root: Val, document: Value) -> Self {
let root_val = OnceCell::new();
let _ = root_val.set(root);
Self {
document,
root_val,
objvec_cache: Default::default(),
raw_bytes: None,
tape: OnceCell::new(),
structural_index: OnceCell::new(),
}
}
/// Returns a clone of the document's root `Val`, building it with `keys` on
/// first use and caching it.
///
/// With the `simd-json` feature the root is built from the lazily parsed
/// tape when one is available, and from `document` otherwise. Without the
/// feature it is always built from `document`.
///
/// # Errors
///
/// Propagates the error from `lazy_tape` (only with `simd-json`).
pub(crate) fn root_val_with(
    &self,
    keys: &crate::data::intern::KeyCache,
) -> std::result::Result<Val, EvalError> {
    if let Some(cached) = self.root_val.get() {
        return Ok(cached.clone());
    }
    #[cfg(feature = "simd-json")]
    let built = match self.lazy_tape()? {
        Some(tape) => Val::from_tape_data_with(keys, tape),
        None => Val::from_value_with(keys, &self.document),
    };
    #[cfg(not(feature = "simd-json"))]
    let built = Val::from_value_with(keys, &self.document);
    Ok(self.root_val.get_or_init(|| built).clone())
}
/// Builds a document from raw JSON bytes, keeping the bytes for later use.
///
/// With the `simd-json` feature nothing is parsed here: `document` is left
/// as `Value::Null` and this function always returns `Ok`. Without the
/// feature the bytes are parsed with `serde_json` up front.
///
/// # Errors
///
/// Returns the `serde_json` error when the bytes fail to parse (only without
/// `simd-json`).
pub fn from_bytes(bytes: Vec<u8>) -> std::result::Result<Self, serde_json::Error> {
    #[cfg(feature = "simd-json")]
    let document = Value::Null;
    #[cfg(not(feature = "simd-json"))]
    let document: Value = serde_json::from_slice(&bytes)?;
    Ok(Self {
        document,
        root_val: OnceCell::new(),
        objvec_cache: Default::default(),
        raw_bytes: Some(Arc::from(bytes.into_boxed_slice())),
        tape: OnceCell::new(),
        structural_index: OnceCell::new(),
    })
}
/// Returns the source bytes, if this document was built from bytes.
pub(crate) fn raw_bytes(&self) -> Option<&[u8]> {
    self.raw_bytes.as_ref().map(|buf| &**buf)
}
/// Returns the structural index for this document's raw bytes, building it
/// with `BuildOptions::keys_only()` on first use and caching the outcome
/// (success or failure).
///
/// Returns `Ok(None)` when the document has no raw bytes and nothing is
/// cached.
///
/// # Errors
///
/// Returns an `EvalError` whose message starts with `Invalid JSON: ` when
/// the build failed, on the first call and on every later call.
pub(crate) fn lazy_structural_index(
    &self,
) -> std::result::Result<Option<&Arc<jetro_experimental::StructuralIndex>>, EvalError> {
    let outcome = match self.structural_index.get() {
        Some(cached) => cached,
        None => {
            let Some(raw) = self.raw_bytes.as_ref() else {
                return Ok(None);
            };
            self.structural_index.get_or_init(|| {
                jetro_experimental::from_bytes_with(
                    raw.as_ref(),
                    jetro_experimental::BuildOptions::keys_only(),
                )
                .map(Arc::new)
                .map_err(|err| err.to_string())
            })
        }
    };
    match outcome {
        Ok(index) => Ok(Some(index)),
        Err(err) => Err(EvalError(format!("Invalid JSON: {err}"))),
    }
}
/// Returns a clone of the document's root `Val`, building and caching it on
/// first use.
///
/// With the `simd-json` feature the root is built from the lazily parsed
/// tape when one is available, and from `document` otherwise. Without the
/// feature it is always built from `document`.
///
/// # Errors
///
/// Propagates the error from `lazy_tape` (only with `simd-json`).
pub(crate) fn root_val(&self) -> std::result::Result<Val, EvalError> {
    if let Some(cached) = self.root_val.get() {
        return Ok(cached.clone());
    }
    #[cfg(feature = "simd-json")]
    let built = match self.lazy_tape()? {
        Some(tape) => Val::from_tape_data(tape),
        None => Val::from(&self.document),
    };
    #[cfg(not(feature = "simd-json"))]
    let built = Val::from(&self.document);
    Ok(self.root_val.get_or_init(|| built).clone())
}
/// Test probe: `true` once the root-value cache has been filled.
#[cfg(test)]
pub(crate) fn root_val_is_materialized(&self) -> bool {
self.root_val.get().is_some()
}
/// Test probe: `true` once a structural-index build has been attempted and
/// its outcome (success or failure) cached.
#[cfg(test)]
pub(crate) fn structural_index_is_built(&self) -> bool {
self.structural_index.get().is_some()
}
/// Test probe: `true` once a tape parse has been attempted and its outcome
/// (success or failure) cached.
#[cfg(all(test, feature = "simd-json"))]
pub(crate) fn tape_is_built(&self) -> bool {
self.tape.get().is_some()
}
/// Test helper: calls `reset_materialized_subtrees` on the tape when one is
/// available. Does nothing when there is no tape or the tape failed to parse.
#[cfg(all(test, feature = "simd-json"))]
pub(crate) fn reset_tape_materialized_subtrees(&self) {
    match self.lazy_tape() {
        Ok(Some(tape)) => {
            tape.reset_materialized_subtrees();
        }
        Ok(None) | Err(_) => {}
    }
}
/// Test helper: returns the tape's `materialized_subtrees()` count, or `0`
/// when there is no tape or the tape failed to parse.
#[cfg(all(test, feature = "simd-json"))]
pub(crate) fn tape_materialized_subtrees(&self) -> usize {
    match self.lazy_tape() {
        Ok(Some(tape)) => tape.materialized_subtrees(),
        Ok(None) | Err(_) => 0,
    }
}
/// Evaluates `expr` against this document and returns the result as JSON.
///
/// Delegates to `exec::router::collect_json`.
pub fn collect<S: AsRef<str>>(&self, expr: S) -> std::result::Result<Value, EvalError> {
    let query: &str = expr.as_ref();
    exec::router::collect_json(self, query)
}
}
impl From<Value> for Jetro {
fn from(v: Value) -> Self {
Self::new(v)
}
}