pub(crate) mod builtins;
pub(crate) mod compile;
pub(crate) mod data;
pub(crate) mod exec;
pub mod introspect;
pub mod io;
pub(crate) mod ir;
pub(crate) mod parse;
pub(crate) mod plan;
pub(crate) mod util;
pub(crate) mod vm;
#[cfg(test)]
mod tests;
use data::value::Val;
use serde_json::Value;
use std::cell::{OnceCell, RefCell};
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::Mutex;
pub use data::context::EvalError;
#[cfg(test)]
use parse::parser::ParseError;
use vm::VM;
#[cfg(feature = "fuzz_internal")]
pub mod __fuzz_internal {
    //! Re-exports of internal parser and planner entry points, exposed only when the
    //! `fuzz_internal` feature is enabled so fuzz targets can drive them directly.
    pub use crate::parse::parser::parse;
    pub use crate::parse::parser::ParseError;
    pub use crate::plan::physical::plan_query;
}
/// Test-only error wrapping either a `ParseError` or an `EvalError`.
#[cfg(test)]
#[derive(Debug)]
pub(crate) enum Error {
    /// Wraps a `ParseError`.
    Parse(ParseError),
    /// Wraps an `EvalError`.
    Eval(EvalError),
}

#[cfg(test)]
impl std::fmt::Display for Error {
    /// Prints the wrapped error's own `Display` text unchanged.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let inner: &dyn std::fmt::Display = match self {
            Self::Parse(parse_err) => parse_err,
            Self::Eval(eval_err) => eval_err,
        };
        write!(f, "{}", inner)
    }
}

#[cfg(test)]
impl std::error::Error for Error {
    /// Exposes the wrapped `ParseError` as the source; `Eval` reports no source.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::Parse(parse_err) => Some(parse_err),
            Self::Eval(..) => None,
        }
    }
}

#[cfg(test)]
impl From<ParseError> for Error {
    fn from(err: ParseError) -> Self {
        Self::Parse(err)
    }
}

#[cfg(test)]
impl From<EvalError> for Error {
    fn from(err: EvalError) -> Self {
        Self::Eval(err)
    }
}
pub struct Jetro {
document: Value,
root_val: OnceCell<Val>,
raw_bytes: Option<Arc<[u8]>>,
tape: OnceCell<std::result::Result<Arc<crate::data::tape::TapeData>, String>>,
structural_index:
OnceCell<std::result::Result<Arc<jetro_experimental::StructuralIndex>, String>>,
pub(crate) objvec_cache:
std::sync::Mutex<std::collections::HashMap<usize, Arc<crate::data::value::ObjVecData>>>,
vm: RefCell<VM>,
}
/// Reusable query engine holding caches shared across the documents and queries it runs.
pub struct JetroEngine {
    // Compiled plans keyed by `<planning-context key>\0<query text>` (built in `cached_plan`).
    plan_cache: Mutex<HashMap<String, ir::physical::QueryPlan>>,
    // Maximum number of cached plans; 0 disables plan caching.
    plan_cache_limit: usize,
    // VM shared by every evaluation on this engine; access is serialized by the mutex.
    vm: Mutex<VM>,
    // Interned-key cache passed to `Val::from_value_with` / `root_val_with` for parsed documents.
    keys: Arc<crate::data::intern::KeyCache>,
}
#[derive(Debug)]
pub enum JetroEngineError {
Json(serde_json::Error),
Io(std::io::Error),
Ndjson(io::RowError),
Eval(EvalError),
}
impl std::fmt::Display for JetroEngineError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Json(err) => write!(f, "{}", err),
Self::Io(err) => write!(f, "{}", err),
Self::Ndjson(err) => write!(f, "{}", err),
Self::Eval(err) => write!(f, "{}", err),
}
}
}
impl std::error::Error for JetroEngineError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Self::Json(err) => Some(err),
Self::Io(err) => Some(err),
Self::Ndjson(err) => Some(err),
Self::Eval(_) => None,
}
}
}
impl From<serde_json::Error> for JetroEngineError {
fn from(err: serde_json::Error) -> Self {
Self::Json(err)
}
}
impl From<std::io::Error> for JetroEngineError {
fn from(err: std::io::Error) -> Self {
Self::Io(err)
}
}
impl From<io::RowError> for JetroEngineError {
fn from(err: io::RowError) -> Self {
Self::Ndjson(err)
}
}
impl From<EvalError> for JetroEngineError {
fn from(err: EvalError) -> Self {
Self::Eval(err)
}
}
impl Default for JetroEngine {
fn default() -> Self {
Self::new()
}
}
impl JetroEngine {
    /// Plan-cache capacity used by `JetroEngine::new`.
    const DEFAULT_PLAN_CACHE_LIMIT: usize = 256;

    /// Inspects `query` according to `options`, using default NDJSON options.
    pub fn inspect_query(
        &self,
        query: &str,
        options: introspect::InspectOptions,
    ) -> std::result::Result<introspect::QueryInspection, JetroEngineError> {
        introspect::inspect_query(self, query, options, io::NdjsonOptions::default())
    }

    /// Inspects `query` as it would run over an NDJSON reader or file (selected by `source`),
    /// with explicit NDJSON options and inspection `level`.
    pub fn inspect_ndjson_query_with_options(
        &self,
        query: &str,
        source: io::NdjsonSourceMode,
        ndjson_options: io::NdjsonOptions,
        level: introspect::InspectLevel,
    ) -> std::result::Result<introspect::QueryInspection, JetroEngineError> {
        let context = match source {
            io::NdjsonSourceMode::Reader => introspect::InspectContext::NdjsonReader,
            io::NdjsonSourceMode::File => introspect::InspectContext::NdjsonFile,
        };
        introspect::inspect_query(
            self,
            query,
            introspect::InspectOptions { level, context },
            ndjson_options,
        )
    }

    /// Creates an engine whose plan cache holds up to 256 plans.
    pub fn new() -> Self {
        Self::with_plan_cache_limit(Self::DEFAULT_PLAN_CACHE_LIMIT)
    }

    /// Creates an engine whose plan cache holds up to `plan_cache_limit` plans.
    ///
    /// A limit of `0` disables plan caching. When inserting a new plan into a full cache, the
    /// whole cache is cleared first.
    pub fn with_plan_cache_limit(plan_cache_limit: usize) -> Self {
        Self {
            plan_cache: Mutex::new(HashMap::new()),
            plan_cache_limit,
            vm: Mutex::new(VM::new()),
            keys: crate::data::intern::KeyCache::new(),
        }
    }

