#![deny(unsafe_code)]
use serde::{Deserialize, Serialize};
use serde_json::Value;
use thiserror::Error;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "kind", deny_unknown_fields, rename_all = "snake_case")]
pub enum Node {
Step {
#[serde(rename = "ref")]
ref_: String,
#[serde(rename = "in")]
in_: Expr,
out: Expr,
},
Seq { children: Vec<Node> },
Branch {
cond: Expr,
#[serde(rename = "then")]
then_: Box<Node>,
#[serde(rename = "else")]
else_: Box<Node>,
},
Fanout {
items: Expr,
bind: Expr,
body: Box<Node>,
join: JoinMode,
out: Expr,
},
Loop {
counter: Expr,
cond: Expr,
body: Box<Node>,
max: u32,
},
Try {
body: Box<Node>,
catch: Box<Node>,
#[serde(default)]
err_at: Option<Expr>,
},
Assign { at: Expr, value: Expr },
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum JoinMode {
All,
Any,
Race,
AllSettled,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "op", deny_unknown_fields, rename_all = "snake_case")]
pub enum Expr {
Path { at: String },
Lit { value: Value },
Eq { lhs: Box<Expr>, rhs: Box<Expr> },
Ne { lhs: Box<Expr>, rhs: Box<Expr> },
Lt { lhs: Box<Expr>, rhs: Box<Expr> },
Lte { lhs: Box<Expr>, rhs: Box<Expr> },
Gt { lhs: Box<Expr>, rhs: Box<Expr> },
Gte { lhs: Box<Expr>, rhs: Box<Expr> },
Not { arg: Box<Expr> },
And { args: Vec<Expr> },
Or { args: Vec<Expr> },
Exists { arg: Box<Expr> },
Add { lhs: Box<Expr>, rhs: Box<Expr> },
Sub { lhs: Box<Expr>, rhs: Box<Expr> },
Mul { lhs: Box<Expr>, rhs: Box<Expr> },
Div { lhs: Box<Expr>, rhs: Box<Expr> },
Mod { lhs: Box<Expr>, rhs: Box<Expr> },
Len { arg: Box<Expr> },
In {
needle: Box<Expr>,
haystack: Box<Expr>,
},
CallExtern {
#[serde(rename = "ref")]
ref_: String,
args: Vec<Expr>,
},
}
pub trait Dispatcher {
fn dispatch(&self, ref_: &str, input: Value) -> Result<Value, EvalError>;
}
impl<F> Dispatcher for F
where
F: Fn(&str, Value) -> Result<Value, EvalError>,
{
fn dispatch(&self, ref_: &str, input: Value) -> Result<Value, EvalError> {
self(ref_, input)
}
}
#[derive(Debug, Error)]
pub enum EvalError {
#[error("path not found: {0}")]
PathNotFound(String),
#[error("invalid path syntax: {0}")]
InvalidPath(String),
#[error("branch cond must be boolean, got: {0}")]
NonBoolCond(Value),
#[error("dispatcher error for ref '{ref_}': {msg}")]
DispatcherError { ref_: String, msg: String },
#[error("extern error for ref '{ref_}': {msg}")]
ExternError { ref_: String, msg: String },
}
pub trait Externs {
fn call(&self, ref_: &str, args: &[Value]) -> Result<Value, EvalError>;
}
pub struct NoExterns;
impl Externs for NoExterns {
fn call(&self, ref_: &str, _args: &[Value]) -> Result<Value, EvalError> {
Err(EvalError::ExternError {
ref_: ref_.into(),
msg: "no externs registry configured".into(),
})
}
}
pub type ExternFn = Box<dyn Fn(&[Value]) -> Result<Value, EvalError> + Send + Sync>;
#[derive(Default)]
pub struct ExternMap {
fns: std::collections::HashMap<String, ExternFn>,
}
impl ExternMap {
pub fn new() -> Self {
Self::default()
}
pub fn register<F>(&mut self, name: impl Into<String>, f: F)
where
F: Fn(&[Value]) -> Result<Value, EvalError> + Send + Sync + 'static,
{
self.fns.insert(name.into(), Box::new(f));
}
pub fn contains(&self, name: &str) -> bool {
self.fns.contains_key(name)
}
}
impl Externs for ExternMap {
fn call(&self, ref_: &str, args: &[Value]) -> Result<Value, EvalError> {
let f = self.fns.get(ref_).ok_or_else(|| EvalError::ExternError {
ref_: ref_.into(),
msg: "not registered in externs".into(),
})?;
f(args)
}
}
pub trait CtxStorage: Send + Sync {
fn read(&self, path: &str) -> Result<Value, EvalError>;
fn write(&self, path: &str, value: Value) -> Result<(), EvalError>;
fn snapshot(&self) -> Value;
fn replace(&self, value: Value);
}
pub struct MemoryCtx {
inner: std::sync::Mutex<Value>,
}
impl MemoryCtx {
pub fn new(ctx: Value) -> Self {
Self {
inner: std::sync::Mutex::new(ctx),
}
}
pub fn shared(ctx: Value) -> std::sync::Arc<dyn CtxStorage> {
std::sync::Arc::new(Self::new(ctx))
}
}
impl CtxStorage for MemoryCtx {
fn read(&self, path: &str) -> Result<Value, EvalError> {
let guard = self.inner.lock().expect("ctx mutex poisoned");
read_path(path, &guard)
}
fn write(&self, path: &str, value: Value) -> Result<(), EvalError> {
let mut guard = self.inner.lock().expect("ctx mutex poisoned");
let cur = std::mem::take(&mut *guard);
let updated = write_path(
&Expr::Path {
at: path.to_string(),
},
cur,
value,
)?;
*guard = updated;
Ok(())
}
fn snapshot(&self) -> Value {
let guard = self.inner.lock().expect("ctx mutex poisoned");
guard.clone()
}
fn replace(&self, value: Value) {
let mut guard = self.inner.lock().expect("ctx mutex poisoned");
*guard = value;
}
}
fn path_str(expr: &Expr) -> Result<&str, EvalError> {
match expr {
Expr::Path { at } => Ok(at.as_str()),
_ => Err(EvalError::InvalidPath(
"expected Path expr for write target".into(),
)),
}
}
pub fn eval_with_storage<D: Dispatcher>(
node: &Node,
ctx: &dyn CtxStorage,
dispatcher: &D,
) -> Result<(), EvalError> {
eval_with_storage_externs(node, ctx, dispatcher, &NoExterns)
}
pub fn eval_with_storage_externs<D: Dispatcher>(
node: &Node,
ctx: &dyn CtxStorage,
dispatcher: &D,
externs: &dyn Externs,
) -> Result<(), EvalError> {
match node {
Node::Step { ref_, in_, out } => {
let snap = ctx.snapshot();
let input = eval_expr_with_externs(in_, &snap, externs)?;
let output =
dispatcher
.dispatch(ref_, input)
.map_err(|e| EvalError::DispatcherError {
ref_: ref_.clone(),
msg: e.to_string(),
})?;
ctx.write(path_str(out)?, output)
}
Node::Seq { children } => {
for child in children {
eval_with_storage_externs(child, ctx, dispatcher, externs)?;
}
Ok(())
}
Node::Branch { cond, then_, else_ } => {
let snap = ctx.snapshot();
match eval_expr_with_externs(cond, &snap, externs)? {
Value::Bool(true) => eval_with_storage_externs(then_, ctx, dispatcher, externs),
Value::Bool(false) => eval_with_storage_externs(else_, ctx, dispatcher, externs),
other => Err(EvalError::NonBoolCond(other)),
}
}
Node::Fanout {
items,
bind,
body,
join,
out,
} => {
let snap = ctx.snapshot();
let items_val = eval_expr_with_externs(items, &snap, externs)?;
let items_arr = match items_val {
Value::Array(a) => a,
other => {
return Err(EvalError::DispatcherError {
ref_: "fanout.items".into(),
msg: format!("expected array, got {other:?}"),
})
}
};
let joined =
fanout_eval_sync(bind, body, *join, &snap, items_arr, dispatcher, externs)?;
ctx.write(path_str(out)?, joined)
}
Node::Loop {
counter,
cond,
body,
max,
} => {
let counter_path = path_str(counter)?;
ctx.write(counter_path, Value::Number(serde_json::Number::from(0u32)))?;
let mut n: u32 = 0;
loop {
if n >= *max {
break;
}
let snap = ctx.snapshot();
if !is_truthy(&eval_expr_with_externs(cond, &snap, externs)?) {
break;
}
eval_with_storage_externs(body, ctx, dispatcher, externs)?;
n += 1;
ctx.write(counter_path, Value::Number(serde_json::Number::from(n)))?;
}
Ok(())
}
Node::Try {
body,
catch,
err_at,
} => {
let snap_before = ctx.snapshot();
match eval_with_storage_externs(body, ctx, dispatcher, externs) {
Ok(()) => Ok(()),
Err(e) => {
ctx.replace(snap_before);
if let Some(at) = err_at {
ctx.write(path_str(at)?, Value::String(e.to_string()))?;
}
eval_with_storage_externs(catch, ctx, dispatcher, externs)
}
}
}
Node::Assign { at, value } => {
let snap = ctx.snapshot();
let v = eval_expr_with_externs(value, &snap, externs)?;
ctx.write(path_str(at)?, v)
}
}
}
fn fanout_eval_sync<D: Dispatcher>(
bind: &Expr,
body: &Node,
join: JoinMode,
base_snap: &Value,
items_arr: Vec<Value>,
dispatcher: &D,
externs: &dyn Externs,
) -> Result<Value, EvalError> {
match join {
JoinMode::All => {
let mut results = Vec::with_capacity(items_arr.len());
for item in items_arr {
let branch_ctx = write_path(bind, base_snap.clone(), item)?;
let storage = MemoryCtx::new(branch_ctx);
eval_with_storage_externs(body, &storage, dispatcher, externs)?;
results.push(storage.snapshot());
}
Ok(Value::Array(results))
}
JoinMode::Any => {
let mut winner: Option<Value> = None;
let mut last_err: Option<EvalError> = None;
for item in items_arr {
let branch_ctx = write_path(bind, base_snap.clone(), item)?;
let storage = MemoryCtx::new(branch_ctx);
match eval_with_storage_externs(body, &storage, dispatcher, externs) {
Ok(()) => {
winner = Some(storage.snapshot());
last_err = None;
break;
}
Err(e) => last_err = Some(e),
}
}
if let Some(e) = last_err {
return Err(e);
}
Ok(winner.unwrap_or(Value::Array(vec![])))
}
JoinMode::Race => {
if let Some(first) = items_arr.into_iter().next() {
let branch_ctx = write_path(bind, base_snap.clone(), first)?;
let storage = MemoryCtx::new(branch_ctx);
eval_with_storage_externs(body, &storage, dispatcher, externs)?;
Ok(storage.snapshot())
} else {
Ok(Value::Array(vec![]))
}
}
JoinMode::AllSettled => {
let mut records = Vec::with_capacity(items_arr.len());
for item in items_arr {
let branch_ctx = write_path(bind, base_snap.clone(), item)?;
let storage = MemoryCtx::new(branch_ctx);
match eval_with_storage_externs(body, &storage, dispatcher, externs) {
Ok(()) => records.push(
serde_json::json!({"status": "fulfilled", "value": storage.snapshot()}),
),
Err(e) => records
.push(serde_json::json!({"status": "rejected", "reason": e.to_string()})),
}
}
Ok(Value::Array(records))
}
}
}
pub fn eval<D: Dispatcher>(node: &Node, ctx: Value, dispatcher: &D) -> Result<Value, EvalError> {
eval_externs(node, ctx, dispatcher, &NoExterns)
}
pub fn eval_externs<D: Dispatcher>(
node: &Node,
ctx: Value,
dispatcher: &D,
externs: &dyn Externs,
) -> Result<Value, EvalError> {
let storage = MemoryCtx::new(ctx);
eval_with_storage_externs(node, &storage, dispatcher, externs)?;
Ok(storage.snapshot())
}
pub fn is_truthy(v: &Value) -> bool {
match v {
Value::Null => false,
Value::Bool(b) => *b,
_ => true,
}
}
pub fn eval_expr(expr: &Expr, ctx: &Value) -> Result<Value, EvalError> {
eval_expr_with_externs(expr, ctx, &NoExterns)
}
pub fn eval_expr_with_externs(
expr: &Expr,
ctx: &Value,
externs: &dyn Externs,
) -> Result<Value, EvalError> {
let ev = |e: &Expr| eval_expr_with_externs(e, ctx, externs);
match expr {
Expr::Lit { value } => Ok(value.clone()),
Expr::Path { at } => read_path(at, ctx),
Expr::Eq { lhs, rhs } => Ok(Value::Bool(ev(lhs)? == ev(rhs)?)),
Expr::Ne { lhs, rhs } => Ok(Value::Bool(ev(lhs)? != ev(rhs)?)),
Expr::Lt { lhs, rhs } => ord_cmp(&ev(lhs)?, &ev(rhs)?, |o| o.is_lt()),
Expr::Lte { lhs, rhs } => ord_cmp(&ev(lhs)?, &ev(rhs)?, |o| o.is_le()),
Expr::Gt { lhs, rhs } => ord_cmp(&ev(lhs)?, &ev(rhs)?, |o| o.is_gt()),
Expr::Gte { lhs, rhs } => ord_cmp(&ev(lhs)?, &ev(rhs)?, |o| o.is_ge()),
Expr::Not { arg } => Ok(Value::Bool(!is_truthy(&ev(arg)?))),
Expr::And { args } => {
for a in args {
if !is_truthy(&ev(a)?) {
return Ok(Value::Bool(false));
}
}
Ok(Value::Bool(true))
}
Expr::Or { args } => {
for a in args {
if is_truthy(&ev(a)?) {
return Ok(Value::Bool(true));
}
}
Ok(Value::Bool(false))
}
Expr::Exists { arg } => match ev(arg) {
Ok(Value::Null) => Ok(Value::Bool(false)),
Ok(_) => Ok(Value::Bool(true)),
Err(EvalError::PathNotFound(_)) => Ok(Value::Bool(false)),
Err(e) => Err(e),
},
Expr::Add { lhs, rhs } => num_arith(&ev(lhs)?, &ev(rhs)?, "add", |a, b| Some(a + b)),
Expr::Sub { lhs, rhs } => num_arith(&ev(lhs)?, &ev(rhs)?, "sub", |a, b| Some(a - b)),
Expr::Mul { lhs, rhs } => num_arith(&ev(lhs)?, &ev(rhs)?, "mul", |a, b| Some(a * b)),
Expr::Div { lhs, rhs } => num_arith(&ev(lhs)?, &ev(rhs)?, "div", |a, b| {
if b == 0.0 {
None
} else {
Some(a / b)
}
}),
Expr::Mod { lhs, rhs } => num_arith(&ev(lhs)?, &ev(rhs)?, "mod", |a, b| {
if b == 0.0 {
None
} else {
Some(a - (a / b).floor() * b)
}
}),
Expr::Len { arg } => {
let v = ev(arg)?;
let n = match &v {
Value::Array(a) => a.len(),
Value::String(s) => s.chars().count(),
Value::Object(o) => o.len(),
other => {
return Err(EvalError::DispatcherError {
ref_: "expr.len".into(),
msg: format!("len: unsupported type {other:?}"),
})
}
};
Ok(Value::Number(serde_json::Number::from(n as u64)))
}
Expr::In { needle, haystack } => {
let n = ev(needle)?;
let h = ev(haystack)?;
match h {
Value::Array(a) => Ok(Value::Bool(a.iter().any(|e| e == &n))),
other => Err(EvalError::DispatcherError {
ref_: "expr.in".into(),
msg: format!("in: haystack must be array, got {other:?}"),
}),
}
}
Expr::CallExtern { ref_, args } => {
let mut vals = Vec::with_capacity(args.len());
for a in args {
vals.push(ev(a)?);
}
externs.call(ref_, &vals)
}
}
}
fn to_f64(v: &Value, op: &str) -> Result<f64, EvalError> {
match v {
Value::Number(n) => n.as_f64().ok_or_else(|| EvalError::DispatcherError {
ref_: format!("expr.{op}"),
msg: format!("non-f64-representable number: {n}"),
}),
other => Err(EvalError::DispatcherError {
ref_: format!("expr.{op}"),
msg: format!("expected number, got {other:?}"),
}),
}
}
fn ord_cmp<F>(lv: &Value, rv: &Value, f: F) -> Result<Value, EvalError>
where
F: Fn(std::cmp::Ordering) -> bool,
{
let ord = match (lv, rv) {
(Value::Number(_), Value::Number(_)) => {
let l = to_f64(lv, "cmp")?;
let r = to_f64(rv, "cmp")?;
l.partial_cmp(&r)
.ok_or_else(|| EvalError::DispatcherError {
ref_: "expr.cmp".into(),
msg: "non-comparable numbers (NaN)".into(),
})?
}
(Value::String(l), Value::String(r)) => l.cmp(r),
(l, r) => {
return Err(EvalError::DispatcherError {
ref_: "expr.cmp".into(),
msg: format!("cmp: both sides must be numbers or strings, got {l:?} vs {r:?}"),
})
}
};
Ok(Value::Bool(f(ord)))
}
fn num_arith<F>(lv: &Value, rv: &Value, op: &str, f: F) -> Result<Value, EvalError>
where
F: Fn(f64, f64) -> Option<f64>,
{
let l = to_f64(lv, op)?;
let r = to_f64(rv, op)?;
let result = f(l, r).ok_or_else(|| EvalError::DispatcherError {
ref_: format!("expr.{op}"),
msg: "arithmetic failure (e.g. division by zero)".into(),
})?;
let n = serde_json::Number::from_f64(result).ok_or_else(|| EvalError::DispatcherError {
ref_: format!("expr.{op}"),
msg: format!("result not f64-representable: {result}"),
})?;
Ok(Value::Number(n))
}
pub fn read_path(path: &str, ctx: &Value) -> Result<Value, EvalError> {
let trimmed = strip_path_prefix(path)?;
if trimmed.is_empty() {
return Ok(ctx.clone());
}
let mut cur = ctx;
for key in trimmed.split('.') {
cur = cur
.get(key)
.ok_or_else(|| EvalError::PathNotFound(path.to_string()))?;
}
Ok(cur.clone())
}
pub fn write_path(out: &Expr, ctx: Value, value: Value) -> Result<Value, EvalError> {
let path = match out {
Expr::Path { at } => at,
_ => {
return Err(EvalError::InvalidPath(
"Step.out must be a Path expr".into(),
))
}
};
let trimmed = strip_path_prefix(path)?;
let keys: Vec<&str> = trimmed.split('.').filter(|s| !s.is_empty()).collect();
if keys.is_empty() {
return Ok(value);
}
let mut root = ctx;
write_path_recursive(&mut root, &keys, value);
Ok(root)
}
fn strip_path_prefix(path: &str) -> Result<&str, EvalError> {
path.strip_prefix("$.")
.or_else(|| path.strip_prefix('$'))
.ok_or_else(|| EvalError::InvalidPath(format!("path must start with $ or $.: {}", path)))
}
fn write_path_recursive(node: &mut Value, keys: &[&str], value: Value) {
if keys.is_empty() {
*node = value;
return;
}
if !node.is_object() {
*node = Value::Object(serde_json::Map::new());
}
let obj = node.as_object_mut().expect("just initialised as object");
let key = keys[0];
if keys.len() == 1 {
obj.insert(key.to_string(), value);
} else {
let entry = obj
.entry(key.to_string())
.or_insert(Value::Object(serde_json::Map::new()));
write_path_recursive(entry, &keys[1..], value);
}
}