1#![deny(unsafe_code)]
2use serde::{Deserialize, Serialize};
39use serde_json::Value;
40use thiserror::Error;
41
42#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
53#[serde(tag = "kind", deny_unknown_fields, rename_all = "snake_case")]
54pub enum Node {
55 Step {
57 #[serde(rename = "ref")]
58 ref_: String,
59 #[serde(rename = "in")]
60 in_: Expr,
61 out: Expr,
62 },
63 Seq { children: Vec<Node> },
65 Branch {
67 cond: Expr,
68 #[serde(rename = "then")]
69 then_: Box<Node>,
70 #[serde(rename = "else")]
71 else_: Box<Node>,
72 },
73 Fanout {
78 items: Expr,
79 bind: Expr,
80 body: Box<Node>,
81 join: JoinMode,
82 out: Expr,
83 },
84 Loop {
88 counter: Expr,
89 cond: Expr,
90 body: Box<Node>,
91 max: u32,
92 },
93 Try {
96 body: Box<Node>,
97 catch: Box<Node>,
98 #[serde(default)]
99 err_at: Option<Expr>,
100 },
101 Assign { at: Expr, value: Expr },
106}
107
108#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
110#[serde(rename_all = "snake_case")]
111pub enum JoinMode {
112 All,
115 Any,
118 Race,
121 AllSettled,
124}
125
126#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
138#[serde(tag = "op", deny_unknown_fields, rename_all = "snake_case")]
139pub enum Expr {
140 Path { at: String },
142 Lit { value: Value },
144 Eq { lhs: Box<Expr>, rhs: Box<Expr> },
146 Ne { lhs: Box<Expr>, rhs: Box<Expr> },
148 Lt { lhs: Box<Expr>, rhs: Box<Expr> },
150 Le { lhs: Box<Expr>, rhs: Box<Expr> },
152 Gt { lhs: Box<Expr>, rhs: Box<Expr> },
154 Ge { lhs: Box<Expr>, rhs: Box<Expr> },
156 Not { operand: Box<Expr> },
158 And { operands: Vec<Expr> },
160 Or { operands: Vec<Expr> },
162 Exists { at: String },
166 Add { lhs: Box<Expr>, rhs: Box<Expr> },
168 Sub { lhs: Box<Expr>, rhs: Box<Expr> },
170 Mul { lhs: Box<Expr>, rhs: Box<Expr> },
172 Div { lhs: Box<Expr>, rhs: Box<Expr> },
174 Len { of: Box<Expr> },
177 In {
180 needle: Box<Expr>,
181 haystack: Box<Expr>,
182 },
183}
184
185pub trait Dispatcher {
197 fn dispatch(&self, ref_: &str, input: Value) -> Result<Value, EvalError>;
198}
199
200impl<F> Dispatcher for F
201where
202 F: Fn(&str, Value) -> Result<Value, EvalError>,
203{
204 fn dispatch(&self, ref_: &str, input: Value) -> Result<Value, EvalError> {
205 self(ref_, input)
206 }
207}
208
209#[derive(Debug, Error)]
211pub enum EvalError {
212 #[error("path not found: {0}")]
213 PathNotFound(String),
214 #[error("invalid path syntax: {0}")]
215 InvalidPath(String),
216 #[error("branch cond must be boolean, got: {0}")]
217 NonBoolCond(Value),
218 #[error("dispatcher error for ref '{ref_}': {msg}")]
219 DispatcherError { ref_: String, msg: String },
220}
221
222pub trait CtxStorage: Send + Sync {
236 fn read(&self, path: &str) -> Result<Value, EvalError>;
238 fn write(&self, path: &str, value: Value) -> Result<(), EvalError>;
240 fn snapshot(&self) -> Value;
242 fn replace(&self, value: Value);
244}
245
246pub struct MemoryCtx {
250 inner: std::sync::Mutex<Value>,
251}
252
253impl MemoryCtx {
254 pub fn new(ctx: Value) -> Self {
256 Self {
257 inner: std::sync::Mutex::new(ctx),
258 }
259 }
260
261 pub fn shared(ctx: Value) -> std::sync::Arc<dyn CtxStorage> {
263 std::sync::Arc::new(Self::new(ctx))
264 }
265}
266
267impl CtxStorage for MemoryCtx {
268 fn read(&self, path: &str) -> Result<Value, EvalError> {
269 let guard = self.inner.lock().expect("ctx mutex poisoned");
270 read_path(path, &guard)
271 }
272
273 fn write(&self, path: &str, value: Value) -> Result<(), EvalError> {
274 let mut guard = self.inner.lock().expect("ctx mutex poisoned");
275 let cur = std::mem::take(&mut *guard);
276 let updated = write_path(
277 &Expr::Path {
278 at: path.to_string(),
279 },
280 cur,
281 value,
282 )?;
283 *guard = updated;
284 Ok(())
285 }
286
287 fn snapshot(&self) -> Value {
288 let guard = self.inner.lock().expect("ctx mutex poisoned");
289 guard.clone()
290 }
291
292 fn replace(&self, value: Value) {
293 let mut guard = self.inner.lock().expect("ctx mutex poisoned");
294 *guard = value;
295 }
296}
297
298fn path_str(expr: &Expr) -> Result<&str, EvalError> {
300 match expr {
301 Expr::Path { at } => Ok(at.as_str()),
302 _ => Err(EvalError::InvalidPath(
303 "expected Path expr for write target".into(),
304 )),
305 }
306}
307
308pub fn eval_with_storage<D: Dispatcher>(
319 node: &Node,
320 ctx: &dyn CtxStorage,
321 dispatcher: &D,
322) -> Result<(), EvalError> {
323 match node {
324 Node::Step { ref_, in_, out } => {
325 let snap = ctx.snapshot();
326 let input = eval_expr(in_, &snap)?;
327 let output =
328 dispatcher
329 .dispatch(ref_, input)
330 .map_err(|e| EvalError::DispatcherError {
331 ref_: ref_.clone(),
332 msg: e.to_string(),
333 })?;
334 ctx.write(path_str(out)?, output)
335 }
336 Node::Seq { children } => {
337 for child in children {
338 eval_with_storage(child, ctx, dispatcher)?;
339 }
340 Ok(())
341 }
342 Node::Branch { cond, then_, else_ } => {
343 let snap = ctx.snapshot();
344 match eval_expr(cond, &snap)? {
345 Value::Bool(true) => eval_with_storage(then_, ctx, dispatcher),
346 Value::Bool(false) => eval_with_storage(else_, ctx, dispatcher),
347 other => Err(EvalError::NonBoolCond(other)),
348 }
349 }
350 Node::Fanout {
351 items,
352 bind,
353 body,
354 join,
355 out,
356 } => {
357 let snap = ctx.snapshot();
360 let items_val = eval_expr(items, &snap)?;
361 let items_arr = match items_val {
362 Value::Array(a) => a,
363 other => {
364 return Err(EvalError::DispatcherError {
365 ref_: "fanout.items".into(),
366 msg: format!("expected array, got {other:?}"),
367 })
368 }
369 };
370 let joined = fanout_eval_sync(bind, body, *join, &snap, items_arr, dispatcher)?;
371 ctx.write(path_str(out)?, joined)
372 }
373 Node::Loop {
374 counter,
375 cond,
376 body,
377 max,
378 } => {
379 let counter_path = path_str(counter)?;
380 ctx.write(counter_path, Value::Number(serde_json::Number::from(0u32)))?;
381 let mut n: u32 = 0;
382 loop {
383 if n >= *max {
384 break;
385 }
386 let snap = ctx.snapshot();
387 if !is_truthy(&eval_expr(cond, &snap)?) {
388 break;
389 }
390 eval_with_storage(body, ctx, dispatcher)?;
391 n += 1;
392 ctx.write(counter_path, Value::Number(serde_json::Number::from(n)))?;
393 }
394 Ok(())
395 }
396 Node::Try {
397 body,
398 catch,
399 err_at,
400 } => {
401 let snap_before = ctx.snapshot();
403 match eval_with_storage(body, ctx, dispatcher) {
404 Ok(()) => Ok(()),
405 Err(e) => {
406 ctx.replace(snap_before);
408 if let Some(at) = err_at {
409 ctx.write(path_str(at)?, Value::String(e.to_string()))?;
410 }
411 eval_with_storage(catch, ctx, dispatcher)
412 }
413 }
414 }
415 Node::Assign { at, value } => {
416 let snap = ctx.snapshot();
417 let v = eval_expr(value, &snap)?;
418 ctx.write(path_str(at)?, v)
419 }
420 }
421}
422
423fn fanout_eval_sync<D: Dispatcher>(
425 bind: &Expr,
426 body: &Node,
427 join: JoinMode,
428 base_snap: &Value,
429 items_arr: Vec<Value>,
430 dispatcher: &D,
431) -> Result<Value, EvalError> {
432 match join {
433 JoinMode::All => {
434 let mut results = Vec::with_capacity(items_arr.len());
435 for item in items_arr {
436 let branch_ctx = write_path(bind, base_snap.clone(), item)?;
437 let storage = MemoryCtx::new(branch_ctx);
438 eval_with_storage(body, &storage, dispatcher)?;
439 results.push(storage.snapshot());
440 }
441 Ok(Value::Array(results))
442 }
443 JoinMode::Any => {
444 let mut winner: Option<Value> = None;
445 let mut last_err: Option<EvalError> = None;
446 for item in items_arr {
447 let branch_ctx = write_path(bind, base_snap.clone(), item)?;
448 let storage = MemoryCtx::new(branch_ctx);
449 match eval_with_storage(body, &storage, dispatcher) {
450 Ok(()) => {
451 winner = Some(storage.snapshot());
452 last_err = None;
453 break;
454 }
455 Err(e) => last_err = Some(e),
456 }
457 }
458 if let Some(e) = last_err {
459 return Err(e);
460 }
461 Ok(winner.unwrap_or(Value::Array(vec![])))
462 }
463 JoinMode::Race => {
464 if let Some(first) = items_arr.into_iter().next() {
465 let branch_ctx = write_path(bind, base_snap.clone(), first)?;
466 let storage = MemoryCtx::new(branch_ctx);
467 eval_with_storage(body, &storage, dispatcher)?;
468 Ok(storage.snapshot())
469 } else {
470 Ok(Value::Array(vec![]))
471 }
472 }
473 JoinMode::AllSettled => {
474 let mut records = Vec::with_capacity(items_arr.len());
475 for item in items_arr {
476 let branch_ctx = write_path(bind, base_snap.clone(), item)?;
477 let storage = MemoryCtx::new(branch_ctx);
478 match eval_with_storage(body, &storage, dispatcher) {
479 Ok(()) => records.push(
480 serde_json::json!({"status": "fulfilled", "value": storage.snapshot()}),
481 ),
482 Err(e) => records
483 .push(serde_json::json!({"status": "rejected", "reason": e.to_string()})),
484 }
485 }
486 Ok(Value::Array(records))
487 }
488 }
489}
490
491pub fn eval<D: Dispatcher>(node: &Node, ctx: Value, dispatcher: &D) -> Result<Value, EvalError> {
501 let storage = MemoryCtx::new(ctx);
502 eval_with_storage(node, &storage, dispatcher)?;
503 Ok(storage.snapshot())
504}
505
506pub fn is_truthy(v: &Value) -> bool {
509 match v {
510 Value::Null => false,
511 Value::Bool(b) => *b,
512 _ => true,
513 }
514}
515
516pub fn eval_expr(expr: &Expr, ctx: &Value) -> Result<Value, EvalError> {
518 match expr {
519 Expr::Lit { value } => Ok(value.clone()),
520 Expr::Path { at } => read_path(at, ctx),
521 Expr::Eq { lhs, rhs } => Ok(Value::Bool(eval_expr(lhs, ctx)? == eval_expr(rhs, ctx)?)),
522 Expr::Ne { lhs, rhs } => Ok(Value::Bool(eval_expr(lhs, ctx)? != eval_expr(rhs, ctx)?)),
523 Expr::Lt { lhs, rhs } => num_cmp(lhs, rhs, ctx, |a, b| a < b),
524 Expr::Le { lhs, rhs } => num_cmp(lhs, rhs, ctx, |a, b| a <= b),
525 Expr::Gt { lhs, rhs } => num_cmp(lhs, rhs, ctx, |a, b| a > b),
526 Expr::Ge { lhs, rhs } => num_cmp(lhs, rhs, ctx, |a, b| a >= b),
527 Expr::Not { operand } => Ok(Value::Bool(!is_truthy(&eval_expr(operand, ctx)?))),
528 Expr::And { operands } => {
529 for op in operands {
530 if !is_truthy(&eval_expr(op, ctx)?) {
531 return Ok(Value::Bool(false));
532 }
533 }
534 Ok(Value::Bool(true))
535 }
536 Expr::Or { operands } => {
537 for op in operands {
538 if is_truthy(&eval_expr(op, ctx)?) {
539 return Ok(Value::Bool(true));
540 }
541 }
542 Ok(Value::Bool(false))
543 }
544 Expr::Exists { at } => Ok(Value::Bool(read_path(at, ctx).is_ok())),
545 Expr::Add { lhs, rhs } => num_arith(lhs, rhs, ctx, "add", |a, b| Some(a + b)),
546 Expr::Sub { lhs, rhs } => num_arith(lhs, rhs, ctx, "sub", |a, b| Some(a - b)),
547 Expr::Mul { lhs, rhs } => num_arith(lhs, rhs, ctx, "mul", |a, b| Some(a * b)),
548 Expr::Div { lhs, rhs } => num_arith(lhs, rhs, ctx, "div", |a, b| {
549 if b == 0.0 {
550 None
551 } else {
552 Some(a / b)
553 }
554 }),
555 Expr::Len { of } => {
556 let v = eval_expr(of, ctx)?;
557 let n = match &v {
558 Value::Array(a) => a.len(),
559 Value::String(s) => s.chars().count(),
560 Value::Object(o) => o.len(),
561 other => {
562 return Err(EvalError::DispatcherError {
563 ref_: "expr.len".into(),
564 msg: format!("len: unsupported type {other:?}"),
565 })
566 }
567 };
568 Ok(Value::Number(serde_json::Number::from(n as u64)))
569 }
570 Expr::In { needle, haystack } => {
571 let n = eval_expr(needle, ctx)?;
572 let h = eval_expr(haystack, ctx)?;
573 match h {
574 Value::Array(a) => Ok(Value::Bool(a.iter().any(|e| e == &n))),
575 other => Err(EvalError::DispatcherError {
576 ref_: "expr.in".into(),
577 msg: format!("in: haystack must be array, got {other:?}"),
578 }),
579 }
580 }
581 }
582}
583
584fn to_f64(v: &Value, op: &str) -> Result<f64, EvalError> {
586 match v {
587 Value::Number(n) => n.as_f64().ok_or_else(|| EvalError::DispatcherError {
588 ref_: format!("expr.{op}"),
589 msg: format!("non-f64-representable number: {n}"),
590 }),
591 other => Err(EvalError::DispatcherError {
592 ref_: format!("expr.{op}"),
593 msg: format!("expected number, got {other:?}"),
594 }),
595 }
596}
597
598fn num_cmp<F>(lhs: &Expr, rhs: &Expr, ctx: &Value, cmp: F) -> Result<Value, EvalError>
599where
600 F: Fn(f64, f64) -> bool,
601{
602 let lv = eval_expr(lhs, ctx)?;
603 let rv = eval_expr(rhs, ctx)?;
604 let l = to_f64(&lv, "cmp")?;
605 let r = to_f64(&rv, "cmp")?;
606 Ok(Value::Bool(cmp(l, r)))
607}
608
609fn num_arith<F>(lhs: &Expr, rhs: &Expr, ctx: &Value, op: &str, f: F) -> Result<Value, EvalError>
610where
611 F: Fn(f64, f64) -> Option<f64>,
612{
613 let lv = eval_expr(lhs, ctx)?;
614 let rv = eval_expr(rhs, ctx)?;
615 let l = to_f64(&lv, op)?;
616 let r = to_f64(&rv, op)?;
617 let result = f(l, r).ok_or_else(|| EvalError::DispatcherError {
618 ref_: format!("expr.{op}"),
619 msg: "arithmetic failure (e.g. division by zero)".into(),
620 })?;
621 let n = serde_json::Number::from_f64(result).ok_or_else(|| EvalError::DispatcherError {
622 ref_: format!("expr.{op}"),
623 msg: format!("result not f64-representable: {result}"),
624 })?;
625 Ok(Value::Number(n))
626}
627
628pub fn read_path(path: &str, ctx: &Value) -> Result<Value, EvalError> {
634 let trimmed = strip_path_prefix(path)?;
635 if trimmed.is_empty() {
636 return Ok(ctx.clone());
637 }
638 let mut cur = ctx;
639 for key in trimmed.split('.') {
640 cur = cur
641 .get(key)
642 .ok_or_else(|| EvalError::PathNotFound(path.to_string()))?;
643 }
644 Ok(cur.clone())
645}
646
647pub fn write_path(out: &Expr, ctx: Value, value: Value) -> Result<Value, EvalError> {
650 let path = match out {
651 Expr::Path { at } => at,
652 _ => {
653 return Err(EvalError::InvalidPath(
654 "Step.out must be a Path expr".into(),
655 ))
656 }
657 };
658 let trimmed = strip_path_prefix(path)?;
659 let keys: Vec<&str> = trimmed.split('.').filter(|s| !s.is_empty()).collect();
660 if keys.is_empty() {
661 return Ok(value);
662 }
663 let mut root = ctx;
664 write_path_recursive(&mut root, &keys, value);
665 Ok(root)
666}
667
668fn strip_path_prefix(path: &str) -> Result<&str, EvalError> {
669 path.strip_prefix("$.")
670 .or_else(|| path.strip_prefix('$'))
671 .ok_or_else(|| EvalError::InvalidPath(format!("path must start with $ or $.: {}", path)))
672}
673
674fn write_path_recursive(node: &mut Value, keys: &[&str], value: Value) {
675 if keys.is_empty() {
676 *node = value;
677 return;
678 }
679 if !node.is_object() {
680 *node = Value::Object(serde_json::Map::new());
681 }
682 let obj = node.as_object_mut().expect("just initialised as object");
683 let key = keys[0];
684 if keys.len() == 1 {
685 obj.insert(key.to_string(), value);
686 } else {
687 let entry = obj
688 .entry(key.to_string())
689 .or_insert(Value::Object(serde_json::Map::new()));
690 write_path_recursive(entry, &keys[1..], value);
691 }
692}