    /// Returns the interned-key cache used when this engine converts documents.
    pub fn keys(&self) -> &Arc<crate::data::intern::KeyCache> {
        &self.keys
    }

    /// Drops every cached plan and clears the interned-key cache.
    pub fn clear_cache(&self) {
        self.plan_cache_guard().clear();
        self.keys.clear();
    }

    /// Wraps an already-parsed JSON value, converting it eagerly with this engine's key cache.
    pub fn parse_value(&self, document: Value) -> Jetro {
        let root = Val::from_value_with(&self.keys, &document);
        Jetro::from_val_and_value(root, document)
    }

    /// Wraps `bytes` as a document and materialises its root value immediately.
    ///
    /// # Errors
    /// Returns `JetroEngineError::Eval` ("Invalid JSON: ...") when the bytes fail to parse.
    pub fn parse_bytes(&self, bytes: Vec<u8>) -> std::result::Result<Jetro, JetroEngineError> {
        let document = Jetro::from_bytes(bytes)?;
        // Force the root value now so malformed input is rejected here, not at query time.
        let _ = document.root_val_with(&self.keys)?;
        Ok(document)
    }

    /// Wraps `bytes` without parsing them; parsing is deferred until the document is used.
    pub(crate) fn parse_bytes_lazy(
        &self,
        bytes: Vec<u8>,
    ) -> std::result::Result<Jetro, JetroEngineError> {
        Ok(Jetro::from_bytes(bytes)?)
    }

    /// Evaluates `expr` against `document` and returns the result as JSON.
    ///
    /// Queries that `io::collect_document_rows` answers directly short-circuit; otherwise the
    /// query is planned (through the plan cache) and executed on the engine's shared VM.
    pub fn collect<S: AsRef<str>>(
        &self,
        document: &Jetro,
        expr: S,
    ) -> std::result::Result<Value, EvalError> {
        let expr = expr.as_ref();
        if let Some(rows) = io::collect_document_rows(self, document, expr)? {
            return Ok(Value::from(rows));
        }
        let plan = self.cached_plan(expr, exec::router::planning_context(document));
        self.collect_prepared(document, &plan)
    }

    /// Executes an already-built `plan` against `document`, converting the result to JSON.
    pub(crate) fn collect_prepared(
        &self,
        document: &Jetro,
        plan: &ir::physical::QueryPlan,
    ) -> std::result::Result<Value, EvalError> {
        self.collect_prepared_val(document, plan).map(Value::from)
    }

    /// Executes `plan` against `document` on the engine's shared VM, returning the raw `Val`.
    ///
    /// # Panics
    /// Panics if the VM mutex is poisoned (see `lock_vm`).
    pub(crate) fn collect_prepared_val(
        &self,
        document: &Jetro,
        plan: &ir::physical::QueryPlan,
    ) -> std::result::Result<Val, EvalError> {
        let mut vm = self.lock_vm();
        exec::router::collect_plan_val_with_vm(document, plan, &mut vm)
    }

