#![deny(unsafe_code)]
use serde::{Deserialize, Serialize};
use serde_json::Value;
use thiserror::Error;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "kind", deny_unknown_fields, rename_all = "snake_case")]
pub enum Node {
Step {
#[serde(rename = "ref")]
ref_: String,
#[serde(rename = "in")]
in_: Expr,
out: Expr,
},
Seq { children: Vec<Node> },
Branch {
cond: Expr,
#[serde(rename = "then")]
then_: Box<Node>,
#[serde(rename = "else")]
else_: Box<Node>,
},
Fanout {
items: Expr,
bind: Expr,
body: Box<Node>,
join: JoinMode,
out: Expr,
},
Loop {
counter: Expr,
cond: Expr,
body: Box<Node>,
max: u32,
},
Try {
body: Box<Node>,
catch: Box<Node>,
#[serde(default)]
err_at: Option<Expr>,
},
Assign { at: Expr, value: Expr },
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum JoinMode {
All,
Any,
Race,
AllSettled,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "op", deny_unknown_fields, rename_all = "snake_case")]
pub enum Expr {
Path { at: String },
Lit { value: Value },
Eq { lhs: Box<Expr>, rhs: Box<Expr> },
Ne { lhs: Box<Expr>, rhs: Box<Expr> },
Lt { lhs: Box<Expr>, rhs: Box<Expr> },
Le { lhs: Box<Expr>, rhs: Box<Expr> },
Gt { lhs: Box<Expr>, rhs: Box<Expr> },
Ge { lhs: Box<Expr>, rhs: Box<Expr> },
Not { operand: Box<Expr> },
And { operands: Vec<Expr> },
Or { operands: Vec<Expr> },
Exists { at: String },
Add { lhs: Box<Expr>, rhs: Box<Expr> },
Sub { lhs: Box<Expr>, rhs: Box<Expr> },
Mul { lhs: Box<Expr>, rhs: Box<Expr> },
Div { lhs: Box<Expr>, rhs: Box<Expr> },
Len { of: Box<Expr> },
In {
needle: Box<Expr>,
haystack: Box<Expr>,
},
}
pub trait Dispatcher {
fn dispatch(&self, ref_: &str, input: Value) -> Result<Value, EvalError>;
}
impl<F> Dispatcher for F
where
F: Fn(&str, Value) -> Result<Value, EvalError>,
{
fn dispatch(&self, ref_: &str, input: Value) -> Result<Value, EvalError> {
self(ref_, input)
}
}
#[derive(Debug, Error)]
pub enum EvalError {
#[error("path not found: {0}")]
PathNotFound(String),
#[error("invalid path syntax: {0}")]
InvalidPath(String),
#[error("branch cond must be boolean, got: {0}")]
NonBoolCond(Value),
#[error("dispatcher error for ref '{ref_}': {msg}")]
DispatcherError { ref_: String, msg: String },
}
pub trait CtxStorage: Send + Sync {
fn read(&self, path: &str) -> Result<Value, EvalError>;
fn write(&self, path: &str, value: Value) -> Result<(), EvalError>;
fn snapshot(&self) -> Value;
fn replace(&self, value: Value);
}
pub struct MemoryCtx {
inner: std::sync::Mutex<Value>,
}
impl MemoryCtx {
pub fn new(ctx: Value) -> Self {
Self {
inner: std::sync::Mutex::new(ctx),
}
}
pub fn shared(ctx: Value) -> std::sync::Arc<dyn CtxStorage> {
std::sync::Arc::new(Self::new(ctx))
}
}
impl CtxStorage for MemoryCtx {
fn read(&self, path: &str) -> Result<Value, EvalError> {
let guard = self.inner.lock().expect("ctx mutex poisoned");
read_path(path, &guard)
}
fn write(&self, path: &str, value: Value) -> Result<(), EvalError> {
let mut guard = self.inner.lock().expect("ctx mutex poisoned");
let cur = std::mem::take(&mut *guard);
let updated = write_path(
&Expr::Path {
at: path.to_string(),
},
cur,
value,
)?;
*guard = updated;
Ok(())
}
fn snapshot(&self) -> Value {
let guard = self.inner.lock().expect("ctx mutex poisoned");
guard.clone()
}
fn replace(&self, value: Value) {
let mut guard = self.inner.lock().expect("ctx mutex poisoned");
*guard = value;
}
}
fn path_str(expr: &Expr) -> Result<&str, EvalError> {
match expr {
Expr::Path { at } => Ok(at.as_str()),
_ => Err(EvalError::InvalidPath(
"expected Path expr for write target".into(),
)),
}
}
pub fn eval_with_storage<D: Dispatcher>(
node: &Node,
ctx: &dyn CtxStorage,
dispatcher: &D,
) -> Result<(), EvalError> {
match node {
Node::Step { ref_, in_, out } => {
let snap = ctx.snapshot();
let input = eval_expr(in_, &snap)?;
let output =
dispatcher
.dispatch(ref_, input)
.map_err(|e| EvalError::DispatcherError {
ref_: ref_.clone(),
msg: e.to_string(),
})?;
ctx.write(path_str(out)?, output)
}
Node::Seq { children } => {
for child in children {
eval_with_storage(child, ctx, dispatcher)?;
}
Ok(())
}
Node::Branch { cond, then_, else_ } => {
let snap = ctx.snapshot();
match eval_expr(cond, &snap)? {
Value::Bool(true) => eval_with_storage(then_, ctx, dispatcher),
Value::Bool(false) => eval_with_storage(else_, ctx, dispatcher),
other => Err(EvalError::NonBoolCond(other)),
}
}
Node::Fanout {
items,
bind,
body,
join,
out,
} => {
let snap = ctx.snapshot();
let items_val = eval_expr(items, &snap)?;
let items_arr = match items_val {
Value::Array(a) => a,
other => {
return Err(EvalError::DispatcherError {
ref_: "fanout.items".into(),
msg: format!("expected array, got {other:?}"),
})
}
};
let joined = fanout_eval_sync(bind, body, *join, &snap, items_arr, dispatcher)?;
ctx.write(path_str(out)?, joined)
}
Node::Loop {
counter,
cond,
body,
max,
} => {
let counter_path = path_str(counter)?;
ctx.write(counter_path, Value::Number(serde_json::Number::from(0u32)))?;
let mut n: u32 = 0;
loop {
if n >= *max {
break;
}
let snap = ctx.snapshot();
if !is_truthy(&eval_expr(cond, &snap)?) {
break;
}
eval_with_storage(body, ctx, dispatcher)?;
n += 1;
ctx.write(counter_path, Value::Number(serde_json::Number::from(n)))?;
}
Ok(())
}
Node::Try {
body,
catch,
err_at,
} => {
let snap_before = ctx.snapshot();
match eval_with_storage(body, ctx, dispatcher) {
Ok(()) => Ok(()),
Err(e) => {
ctx.replace(snap_before);
if let Some(at) = err_at {
ctx.write(path_str(at)?, Value::String(e.to_string()))?;
}
eval_with_storage(catch, ctx, dispatcher)
}
}
}
Node::Assign { at, value } => {
let snap = ctx.snapshot();
let v = eval_expr(value, &snap)?;
ctx.write(path_str(at)?, v)
}
}
}
fn fanout_eval_sync<D: Dispatcher>(
bind: &Expr,
body: &Node,
join: JoinMode,
base_snap: &Value,
items_arr: Vec<Value>,
dispatcher: &D,
) -> Result<Value, EvalError> {
match join {
JoinMode::All => {
let mut results = Vec::with_capacity(items_arr.len());
for item in items_arr {
let branch_ctx = write_path(bind, base_snap.clone(), item)?;
let storage = MemoryCtx::new(branch_ctx);
eval_with_storage(body, &storage, dispatcher)?;
results.push(storage.snapshot());
}
Ok(Value::Array(results))
}
JoinMode::Any => {
let mut winner: Option<Value> = None;
let mut last_err: Option<EvalError> = None;
for item in items_arr {
let branch_ctx = write_path(bind, base_snap.clone(), item)?;
let storage = MemoryCtx::new(branch_ctx);
match eval_with_storage(body, &storage, dispatcher) {
Ok(()) => {
winner = Some(storage.snapshot());
last_err = None;
break;
}
Err(e) => last_err = Some(e),
}
}
if let Some(e) = last_err {
return Err(e);
}
Ok(winner.unwrap_or(Value::Array(vec![])))
}
JoinMode::Race => {
if let Some(first) = items_arr.into_iter().next() {
let branch_ctx = write_path(bind, base_snap.clone(), first)?;
let storage = MemoryCtx::new(branch_ctx);
eval_with_storage(body, &storage, dispatcher)?;
Ok(storage.snapshot())
} else {
Ok(Value::Array(vec![]))
}
}
JoinMode::AllSettled => {
let mut records = Vec::with_capacity(items_arr.len());
for item in items_arr {
let branch_ctx = write_path(bind, base_snap.clone(), item)?;
let storage = MemoryCtx::new(branch_ctx);
match eval_with_storage(body, &storage, dispatcher) {
Ok(()) => records.push(
serde_json::json!({"status": "fulfilled", "value": storage.snapshot()}),
),
Err(e) => records
.push(serde_json::json!({"status": "rejected", "reason": e.to_string()})),
}
}
Ok(Value::Array(records))
}
}
}
pub fn eval<D: Dispatcher>(node: &Node, ctx: Value, dispatcher: &D) -> Result<Value, EvalError> {
let storage = MemoryCtx::new(ctx);
eval_with_storage(node, &storage, dispatcher)?;
Ok(storage.snapshot())
}
pub fn is_truthy(v: &Value) -> bool {
match v {
Value::Null => false,
Value::Bool(b) => *b,
_ => true,
}
}
pub fn eval_expr(expr: &Expr, ctx: &Value) -> Result<Value, EvalError> {
match expr {
Expr::Lit { value } => Ok(value.clone()),
Expr::Path { at } => read_path(at, ctx),
Expr::Eq { lhs, rhs } => Ok(Value::Bool(eval_expr(lhs, ctx)? == eval_expr(rhs, ctx)?)),
Expr::Ne { lhs, rhs } => Ok(Value::Bool(eval_expr(lhs, ctx)? != eval_expr(rhs, ctx)?)),
Expr::Lt { lhs, rhs } => num_cmp(lhs, rhs, ctx, |a, b| a < b),
Expr::Le { lhs, rhs } => num_cmp(lhs, rhs, ctx, |a, b| a <= b),
Expr::Gt { lhs, rhs } => num_cmp(lhs, rhs, ctx, |a, b| a > b),
Expr::Ge { lhs, rhs } => num_cmp(lhs, rhs, ctx, |a, b| a >= b),
Expr::Not { operand } => Ok(Value::Bool(!is_truthy(&eval_expr(operand, ctx)?))),
Expr::And { operands } => {
for op in operands {
if !is_truthy(&eval_expr(op, ctx)?) {
return Ok(Value::Bool(false));
}
}
Ok(Value::Bool(true))
}
Expr::Or { operands } => {
for op in operands {
if is_truthy(&eval_expr(op, ctx)?) {
return Ok(Value::Bool(true));
}
}
Ok(Value::Bool(false))
}
Expr::Exists { at } => Ok(Value::Bool(read_path(at, ctx).is_ok())),
Expr::Add { lhs, rhs } => num_arith(lhs, rhs, ctx, "add", |a, b| Some(a + b)),
Expr::Sub { lhs, rhs } => num_arith(lhs, rhs, ctx, "sub", |a, b| Some(a - b)),
Expr::Mul { lhs, rhs } => num_arith(lhs, rhs, ctx, "mul", |a, b| Some(a * b)),
Expr::Div { lhs, rhs } => num_arith(lhs, rhs, ctx, "div", |a, b| {
if b == 0.0 {
None
} else {
Some(a / b)
}
}),
Expr::Len { of } => {
let v = eval_expr(of, ctx)?;
let n = match &v {
Value::Array(a) => a.len(),
Value::String(s) => s.chars().count(),
Value::Object(o) => o.len(),
other => {
return Err(EvalError::DispatcherError {
ref_: "expr.len".into(),
msg: format!("len: unsupported type {other:?}"),
})
}
};
Ok(Value::Number(serde_json::Number::from(n as u64)))
}
Expr::In { needle, haystack } => {
let n = eval_expr(needle, ctx)?;
let h = eval_expr(haystack, ctx)?;
match h {
Value::Array(a) => Ok(Value::Bool(a.iter().any(|e| e == &n))),
other => Err(EvalError::DispatcherError {
ref_: "expr.in".into(),
msg: format!("in: haystack must be array, got {other:?}"),
}),
}
}
}
}
fn to_f64(v: &Value, op: &str) -> Result<f64, EvalError> {
match v {
Value::Number(n) => n.as_f64().ok_or_else(|| EvalError::DispatcherError {
ref_: format!("expr.{op}"),
msg: format!("non-f64-representable number: {n}"),
}),
other => Err(EvalError::DispatcherError {
ref_: format!("expr.{op}"),
msg: format!("expected number, got {other:?}"),
}),
}
}
fn num_cmp<F>(lhs: &Expr, rhs: &Expr, ctx: &Value, cmp: F) -> Result<Value, EvalError>
where
F: Fn(f64, f64) -> bool,
{
let lv = eval_expr(lhs, ctx)?;
let rv = eval_expr(rhs, ctx)?;
let l = to_f64(&lv, "cmp")?;
let r = to_f64(&rv, "cmp")?;
Ok(Value::Bool(cmp(l, r)))
}
fn num_arith<F>(lhs: &Expr, rhs: &Expr, ctx: &Value, op: &str, f: F) -> Result<Value, EvalError>
where
F: Fn(f64, f64) -> Option<f64>,
{
let lv = eval_expr(lhs, ctx)?;
let rv = eval_expr(rhs, ctx)?;
let l = to_f64(&lv, op)?;
let r = to_f64(&rv, op)?;
let result = f(l, r).ok_or_else(|| EvalError::DispatcherError {
ref_: format!("expr.{op}"),
msg: "arithmetic failure (e.g. division by zero)".into(),
})?;
let n = serde_json::Number::from_f64(result).ok_or_else(|| EvalError::DispatcherError {
ref_: format!("expr.{op}"),
msg: format!("result not f64-representable: {result}"),
})?;
Ok(Value::Number(n))
}
pub fn read_path(path: &str, ctx: &Value) -> Result<Value, EvalError> {
let trimmed = strip_path_prefix(path)?;
if trimmed.is_empty() {
return Ok(ctx.clone());
}
let mut cur = ctx;
for key in trimmed.split('.') {
cur = cur
.get(key)
.ok_or_else(|| EvalError::PathNotFound(path.to_string()))?;
}
Ok(cur.clone())
}
pub fn write_path(out: &Expr, ctx: Value, value: Value) -> Result<Value, EvalError> {
let path = match out {
Expr::Path { at } => at,
_ => {
return Err(EvalError::InvalidPath(
"Step.out must be a Path expr".into(),
))
}
};
let trimmed = strip_path_prefix(path)?;
let keys: Vec<&str> = trimmed.split('.').filter(|s| !s.is_empty()).collect();
if keys.is_empty() {
return Ok(value);
}
let mut root = ctx;
write_path_recursive(&mut root, &keys, value);
Ok(root)
}
fn strip_path_prefix(path: &str) -> Result<&str, EvalError> {
path.strip_prefix("$.")
.or_else(|| path.strip_prefix('$'))
.ok_or_else(|| EvalError::InvalidPath(format!("path must start with $ or $.: {}", path)))
}
fn write_path_recursive(node: &mut Value, keys: &[&str], value: Value) {
if keys.is_empty() {
*node = value;
return;
}
if !node.is_object() {
*node = Value::Object(serde_json::Map::new());
}
let obj = node.as_object_mut().expect("just initialised as object");
let key = keys[0];
if keys.len() == 1 {
obj.insert(key.to_string(), value);
} else {
let entry = obj
.entry(key.to_string())
.or_insert(Value::Object(serde_json::Map::new()));
write_path_recursive(entry, &keys[1..], value);
}
}