    /// Locks the engine's shared VM.
    ///
    /// # Panics
    /// Panics if the mutex is poisoned. Unlike the plan cache, the VM is not recovered after
    /// a panic.
    pub(crate) fn lock_vm(&self) -> std::sync::MutexGuard<'_, VM> {
        self.vm.lock().expect("vm cache poisoned")
    }

    /// Converts `document` with `parse_value`, then evaluates `expr` against it.
    pub fn collect_value<S: AsRef<str>>(
        &self,
        document: Value,
        expr: S,
    ) -> std::result::Result<Value, EvalError> {
        let document = self.parse_value(document);
        self.collect(&document, expr)
    }

    /// Parses `bytes` with `parse_bytes`, then evaluates `expr` against the result.
    pub fn collect_bytes<S: AsRef<str>>(
        &self,
        bytes: Vec<u8>,
        expr: S,
    ) -> std::result::Result<Value, JetroEngineError> {
        let document = self.parse_bytes(bytes)?;
        Ok(self.collect(&document, expr)?)
    }

    /// Runs `query` over NDJSON rows read from `reader`, writing results to `writer`.
    pub fn run_ndjson<R, W>(
        &self,
        reader: R,
        query: &str,
        writer: W,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        R: std::io::BufRead,
        W: std::io::Write,
    {
        io::run_ndjson(self, reader, query, writer)
    }

    /// Like `run_ndjson`, reading rows from the file at `path`.
    pub fn run_ndjson_file<P, W>(
        &self,
        path: P,
        query: &str,
        writer: W,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        P: AsRef<std::path::Path>,
        W: std::io::Write,
    {
        io::run_ndjson_file(self, path, query, writer)
    }

    /// Like `run_ndjson_file`, with explicit NDJSON `options`.
    pub fn run_ndjson_file_with_options<P, W>(
        &self,
        path: P,
        query: &str,
        writer: W,
        options: io::NdjsonOptions,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        P: AsRef<std::path::Path>,
        W: std::io::Write,
    {
        io::run_ndjson_file_with_options(self, path, query, writer, options)
    }

    /// Like `run_ndjson_file`, returning an execution report.
    pub fn run_ndjson_file_with_report<P, W>(
        &self,
        path: P,
        query: &str,
        writer: W,
    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
    where
        P: AsRef<std::path::Path>,
        W: std::io::Write,
    {
        io::run_ndjson_file_with_report(self, path, query, writer)
    }

    /// Like `run_ndjson_file_with_report`, with explicit NDJSON `options`.
    pub fn run_ndjson_file_with_report_and_options<P, W>(
        &self,
        path: P,
        query: &str,
        writer: W,
        options: io::NdjsonOptions,
    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
    where
        P: AsRef<std::path::Path>,
        W: std::io::Write,
    {
        io::run_ndjson_file_with_report_and_options(self, path, query, writer, options)
    }

    /// Like `run_ndjson_file`, bounded by `limit`.
    pub fn run_ndjson_file_limit<P, W>(
        &self,
        path: P,
        query: &str,
        limit: usize,
        writer: W,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        P: AsRef<std::path::Path>,
        W: std::io::Write,
    {
        io::run_ndjson_file_limit(self, path, query, limit, writer)
    }

    /// Like `run_ndjson_file_limit`, with explicit NDJSON `options`.
    pub fn run_ndjson_file_limit_with_options<P, W>(
        &self,
        path: P,
        query: &str,
        limit: usize,
        writer: W,
        options: io::NdjsonOptions,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        P: AsRef<std::path::Path>,
        W: std::io::Write,
    {
        io::run_ndjson_file_limit_with_options(self, path, query, limit, writer, options)
    }

    /// Like `run_ndjson_file_limit`, returning an execution report.
    pub fn run_ndjson_file_limit_with_report<P, W>(
        &self,
        path: P,
        query: &str,
        limit: usize,
        writer: W,
    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
    where
        P: AsRef<std::path::Path>,
        W: std::io::Write,
    {
        io::run_ndjson_file_limit_with_report(self, path, query, limit, writer)
    }

    /// Like `run_ndjson_file_limit_with_report`, with explicit NDJSON `options`.
    pub fn run_ndjson_file_limit_with_report_and_options<P, W>(
        &self,
        path: P,
        query: &str,
        limit: usize,
        writer: W,
        options: io::NdjsonOptions,
    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
    where
        P: AsRef<std::path::Path>,
        W: std::io::Write,
    {
        io::run_ndjson_file_limit_with_report_and_options(self, path, query, limit, writer, options)
    }

    /// Like `run_ndjson`, reading rows from an `io::NdjsonSource`.
    pub fn run_ndjson_source<W>(
        &self,
        source: io::NdjsonSource,
        query: &str,
        writer: W,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        W: std::io::Write,
    {
        io::run_ndjson_source(self, source, query, writer)
    }

    /// Like `run_ndjson_source`, with explicit NDJSON `options`.
    pub fn run_ndjson_source_with_options<W>(
        &self,
        source: io::NdjsonSource,
        query: &str,
        writer: W,
        options: io::NdjsonOptions,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        W: std::io::Write,
    {
        io::run_ndjson_source_with_options(self, source, query, writer, options)
    }

    /// Like `run_ndjson_source`, returning an execution report.
    pub fn run_ndjson_source_with_report<W>(
        &self,
        source: io::NdjsonSource,
        query: &str,
        writer: W,
    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
    where
        W: std::io::Write,
    {
        io::run_ndjson_source_with_report(self, source, query, writer)
    }

    /// Like `run_ndjson_source_with_report`, with explicit NDJSON `options`.
    pub fn run_ndjson_source_with_report_and_options<W>(
        &self,
        source: io::NdjsonSource,
        query: &str,
        writer: W,
        options: io::NdjsonOptions,
    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
    where
        W: std::io::Write,
    {
        io::run_ndjson_source_with_report_and_options(self, source, query, writer, options)
    }

    /// Like `run_ndjson_source`, bounded by `limit`.
    pub fn run_ndjson_source_limit<W>(
        &self,
        source: io::NdjsonSource,
        query: &str,
        limit: usize,
        writer: W,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        W: std::io::Write,
    {
        io::run_ndjson_source_limit(self, source, query, limit, writer)
    }

    /// Like `run_ndjson_source_limit`, with explicit NDJSON `options`.
    pub fn run_ndjson_source_limit_with_options<W>(
        &self,
        source: io::NdjsonSource,
        query: &str,
        limit: usize,
        writer: W,
        options: io::NdjsonOptions,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        W: std::io::Write,
    {
        io::run_ndjson_source_limit_with_options(self, source, query, limit, writer, options)
    }

    /// Like `run_ndjson_source_limit`, returning an execution report.
    pub fn run_ndjson_source_limit_with_report<W>(
        &self,
        source: io::NdjsonSource,
        query: &str,
        limit: usize,
        writer: W,
    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
    where
        W: std::io::Write,
    {
        io::run_ndjson_source_limit_with_report(self, source, query, limit, writer)
    }

    /// Like `run_ndjson_source_limit_with_report`, with explicit NDJSON `options`.
    pub fn run_ndjson_source_limit_with_report_and_options<W>(
        &self,
        source: io::NdjsonSource,
        query: &str,
        limit: usize,
        writer: W,
        options: io::NdjsonOptions,
    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
    where
        W: std::io::Write,
    {
        io::run_ndjson_source_limit_with_report_and_options(
            self, source, query, limit, writer, options,
        )
    }

    /// Runs `query` over the NDJSON file at `path` traversed in reverse (`io::run_ndjson_rev`).
    pub fn run_ndjson_rev<P, W>(
        &self,
        path: P,
        query: &str,
        writer: W,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        P: AsRef<std::path::Path>,
        W: std::io::Write,
    {
        io::run_ndjson_rev(self, path, query, writer)
    }

    /// Like `run_ndjson_rev`, with explicit NDJSON `options`.
    pub fn run_ndjson_rev_with_options<P, W>(
        &self,
        path: P,
        query: &str,
        writer: W,
        options: io::NdjsonOptions,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        P: AsRef<std::path::Path>,
        W: std::io::Write,
    {
        io::run_ndjson_rev_with_options(self, path, query, writer, options)
    }

    /// Like `run_ndjson_rev`, bounded by `limit`.
    pub fn run_ndjson_rev_limit<P, W>(
        &self,
        path: P,
        query: &str,
        limit: usize,
        writer: W,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        P: AsRef<std::path::Path>,
        W: std::io::Write,
    {
        io::run_ndjson_rev_limit(self, path, query, limit, writer)
    }

    /// Like `run_ndjson_rev_limit`, with explicit NDJSON `options`.
    pub fn run_ndjson_rev_limit_with_options<P, W>(
        &self,
        path: P,
        query: &str,
        limit: usize,
        writer: W,
        options: io::NdjsonOptions,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        P: AsRef<std::path::Path>,
        W: std::io::Write,
    {
        io::run_ndjson_rev_limit_with_options(self, path, query, limit, writer, options)
    }

    /// Reverse traversal of the file at `path`, de-duplicated by `key_query`, bounded by `limit`.
    pub fn run_ndjson_rev_distinct_by<P, W>(
        &self,
        path: P,
        key_query: &str,
        query: &str,
        limit: usize,
        writer: W,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        P: AsRef<std::path::Path>,
        W: std::io::Write,
    {
        io::run_ndjson_rev_distinct_by(self, path, key_query, query, limit, writer)
    }

    /// Like `run_ndjson_rev_distinct_by`, with explicit NDJSON `options`.
    pub fn run_ndjson_rev_distinct_by_with_options<P, W>(
        &self,
        path: P,
        key_query: &str,
        query: &str,
        limit: usize,
        writer: W,
        options: io::NdjsonOptions,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        P: AsRef<std::path::Path>,
        W: std::io::Write,
    {
        io::run_ndjson_rev_distinct_by_with_options(
            self, path, key_query, query, limit, writer, options,
        )
    }

    /// Like `run_ndjson_rev_distinct_by`, returning `io::NdjsonRevDistinctStats`.
    pub fn run_ndjson_rev_distinct_by_with_stats<P, W>(
        &self,
        path: P,
        key_query: &str,
        query: &str,
        limit: usize,
        writer: W,
    ) -> std::result::Result<io::NdjsonRevDistinctStats, JetroEngineError>
    where
        P: AsRef<std::path::Path>,
        W: std::io::Write,
    {
        io::run_ndjson_rev_distinct_by_with_stats(self, path, key_query, query, limit, writer)
    }

    /// Like `run_ndjson_rev_distinct_by_with_stats`, with explicit NDJSON `options`.
    pub fn run_ndjson_rev_distinct_by_with_stats_and_options<P, W>(
        &self,
        path: P,
        key_query: &str,
        query: &str,
        limit: usize,
        writer: W,
        options: io::NdjsonOptions,
    ) -> std::result::Result<io::NdjsonRevDistinctStats, JetroEngineError>
    where
        P: AsRef<std::path::Path>,
        W: std::io::Write,
    {
        io::run_ndjson_rev_distinct_by_with_stats_and_options(
            self, path, key_query, query, limit, writer, options,
        )
    }

    /// Like `run_ndjson_rev_distinct_by`, returning an execution report.
    pub fn run_ndjson_rev_distinct_by_with_report<P, W>(
        &self,
        path: P,
        key_query: &str,
        query: &str,
        limit: usize,
        writer: W,
    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
    where
        P: AsRef<std::path::Path>,
        W: std::io::Write,
    {
        io::run_ndjson_rev_distinct_by_with_report(self, path, key_query, query, limit, writer)
    }

    /// Like `run_ndjson_rev_distinct_by_with_report`, with explicit NDJSON `options`.
    pub fn run_ndjson_rev_distinct_by_with_report_and_options<P, W>(
        &self,
        path: P,
        key_query: &str,
        query: &str,
        limit: usize,
        writer: W,
        options: io::NdjsonOptions,
    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
    where
        P: AsRef<std::path::Path>,
        W: std::io::Write,
    {
        io::run_ndjson_rev_distinct_by_with_report_and_options(
            self, path, key_query, query, limit, writer, options,
        )
    }

    /// Like `run_ndjson`, with explicit NDJSON `options`.
    pub fn run_ndjson_with_options<R, W>(
        &self,
        reader: R,
        query: &str,
        writer: W,
        options: io::NdjsonOptions,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        R: std::io::BufRead,
        W: std::io::Write,
    {
        io::run_ndjson_with_options(self, reader, query, writer, options)
    }

    /// Like `run_ndjson`, returning an execution report.
    pub fn run_ndjson_with_report<R, W>(
        &self,
        reader: R,
        query: &str,
        writer: W,
    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
    where
        R: std::io::BufRead,
        W: std::io::Write,
    {
        io::run_ndjson_with_report(self, reader, query, writer)
    }

    /// Like `run_ndjson_with_report`, with explicit NDJSON `options`.
    pub fn run_ndjson_with_report_and_options<R, W>(
        &self,
        reader: R,
        query: &str,
        writer: W,
        options: io::NdjsonOptions,
    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
    where
        R: std::io::BufRead,
        W: std::io::Write,
    {
        io::run_ndjson_with_report_and_options(self, reader, query, writer, options)
    }

    /// Like `run_ndjson`, bounded by `limit`.
    pub fn run_ndjson_limit<R, W>(
        &self,
        reader: R,
        query: &str,
        limit: usize,
        writer: W,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        R: std::io::BufRead,
        W: std::io::Write,
    {
        io::run_ndjson_limit(self, reader, query, limit, writer)
    }

    /// Like `run_ndjson_limit`, with explicit NDJSON `options`.
    pub fn run_ndjson_limit_with_options<R, W>(
        &self,
        reader: R,
        query: &str,
        limit: usize,
        writer: W,
        options: io::NdjsonOptions,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        R: std::io::BufRead,
        W: std::io::Write,
    {
        io::run_ndjson_limit_with_options(self, reader, query, limit, writer, options)
    }

    /// Like `run_ndjson_limit`, returning an execution report.
    pub fn run_ndjson_limit_with_report<R, W>(
        &self,
        reader: R,
        query: &str,
        limit: usize,
        writer: W,
    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
    where
        R: std::io::BufRead,
        W: std::io::Write,
    {
        io::run_ndjson_limit_with_report(self, reader, query, limit, writer)
    }

    /// Like `run_ndjson_limit_with_report`, with explicit NDJSON `options`.
    pub fn run_ndjson_limit_with_report_and_options<R, W>(
        &self,
        reader: R,
        query: &str,
        limit: usize,
        writer: W,
        options: io::NdjsonOptions,
    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
    where
        R: std::io::BufRead,
        W: std::io::Write,
    {
        io::run_ndjson_limit_with_report_and_options(self, reader, query, limit, writer, options)
    }

    /// Streams NDJSON rows from `reader` matching `predicate` to `writer`, bounded by `limit`.
    pub fn run_ndjson_matches<R, W>(
        &self,
        reader: R,
        predicate: &str,
        limit: usize,
        writer: W,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        R: std::io::BufRead,
        W: std::io::Write,
    {
        io::run_ndjson_matches(self, reader, predicate, limit, writer)
    }

    /// Like `run_ndjson_matches`, with explicit NDJSON `options`.
    pub fn run_ndjson_matches_with_options<R, W>(
        &self,
        reader: R,
        predicate: &str,
        limit: usize,
        writer: W,
        options: io::NdjsonOptions,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        R: std::io::BufRead,
        W: std::io::Write,
    {
        io::run_ndjson_matches_with_options(self, reader, predicate, limit, writer, options)
    }

    /// Like `run_ndjson_matches`, returning an execution report.
    pub fn run_ndjson_matches_with_report<R, W>(
        &self,
        reader: R,
        predicate: &str,
        limit: usize,
        writer: W,
    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
    where
        R: std::io::BufRead,
        W: std::io::Write,
    {
        io::run_ndjson_matches_with_report(self, reader, predicate, limit, writer)
    }

    /// Like `run_ndjson_matches_with_report`, with explicit NDJSON `options`.
    pub fn run_ndjson_matches_with_report_and_options<R, W>(
        &self,
        reader: R,
        predicate: &str,
        limit: usize,
        writer: W,
        options: io::NdjsonOptions,
    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
    where
        R: std::io::BufRead,
        W: std::io::Write,
    {
        io::run_ndjson_matches_with_report_and_options(
            self, reader, predicate, limit, writer, options,
        )
    }

    /// Like `run_ndjson_matches`, reading rows from the file at `path`.
    pub fn run_ndjson_matches_file<P, W>(
        &self,
        path: P,
        predicate: &str,
        limit: usize,
        writer: W,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        P: AsRef<std::path::Path>,
        W: std::io::Write,
    {
        io::run_ndjson_matches_file(self, path, predicate, limit, writer)
    }

    /// Like `run_ndjson_matches_file`, with explicit NDJSON `options`.
    pub fn run_ndjson_matches_file_with_options<P, W>(
        &self,
        path: P,
        predicate: &str,
        limit: usize,
        writer: W,
        options: io::NdjsonOptions,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        P: AsRef<std::path::Path>,
        W: std::io::Write,
    {
        io::run_ndjson_matches_file_with_options(self, path, predicate, limit, writer, options)
    }

    /// Like `run_ndjson_matches_file`, returning an execution report.
    pub fn run_ndjson_matches_file_with_report<P, W>(
        &self,
        path: P,
        predicate: &str,
        limit: usize,
        writer: W,
    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
    where
        P: AsRef<std::path::Path>,
        W: std::io::Write,
    {
        io::run_ndjson_matches_file_with_report(self, path, predicate, limit, writer)
    }

    /// Like `run_ndjson_matches_file_with_report`, with explicit NDJSON `options`.
    pub fn run_ndjson_matches_file_with_report_and_options<P, W>(
        &self,
        path: P,
        predicate: &str,
        limit: usize,
        writer: W,
        options: io::NdjsonOptions,
    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
    where
        P: AsRef<std::path::Path>,
        W: std::io::Write,
    {
        io::run_ndjson_matches_file_with_report_and_options(
            self, path, predicate, limit, writer, options,
        )
    }

    /// Like `run_ndjson_matches`, reading rows from an `io::NdjsonSource`.
    pub fn run_ndjson_matches_source<W>(
        &self,
        source: io::NdjsonSource,
        predicate: &str,
        limit: usize,
        writer: W,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        W: std::io::Write,
    {
        io::run_ndjson_matches_source(self, source, predicate, limit, writer)
    }

    /// Like `run_ndjson_matches_source`, with explicit NDJSON `options`.
    pub fn run_ndjson_matches_source_with_options<W>(
        &self,
        source: io::NdjsonSource,
        predicate: &str,
        limit: usize,
        writer: W,
        options: io::NdjsonOptions,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        W: std::io::Write,
    {
        io::run_ndjson_matches_source_with_options(self, source, predicate, limit, writer, options)
    }

    /// Like `run_ndjson_matches_source`, returning an execution report.
    pub fn run_ndjson_matches_source_with_report<W>(
        &self,
        source: io::NdjsonSource,
        predicate: &str,
        limit: usize,
        writer: W,
    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
    where
        W: std::io::Write,
    {
        io::run_ndjson_matches_source_with_report(self, source, predicate, limit, writer)
    }

    /// Like `run_ndjson_matches_source_with_report`, with explicit NDJSON `options`.
    pub fn run_ndjson_matches_source_with_report_and_options<W>(
        &self,
        source: io::NdjsonSource,
        predicate: &str,
        limit: usize,
        writer: W,
        options: io::NdjsonOptions,
    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
    where
        W: std::io::Write,
    {
        io::run_ndjson_matches_source_with_report_and_options(
            self, source, predicate, limit, writer, options,
        )
    }

    /// Like `run_ndjson_matches`, traversing the file at `path` in reverse.
    pub fn run_ndjson_rev_matches<P, W>(
        &self,
        path: P,
        predicate: &str,
        limit: usize,
        writer: W,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        P: AsRef<std::path::Path>,
        W: std::io::Write,
    {
        io::run_ndjson_rev_matches(self, path, predicate, limit, writer)
    }

    /// Like `run_ndjson_rev_matches`, with explicit NDJSON `options`.
    pub fn run_ndjson_rev_matches_with_options<P, W>(
        &self,
        path: P,
        predicate: &str,
        limit: usize,
        writer: W,
        options: io::NdjsonOptions,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        P: AsRef<std::path::Path>,
        W: std::io::Write,
    {
        io::run_ndjson_rev_matches_with_options(self, path, predicate, limit, writer, options)
    }

    /// Like `run_ndjson_rev_matches`, returning an execution report.
    pub fn run_ndjson_rev_matches_with_report<P, W>(
        &self,
        path: P,
        predicate: &str,
        limit: usize,
        writer: W,
    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
    where
        P: AsRef<std::path::Path>,
        W: std::io::Write,
    {
        io::run_ndjson_rev_matches_with_report(self, path, predicate, limit, writer)
    }

    /// Like `run_ndjson_rev_matches_with_report`, with explicit NDJSON `options`.
    pub fn run_ndjson_rev_matches_with_report_and_options<P, W>(
        &self,
        path: P,
        predicate: &str,
        limit: usize,
        writer: W,
        options: io::NdjsonOptions,
    ) -> std::result::Result<io::NdjsonExecutionReport, JetroEngineError>
    where
        P: AsRef<std::path::Path>,
        W: std::io::Write,
    {
        io::run_ndjson_rev_matches_with_report_and_options(
            self, path, predicate, limit, writer, options,
        )
    }

    /// Collects the results of `query` over NDJSON from `reader` into a `Vec`.
    pub fn collect_ndjson<R>(
        &self,
        reader: R,
        query: &str,
    ) -> std::result::Result<Vec<Value>, JetroEngineError>
    where
        R: std::io::BufRead,
    {
        io::collect_ndjson(self, reader, query)
    }

    /// Like `collect_ndjson`, reading rows from the file at `path`.
    pub fn collect_ndjson_file<P>(
        &self,
        path: P,
        query: &str,
    ) -> std::result::Result<Vec<Value>, JetroEngineError>
    where
        P: AsRef<std::path::Path>,
    {
        io::collect_ndjson_file(self, path, query)
    }

    /// Like `collect_ndjson_file`, with explicit NDJSON `options`.
    pub fn collect_ndjson_file_with_options<P>(
        &self,
        path: P,
        query: &str,
        options: io::NdjsonOptions,
    ) -> std::result::Result<Vec<Value>, JetroEngineError>
    where
        P: AsRef<std::path::Path>,
    {
        io::collect_ndjson_file_with_options(self, path, query, options)
    }

    /// Like `collect_ndjson`, reading rows from an `io::NdjsonSource`.
    pub fn collect_ndjson_source(
        &self,
        source: io::NdjsonSource,
        query: &str,
    ) -> std::result::Result<Vec<Value>, JetroEngineError> {
        io::collect_ndjson_source(self, source, query)
    }

    /// Like `collect_ndjson_source`, with explicit NDJSON `options`.
    pub fn collect_ndjson_source_with_options(
        &self,
        source: io::NdjsonSource,
        query: &str,
        options: io::NdjsonOptions,
    ) -> std::result::Result<Vec<Value>, JetroEngineError> {
        io::collect_ndjson_source_with_options(self, source, query, options)
    }

    /// Like `collect_ndjson_file`, traversing the file in reverse.
    pub fn collect_ndjson_rev<P>(
        &self,
        path: P,
        query: &str,
    ) -> std::result::Result<Vec<Value>, JetroEngineError>
    where
        P: AsRef<std::path::Path>,
    {
        io::collect_ndjson_rev(self, path, query)
    }

    /// Like `collect_ndjson_rev`, with explicit NDJSON `options`.
    pub fn collect_ndjson_rev_with_options<P>(
        &self,
        path: P,
        query: &str,
        options: io::NdjsonOptions,
    ) -> std::result::Result<Vec<Value>, JetroEngineError>
    where
        P: AsRef<std::path::Path>,
    {
        io::collect_ndjson_rev_with_options(self, path, query, options)
    }

    /// Calls `f` with each result of `query` over the file at `path`, traversed in reverse.
    pub fn for_each_ndjson_rev<P, F>(
        &self,
        path: P,
        query: &str,
        f: F,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        P: AsRef<std::path::Path>,
        F: FnMut(Value),
    {
        io::for_each_ndjson_rev(self, path, query, f)
    }

    /// Like `for_each_ndjson_rev`, but `f` returns an `io::NdjsonControl` (or an error) to
    /// steer iteration; uses default NDJSON options.
    pub fn for_each_ndjson_rev_until<P, F>(
        &self,
        path: P,
        query: &str,
        f: F,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        P: AsRef<std::path::Path>,
        F: FnMut(Value) -> std::result::Result<io::NdjsonControl, JetroEngineError>,
    {
        io::for_each_ndjson_rev_with_options(self, path, query, io::NdjsonOptions::default(), f)
    }

    /// Like `for_each_ndjson_rev_until`, with explicit NDJSON `options`.
    pub fn for_each_ndjson_rev_until_with_options<P, F>(
        &self,
        path: P,
        query: &str,
        options: io::NdjsonOptions,
        f: F,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        P: AsRef<std::path::Path>,
        F: FnMut(Value) -> std::result::Result<io::NdjsonControl, JetroEngineError>,
    {
        io::for_each_ndjson_rev_with_options(self, path, query, options, f)
    }

    /// Like `for_each_ndjson_rev`, with explicit NDJSON `options`; iteration always continues.
    pub fn for_each_ndjson_rev_with_options<P, F>(
        &self,
        path: P,
        query: &str,
        options: io::NdjsonOptions,
        mut f: F,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        P: AsRef<std::path::Path>,
        F: FnMut(Value),
    {
        io::for_each_ndjson_rev_with_options(self, path, query, options, |value| {
            f(value);
            Ok(io::NdjsonControl::Continue)
        })
    }

    /// Like `collect_ndjson`, with explicit NDJSON `options`.
    pub fn collect_ndjson_with_options<R>(
        &self,
        reader: R,
        query: &str,
        options: io::NdjsonOptions,
    ) -> std::result::Result<Vec<Value>, JetroEngineError>
    where
        R: std::io::BufRead,
    {
        io::collect_ndjson_with_options(self, reader, query, options)
    }

    /// Collects NDJSON rows from `reader` matching `predicate`, bounded by `limit`.
    pub fn collect_ndjson_matches<R>(
        &self,
        reader: R,
        predicate: &str,
        limit: usize,
    ) -> std::result::Result<Vec<Value>, JetroEngineError>
    where
        R: std::io::BufRead,
    {
        io::collect_ndjson_matches(self, reader, predicate, limit)
    }

    /// Like `collect_ndjson_matches`, with explicit NDJSON `options`.
    pub fn collect_ndjson_matches_with_options<R>(
        &self,
        reader: R,
        predicate: &str,
        limit: usize,
        options: io::NdjsonOptions,
    ) -> std::result::Result<Vec<Value>, JetroEngineError>
    where
        R: std::io::BufRead,
    {
        io::collect_ndjson_matches_with_options(self, reader, predicate, limit, options)
    }

    /// Like `collect_ndjson_matches`, reading rows from the file at `path`.
    pub fn collect_ndjson_matches_file<P>(
        &self,
        path: P,
        predicate: &str,
        limit: usize,
    ) -> std::result::Result<Vec<Value>, JetroEngineError>
    where
        P: AsRef<std::path::Path>,
    {
        io::collect_ndjson_matches_file(self, path, predicate, limit)
    }

    /// Like `collect_ndjson_matches_file`, with explicit NDJSON `options`.
    pub fn collect_ndjson_matches_file_with_options<P>(
        &self,
        path: P,
        predicate: &str,
        limit: usize,
        options: io::NdjsonOptions,
    ) -> std::result::Result<Vec<Value>, JetroEngineError>
    where
        P: AsRef<std::path::Path>,
    {
        io::collect_ndjson_matches_file_with_options(self, path, predicate, limit, options)
    }

    /// Like `collect_ndjson_matches`, reading rows from an `io::NdjsonSource`.
    pub fn collect_ndjson_matches_source(
        &self,
        source: io::NdjsonSource,
        predicate: &str,
        limit: usize,
    ) -> std::result::Result<Vec<Value>, JetroEngineError> {
        io::collect_ndjson_matches_source(self, source, predicate, limit)
    }

    /// Like `collect_ndjson_matches_source`, with explicit NDJSON `options`.
    pub fn collect_ndjson_matches_source_with_options(
        &self,
        source: io::NdjsonSource,
        predicate: &str,
        limit: usize,
        options: io::NdjsonOptions,
    ) -> std::result::Result<Vec<Value>, JetroEngineError> {
        io::collect_ndjson_matches_source_with_options(self, source, predicate, limit, options)
    }

    /// Like `collect_ndjson_matches_file`, traversing the file in reverse.
    pub fn collect_ndjson_rev_matches<P>(
        &self,
        path: P,
        predicate: &str,
        limit: usize,
    ) -> std::result::Result<Vec<Value>, JetroEngineError>
    where
        P: AsRef<std::path::Path>,
    {
        io::collect_ndjson_rev_matches(self, path, predicate, limit)
    }

    /// Like `collect_ndjson_rev_matches`, with explicit NDJSON `options`.
    pub fn collect_ndjson_rev_matches_with_options<P>(
        &self,
        path: P,
        predicate: &str,
        limit: usize,
        options: io::NdjsonOptions,
    ) -> std::result::Result<Vec<Value>, JetroEngineError>
    where
        P: AsRef<std::path::Path>,
    {
        io::collect_ndjson_rev_matches_with_options(self, path, predicate, limit, options)
    }

    /// Calls `f` with each result of `query` over NDJSON from `reader`.
    pub fn for_each_ndjson<R, F>(
        &self,
        reader: R,
        query: &str,
        f: F,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        R: std::io::BufRead,
        F: FnMut(Value),
    {
        io::for_each_ndjson(self, reader, query, f)
    }

    /// Like `for_each_ndjson`, but `f` returns an `io::NdjsonControl` (or an error) to steer
    /// iteration.
    pub fn for_each_ndjson_until<R, F>(
        &self,
        reader: R,
        query: &str,
        f: F,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        R: std::io::BufRead,
        F: FnMut(Value) -> std::result::Result<io::NdjsonControl, JetroEngineError>,
    {
        io::for_each_ndjson_until(self, reader, query, f)
    }

    /// Like `for_each_ndjson`, reading rows from an `io::NdjsonSource`.
    pub fn for_each_ndjson_source<F>(
        &self,
        source: io::NdjsonSource,
        query: &str,
        f: F,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        F: FnMut(Value),
    {
        io::for_each_ndjson_source(self, source, query, f)
    }

    /// Like `for_each_ndjson_until`, reading rows from an `io::NdjsonSource`.
    pub fn for_each_ndjson_source_until<F>(
        &self,
        source: io::NdjsonSource,
        query: &str,
        f: F,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        F: FnMut(Value) -> std::result::Result<io::NdjsonControl, JetroEngineError>,
    {
        io::for_each_ndjson_source_until(self, source, query, f)
    }

    /// Like `for_each_ndjson_source_until`, with explicit NDJSON `options`.
    pub fn for_each_ndjson_source_until_with_options<F>(
        &self,
        source: io::NdjsonSource,
        query: &str,
        options: io::NdjsonOptions,
        f: F,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        F: FnMut(Value) -> std::result::Result<io::NdjsonControl, JetroEngineError>,
    {
        io::for_each_ndjson_source_until_with_options(self, source, query, options, f)
    }

    /// Like `for_each_ndjson_source`, with explicit NDJSON `options`.
    pub fn for_each_ndjson_source_with_options<F>(
        &self,
        source: io::NdjsonSource,
        query: &str,
        options: io::NdjsonOptions,
        f: F,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        F: FnMut(Value),
    {
        io::for_each_ndjson_source_with_options(self, source, query, options, f)
    }

    /// Like `for_each_ndjson`, with explicit NDJSON `options`.
    pub fn for_each_ndjson_with_options<R, F>(
        &self,
        reader: R,
        query: &str,
        options: io::NdjsonOptions,
        f: F,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        R: std::io::BufRead,
        F: FnMut(Value),
    {
        io::for_each_ndjson_with_options(self, reader, query, options, f)
    }

    /// Like `for_each_ndjson_until`, with explicit NDJSON `options`.
    pub fn for_each_ndjson_until_with_options<R, F>(
        &self,
        reader: R,
        query: &str,
        options: io::NdjsonOptions,
        f: F,
    ) -> std::result::Result<usize, JetroEngineError>
    where
        R: std::io::BufRead,
        F: FnMut(Value) -> std::result::Result<io::NdjsonControl, JetroEngineError>,
    {
        io::for_each_ndjson_until_with_options(self, reader, query, options, f)
    }

    /// Locks the plan cache, recovering the map if a previous holder panicked.
    ///
    /// The cache only holds clones of independently computed plans, so a poisoned guard is
    /// still safe to use. Propagating the poison would make every later query panic.
    fn plan_cache_guard(
        &self,
    ) -> std::sync::MutexGuard<'_, HashMap<String, ir::physical::QueryPlan>> {
        self.plan_cache
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Returns the physical plan for `expr` under `context`, reusing a cached plan when present.
    ///
    /// Planning runs without holding the cache lock, so concurrent callers are not serialized
    /// behind it, and a panic inside the planner cannot poison the cache. When two threads plan
    /// the same key concurrently, both compute it and the later insert wins (the plans are
    /// equivalent). A full cache is cleared before a new key is inserted. A limit of `0`
    /// bypasses the cache entirely.
    pub(crate) fn cached_plan(
        &self,
        expr: &str,
        context: plan::physical::PlanningContext,
    ) -> ir::physical::QueryPlan {
        if self.plan_cache_limit == 0 {
            return plan::physical::plan_query_with_context(expr, context);
        }
        let cache_key = format!("{}\0{}", context.cache_key(), expr);
        // The guard is a temporary, so it is released at the end of this statement.
        let cached = self.plan_cache_guard().get(&cache_key).cloned();
        if let Some(plan) = cached {
            return plan;
        }
        let plan = plan::physical::plan_query_with_context(expr, context);
        let mut cache = self.plan_cache_guard();
        // Another thread may have inserted this key meanwhile; only evict for a genuinely new key.
        if !cache.contains_key(&cache_key) && cache.len() >= self.plan_cache_limit {
            cache.clear();
        }
        cache.insert(cache_key, plan.clone());
        plan
    }
}
impl exec::pipeline::PipelineData for Jetro {
fn promote_objvec(&self, arr: &Arc<Vec<Val>>) -> Option<Arc<crate::data::value::ObjVecData>> {
self.get_or_promote_objvec(arr)
}
}
impl Jetro {
pub(crate) fn lazy_tape(
&self,
) -> std::result::Result<Option<&Arc<crate::data::tape::TapeData>>, EvalError> {
if let Some(result) = self.tape.get() {
return result
.as_ref()
.map(Some)
.map_err(|err| EvalError(format!("Invalid JSON: {err}")));
}
let Some(raw) = self.raw_bytes.as_ref() else {
return Ok(None);
};
let bytes: Vec<u8> = (**raw).to_vec();
let parsed = crate::data::tape::TapeData::parse(bytes).map_err(|err| err.to_string());
let _ = self.tape.set(parsed);
self.tape
.get()
.expect("tape cache initialized")
.as_ref()
.map(Some)
.map_err(|err| EvalError(format!("Invalid JSON: {err}")))
}
pub(crate) fn get_or_promote_objvec(
&self,
arr: &Arc<Vec<Val>>,
) -> Option<Arc<crate::data::value::ObjVecData>> {
let key = Arc::as_ptr(arr) as usize;
if let Ok(cache) = self.objvec_cache.lock() {
if let Some(d) = cache.get(&key) {
return Some(Arc::clone(d));
}
}
let promoted = exec::pipeline::Pipeline::try_promote_objvec_arr(arr)?;
if let Ok(mut cache) = self.objvec_cache.lock() {
cache.entry(key).or_insert_with(|| Arc::clone(&promoted));
}
Some(promoted)
}
pub(crate) fn new(document: Value) -> Self {
Self {
document,
root_val: OnceCell::new(),
objvec_cache: Default::default(),
raw_bytes: None,
tape: OnceCell::new(),
structural_index: OnceCell::new(),
vm: RefCell::new(VM::new()),
}
}
pub(crate) fn from_val_and_value(root: Val, document: Value) -> Self {
let root_val = OnceCell::new();
let _ = root_val.set(root);
Self {
document,
root_val,
objvec_cache: Default::default(),
raw_bytes: None,
tape: OnceCell::new(),
structural_index: OnceCell::new(),
vm: RefCell::new(VM::new()),
}
}
pub(crate) fn root_val_with(
&self,
keys: &crate::data::intern::KeyCache,
) -> std::result::Result<Val, EvalError> {
if let Some(root) = self.root_val.get() {
return Ok(root.clone());
}
let root = {
if let Some(tape) = self.lazy_tape()? {
Val::from_tape_data_with(keys, tape)
} else {
Val::from_value_with(keys, &self.document)
}
};
let _ = self.root_val.set(root);
Ok(self.root_val.get().expect("root val initialized").clone())
}
pub fn from_bytes(bytes: Vec<u8>) -> std::result::Result<Self, serde_json::Error> {
Ok(Self {
document: Value::Null,
root_val: OnceCell::new(),
objvec_cache: Default::default(),
raw_bytes: Some(Arc::from(bytes.into_boxed_slice())),
tape: OnceCell::new(),
structural_index: OnceCell::new(),
vm: RefCell::new(VM::new()),
})
}
pub(crate) fn with_vm<F, R>(&self, f: F) -> R
where
F: FnOnce(&mut VM) -> R,
{
match self.vm.try_borrow_mut() {
Ok(mut vm) => f(&mut vm),
Err(_) => {
let mut vm = VM::new();
f(&mut vm)
}
}
}
pub(crate) fn raw_bytes(&self) -> Option<&[u8]> {
self.raw_bytes.as_deref()
}
pub(crate) fn lazy_structural_index(
&self,
) -> std::result::Result<Option<&Arc<jetro_experimental::StructuralIndex>>, EvalError> {
if let Some(result) = self.structural_index.get() {
return result
.as_ref()
.map(Some)
.map_err(|err| EvalError(format!("Invalid JSON: {err}")));
}
let Some(raw) = self.raw_bytes.as_ref() else {
return Ok(None);
};
let built = jetro_experimental::from_bytes_with(
raw.as_ref(),
jetro_experimental::BuildOptions::keys_only(),
)
.map(Arc::new)
.map_err(|err| err.to_string());
let _ = self.structural_index.set(built);
self.structural_index
.get()
.expect("structural index cache initialized")
.as_ref()
.map(Some)
.map_err(|err| EvalError(format!("Invalid JSON: {err}")))
}
pub(crate) fn root_val(&self) -> std::result::Result<Val, EvalError> {
if let Some(root) = self.root_val.get() {
return Ok(root.clone());
}
let root = {
if let Some(tape) = self.lazy_tape()? {
Val::from_tape_data(tape)
} else {
Val::from(&self.document)
}
};
let _ = self.root_val.set(root);
Ok(self.root_val.get().expect("root val initialized").clone())
}
#[cfg(test)]
pub(crate) fn root_val_is_materialized(&self) -> bool {
self.root_val.get().is_some()
}
#[cfg(test)]
pub(crate) fn structural_index_is_built(&self) -> bool {
self.structural_index.get().is_some()
}
#[cfg(test)]
pub(crate) fn tape_is_built(&self) -> bool {
self.tape.get().is_some()
}
#[cfg(test)]
pub(crate) fn reset_tape_materialized_subtrees(&self) {
if let Ok(Some(tape)) = self.lazy_tape() {
tape.reset_materialized_subtrees();
}
}
#[cfg(test)]
pub(crate) fn tape_materialized_subtrees(&self) -> usize {
self.lazy_tape()
.ok()
.flatten()
.map(|tape| tape.materialized_subtrees())
.unwrap_or(0)
}
pub fn collect<S: AsRef<str>>(&self, expr: S) -> std::result::Result<Value, EvalError> {
exec::router::collect_json(self, expr.as_ref())
}
}
impl From<Value> for Jetro {
fn from(v: Value) -> Self {
Self::new(v)
}
}