Skip to main content

flow_ir_core/
lib.rs

1#![deny(unsafe_code)]
//! flow.ir — pure-Rust schema plus a synchronous interpreter.
//!
//! Node kinds (Step / Seq / Branch / Fanout / Loop / Try / Assign), Expr ops
//! (Path / Lit, comparison, boolean, existence, arithmetic, `Len` / `In`),
//! a sync `eval`, the `Dispatcher` trait, and Path read/write helpers.
//!
//! No dependency on mlua / futures / async. The async runtime and the mlua
//! binding are handled by the upstream `mlua-flow-ir` crate; this crate is
//! the core layer of a 4-layer stack.
//!
10//! # Quick start
11//!
12//! ```
13//! use flow_ir_core::{eval, Dispatcher, EvalError, Expr, Node};
14//! use serde_json::{json, Value};
15//!
16//! let node: Node = serde_json::from_value(json!({
17//!     "kind": "step",
18//!     "ref": "uppercase",
19//!     "in": { "op": "path", "at": "$.input" },
20//!     "out": { "op": "path", "at": "$.output" },
21//! })).unwrap();
22//!
23//! struct Fixture;
24//! impl Dispatcher for Fixture {
25//!     fn dispatch(&self, _r: &str, input: Value) -> Result<Value, EvalError> {
26//!         if let Value::String(s) = input {
27//!             Ok(Value::String(s.to_uppercase()))
28//!         } else {
29//!             Ok(input)
30//!         }
31//!     }
32//! }
33//!
34//! let out = eval(&node, json!({ "input": "hello" }), &Fixture).unwrap();
35//! assert_eq!(out, json!({ "input": "hello", "output": "HELLO" }));
36//! ```
37
38use serde::{Deserialize, Serialize};
39use serde_json::Value;
40use thiserror::Error;
41
42// ──────────────────────────────────────────────────────────────────────────
// IR: Node kinds + Expr ops
44// ──────────────────────────────────────────────────────────────────────────
45
46/// flow.ir Node kind.
47///
48/// Discriminated with `kind` tag, `deny_unknown_fields` (open=false),
49/// `rename_all = "snake_case"`. Parser-side coverage: Step / Seq / Branch +
50/// Fanout (canonical schema の `fanout` Node、 4 join mode)。 残り Node kind
51/// (let / loop / call / switch / try / map / reduce / etc) は別 turn carry。
52#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
53#[serde(tag = "kind", deny_unknown_fields, rename_all = "snake_case")]
54pub enum Node {
55    /// `Step` — dispatch a referenced operation with `in` input, write result to `out`.
56    Step {
57        #[serde(rename = "ref")]
58        ref_: String,
59        #[serde(rename = "in")]
60        in_: Expr,
61        out: Expr,
62    },
63    /// `Seq` — evaluate children in order, threading the context value through.
64    Seq { children: Vec<Node> },
65    /// `Branch` — eval `cond`; if `true` run `then`, else run `else`.
66    Branch {
67        cond: Expr,
68        #[serde(rename = "then")]
69        then_: Box<Node>,
70        #[serde(rename = "else")]
71        else_: Box<Node>,
72    },
73    /// `Fanout` — eval `items` to an array, run `body` per item against a
74    /// branch-local ctx (caller ctx + item written to `bind`), join results
75    /// per `join` mode into `out`. Async parallel runner uses
76    /// `futures::future::{try_join_all|select_ok|join_all}` (executor-agnostic).
77    Fanout {
78        items: Expr,
79        bind: Expr,
80        body: Box<Node>,
81        join: JoinMode,
82        out: Expr,
83    },
84    /// `Loop` — counter を 0 から、 `cond` が truthy かつ `counter < max` の間
85    /// `body` を eval。 各 iter 後 counter を increment して `counter` path に書く。
86    /// VerdictLoop 等の retry/poll パターン primitive (canonical schema 整合)。
87    Loop {
88        counter: Expr,
89        cond: Expr,
90        body: Box<Node>,
91        max: u32,
92    },
93    /// `Try` — `body` を eval、 raise した場合 `catch` を eval。
94    /// `err_at` が Some なら catch 開始前に error message を ctx に書く。
95    Try {
96        body: Box<Node>,
97        catch: Box<Node>,
98        #[serde(default)]
99        err_at: Option<Expr>,
100    },
101    /// `Assign` — pure transform Node。 `value` Expr を ctx snapshot 上で評価し、
102    /// 結果を `at` (Path Expr) に write する。 dispatcher 不要、 副作用は
103    /// `CtxStorage.write` 1 回のみ。 `Seq` の中で Step 間の Adhoc update 表現に
104    /// 使う (= IR primitive、 Command 履歴は CtxStorage の write hook 経由で取得)。
105    Assign { at: Expr, value: Expr },
106}
107
108/// Fanout join semantics (Promise / futures combinators).
109#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
110#[serde(rename_all = "snake_case")]
111pub enum JoinMode {
112    /// every branch runs; out is an array of per-branch final ctx
113    /// (Promise.all / `futures::try_join_all`).
114    All,
115    /// first non-raising branch's ctx wins; all-fail raises
116    /// (Promise.any / `futures::future::select_ok`).
117    Any,
118    /// first branch to settle wins, success OR raise
119    /// (Promise.race / `futures::future::select`).
120    Race,
121    /// every branch runs, never raises; per-item record
122    /// `{status: fulfilled|rejected, value|reason}` (Promise.allSettled).
123    AllSettled,
124}
125
126/// flow.ir Expr op.
127///
128/// Discriminated with `op` tag, `deny_unknown_fields`, `rename_all = "snake_case"`.
129///
130/// Ops:
131/// - read / literal: `Path` / `Lit`
132/// - comparison: `Eq` / `Ne` / `Lt` / `Le` / `Gt` / `Ge`
133/// - boolean: `Not` / `And` / `Or`
134/// - existence: `Exists` (Path lookup that returns bool instead of erroring)
135/// - arithmetic: `Add` / `Sub` / `Mul` / `Div`
136/// - aggregate: `Len` (length of array / string / object) / `In` (membership in array)
137#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
138#[serde(tag = "op", deny_unknown_fields, rename_all = "snake_case")]
139pub enum Expr {
140    /// `Path` — read a value from ctx by simple `$.a.b.c` form.
141    Path { at: String },
142    /// `Lit` — literal JSON value.
143    Lit { value: Value },
144    /// `Eq` — boolean equality of two sub-expressions.
145    Eq { lhs: Box<Expr>, rhs: Box<Expr> },
146    /// `Ne` — boolean inequality.
147    Ne { lhs: Box<Expr>, rhs: Box<Expr> },
148    /// `Lt` — numeric `lhs < rhs` (both sides coerced to f64).
149    Lt { lhs: Box<Expr>, rhs: Box<Expr> },
150    /// `Le` — numeric `lhs <= rhs`.
151    Le { lhs: Box<Expr>, rhs: Box<Expr> },
152    /// `Gt` — numeric `lhs > rhs`.
153    Gt { lhs: Box<Expr>, rhs: Box<Expr> },
154    /// `Ge` — numeric `lhs >= rhs`.
155    Ge { lhs: Box<Expr>, rhs: Box<Expr> },
156    /// `Not` — boolean negation of `operand` (truthy-based; null/false → true).
157    Not { operand: Box<Expr> },
158    /// `And` — variadic boolean conjunction (short-circuit). Empty list → true.
159    And { operands: Vec<Expr> },
160    /// `Or` — variadic boolean disjunction (short-circuit). Empty list → false.
161    Or { operands: Vec<Expr> },
162    /// `Exists` — read a path; return `true` if it resolves, `false` if missing.
163    /// Distinct from `Path` (which raises `PathNotFound`) and from `IsNull`
164    /// (a present-but-null value resolves to `true` here).
165    Exists { at: String },
166    /// `Add` — numeric `lhs + rhs` (f64).
167    Add { lhs: Box<Expr>, rhs: Box<Expr> },
168    /// `Sub` — numeric `lhs - rhs`.
169    Sub { lhs: Box<Expr>, rhs: Box<Expr> },
170    /// `Mul` — numeric `lhs * rhs`.
171    Mul { lhs: Box<Expr>, rhs: Box<Expr> },
172    /// `Div` — numeric `lhs / rhs`. Division by zero raises `DispatcherError`.
173    Div { lhs: Box<Expr>, rhs: Box<Expr> },
174    /// `Len` — length of `of`: array → element count, string → char count,
175    /// object → key count. Other types raise `DispatcherError`.
176    Len { of: Box<Expr> },
177    /// `In` — `true` if `needle` equals any element of `haystack` (which must
178    /// evaluate to an array).
179    In {
180        needle: Box<Expr>,
181        haystack: Box<Expr>,
182    },
183}
184
185// ──────────────────────────────────────────────────────────────────────────
186// Dispatcher trait + EvalError
187// ──────────────────────────────────────────────────────────────────────────
188
189/// Dispatcher callback: resolves a `Step.ref` against the provided input,
190/// returns the step's raw output value.
191///
192/// Host crates (e.g. `mlua-swarm-engine`) provide concrete implementations:
193/// agent-block process spawn, mlua callback, MCP call, direct LLM, etc.
194/// `Fn(&str, Value) -> Result<Value, EvalError>` closures also implement this
195/// trait via the blanket impl below.
196pub trait Dispatcher {
197    fn dispatch(&self, ref_: &str, input: Value) -> Result<Value, EvalError>;
198}
199
200impl<F> Dispatcher for F
201where
202    F: Fn(&str, Value) -> Result<Value, EvalError>,
203{
204    fn dispatch(&self, ref_: &str, input: Value) -> Result<Value, EvalError> {
205        self(ref_, input)
206    }
207}
208
209/// Evaluation error.
210#[derive(Debug, Error)]
211pub enum EvalError {
212    #[error("path not found: {0}")]
213    PathNotFound(String),
214    #[error("invalid path syntax: {0}")]
215    InvalidPath(String),
216    #[error("branch cond must be boolean, got: {0}")]
217    NonBoolCond(Value),
218    #[error("dispatcher error for ref '{ref_}': {msg}")]
219    DispatcherError { ref_: String, msg: String },
220}
221
222// ──────────────────────────────────────────────────────────────────────────
223// CtxStorage — Backend DI for ctx state
224// ──────────────────────────────────────────────────────────────────────────
225
226/// Ctx backend trait — `eval(_with_storage)` 系が ctx state を touch する
227/// 唯一の経路。 `&self` write (interior mutability) で **走行中の Flow と
228/// 外部 task が同じ ctx を共有** できる (= dispatch().await suspend 中に外部
229/// task が `ctx.write` で State 注入 → resume 後 Step が read で観測、 という
230/// dynamic injection 経路を成立させる)。
231///
232/// Default impl は `MemoryCtx` (`Arc<Mutex<Value>>` wrapper、 既存
233/// `serde_json::Value` 直保持と挙動互換)。 consumer は typed struct / KV /
234/// 外部 store / observer wrap / event log 等を custom impl で持ち込める。
235pub trait CtxStorage: Send + Sync {
236    /// Read a single path (`$.a.b.c` 形式) from ctx.
237    fn read(&self, path: &str) -> Result<Value, EvalError>;
238    /// Write `value` to `path` (`$.a.b.c` 形式).
239    fn write(&self, path: &str, value: Value) -> Result<(), EvalError>;
240    /// Take a snapshot of the entire ctx (= Expr eval / Fanout fork で使う pure read view).
241    fn snapshot(&self) -> Value;
242    /// Replace the entire ctx with the given value (= Fanout branch restore 等).
243    fn replace(&self, value: Value);
244}
245
246/// Default `CtxStorage` impl — `Arc<Mutex<Value>>` wrapper。
247///
248/// Send + Sync かつ `&self` write OK = `Arc<MemoryCtx>` で外部 task と共有可能。
249pub struct MemoryCtx {
250    inner: std::sync::Mutex<Value>,
251}
252
253impl MemoryCtx {
254    /// Create a new MemoryCtx initialised with `ctx`.
255    pub fn new(ctx: Value) -> Self {
256        Self {
257            inner: std::sync::Mutex::new(ctx),
258        }
259    }
260
261    /// Convenience: wrap in `Arc<dyn CtxStorage>`.
262    pub fn shared(ctx: Value) -> std::sync::Arc<dyn CtxStorage> {
263        std::sync::Arc::new(Self::new(ctx))
264    }
265}
266
267impl CtxStorage for MemoryCtx {
268    fn read(&self, path: &str) -> Result<Value, EvalError> {
269        let guard = self.inner.lock().expect("ctx mutex poisoned");
270        read_path(path, &guard)
271    }
272
273    fn write(&self, path: &str, value: Value) -> Result<(), EvalError> {
274        let mut guard = self.inner.lock().expect("ctx mutex poisoned");
275        let cur = std::mem::take(&mut *guard);
276        let updated = write_path(
277            &Expr::Path {
278                at: path.to_string(),
279            },
280            cur,
281            value,
282        )?;
283        *guard = updated;
284        Ok(())
285    }
286
287    fn snapshot(&self) -> Value {
288        let guard = self.inner.lock().expect("ctx mutex poisoned");
289        guard.clone()
290    }
291
292    fn replace(&self, value: Value) {
293        let mut guard = self.inner.lock().expect("ctx mutex poisoned");
294        *guard = value;
295    }
296}
297
298/// Resolve `Path` Expr to its literal `$.a.b.c` string, or `InvalidPath` error.
299fn path_str(expr: &Expr) -> Result<&str, EvalError> {
300    match expr {
301        Expr::Path { at } => Ok(at.as_str()),
302        _ => Err(EvalError::InvalidPath(
303            "expected Path expr for write target".into(),
304        )),
305    }
306}
307
308// ──────────────────────────────────────────────────────────────────────────
309// Evaluator — storage-backed (canonical) + legacy Value-passing wrapper
310// ──────────────────────────────────────────────────────────────────────────
311
312/// Storage-backed sync evaluator — `CtxStorage` 経由で ctx を touch する正本。
313///
314/// 各 Node 評価開始時に `ctx.snapshot()` で Expr eval 用の pure view を取り、
315/// write は `ctx.write(path, value)` 経由。 これにより同じ `Arc<dyn CtxStorage>`
316/// を共有する外部 task が、 Step 間 (sync の場合は 1 Step 評価内では touch
317/// しないが) や eval 間で ctx state を変更できる。
318pub fn eval_with_storage<D: Dispatcher>(
319    node: &Node,
320    ctx: &dyn CtxStorage,
321    dispatcher: &D,
322) -> Result<(), EvalError> {
323    match node {
324        Node::Step { ref_, in_, out } => {
325            let snap = ctx.snapshot();
326            let input = eval_expr(in_, &snap)?;
327            let output =
328                dispatcher
329                    .dispatch(ref_, input)
330                    .map_err(|e| EvalError::DispatcherError {
331                        ref_: ref_.clone(),
332                        msg: e.to_string(),
333                    })?;
334            ctx.write(path_str(out)?, output)
335        }
336        Node::Seq { children } => {
337            for child in children {
338                eval_with_storage(child, ctx, dispatcher)?;
339            }
340            Ok(())
341        }
342        Node::Branch { cond, then_, else_ } => {
343            let snap = ctx.snapshot();
344            match eval_expr(cond, &snap)? {
345                Value::Bool(true) => eval_with_storage(then_, ctx, dispatcher),
346                Value::Bool(false) => eval_with_storage(else_, ctx, dispatcher),
347                other => Err(EvalError::NonBoolCond(other)),
348            }
349        }
350        Node::Fanout {
351            items,
352            bind,
353            body,
354            join,
355            out,
356        } => {
357            // Fanout fork = 各 branch を disjoint MemoryCtx に切り出して逐次
358            // (sync) evaluate、 集約結果を共有 ctx の `out` path に書く。
359            let snap = ctx.snapshot();
360            let items_val = eval_expr(items, &snap)?;
361            let items_arr = match items_val {
362                Value::Array(a) => a,
363                other => {
364                    return Err(EvalError::DispatcherError {
365                        ref_: "fanout.items".into(),
366                        msg: format!("expected array, got {other:?}"),
367                    })
368                }
369            };
370            let joined = fanout_eval_sync(bind, body, *join, &snap, items_arr, dispatcher)?;
371            ctx.write(path_str(out)?, joined)
372        }
373        Node::Loop {
374            counter,
375            cond,
376            body,
377            max,
378        } => {
379            let counter_path = path_str(counter)?;
380            ctx.write(counter_path, Value::Number(serde_json::Number::from(0u32)))?;
381            let mut n: u32 = 0;
382            loop {
383                if n >= *max {
384                    break;
385                }
386                let snap = ctx.snapshot();
387                if !is_truthy(&eval_expr(cond, &snap)?) {
388                    break;
389                }
390                eval_with_storage(body, ctx, dispatcher)?;
391                n += 1;
392                ctx.write(counter_path, Value::Number(serde_json::Number::from(n)))?;
393            }
394            Ok(())
395        }
396        Node::Try {
397            body,
398            catch,
399            err_at,
400        } => {
401            // body 失敗時の rollback 用 snapshot
402            let snap_before = ctx.snapshot();
403            match eval_with_storage(body, ctx, dispatcher) {
404                Ok(()) => Ok(()),
405                Err(e) => {
406                    // body の途中 write を破棄 (Try semantic: rollback)
407                    ctx.replace(snap_before);
408                    if let Some(at) = err_at {
409                        ctx.write(path_str(at)?, Value::String(e.to_string()))?;
410                    }
411                    eval_with_storage(catch, ctx, dispatcher)
412                }
413            }
414        }
415        Node::Assign { at, value } => {
416            let snap = ctx.snapshot();
417            let v = eval_expr(value, &snap)?;
418            ctx.write(path_str(at)?, v)
419        }
420    }
421}
422
423/// Internal: fanout per-item sync evaluator (disjoint branch ctx).
424fn fanout_eval_sync<D: Dispatcher>(
425    bind: &Expr,
426    body: &Node,
427    join: JoinMode,
428    base_snap: &Value,
429    items_arr: Vec<Value>,
430    dispatcher: &D,
431) -> Result<Value, EvalError> {
432    match join {
433        JoinMode::All => {
434            let mut results = Vec::with_capacity(items_arr.len());
435            for item in items_arr {
436                let branch_ctx = write_path(bind, base_snap.clone(), item)?;
437                let storage = MemoryCtx::new(branch_ctx);
438                eval_with_storage(body, &storage, dispatcher)?;
439                results.push(storage.snapshot());
440            }
441            Ok(Value::Array(results))
442        }
443        JoinMode::Any => {
444            let mut winner: Option<Value> = None;
445            let mut last_err: Option<EvalError> = None;
446            for item in items_arr {
447                let branch_ctx = write_path(bind, base_snap.clone(), item)?;
448                let storage = MemoryCtx::new(branch_ctx);
449                match eval_with_storage(body, &storage, dispatcher) {
450                    Ok(()) => {
451                        winner = Some(storage.snapshot());
452                        last_err = None;
453                        break;
454                    }
455                    Err(e) => last_err = Some(e),
456                }
457            }
458            if let Some(e) = last_err {
459                return Err(e);
460            }
461            Ok(winner.unwrap_or(Value::Array(vec![])))
462        }
463        JoinMode::Race => {
464            if let Some(first) = items_arr.into_iter().next() {
465                let branch_ctx = write_path(bind, base_snap.clone(), first)?;
466                let storage = MemoryCtx::new(branch_ctx);
467                eval_with_storage(body, &storage, dispatcher)?;
468                Ok(storage.snapshot())
469            } else {
470                Ok(Value::Array(vec![]))
471            }
472        }
473        JoinMode::AllSettled => {
474            let mut records = Vec::with_capacity(items_arr.len());
475            for item in items_arr {
476                let branch_ctx = write_path(bind, base_snap.clone(), item)?;
477                let storage = MemoryCtx::new(branch_ctx);
478                match eval_with_storage(body, &storage, dispatcher) {
479                    Ok(()) => records.push(
480                        serde_json::json!({"status": "fulfilled", "value": storage.snapshot()}),
481                    ),
482                    Err(e) => records
483                        .push(serde_json::json!({"status": "rejected", "reason": e.to_string()})),
484                }
485            }
486            Ok(Value::Array(records))
487        }
488    }
489}
490
491/// Legacy Value-passing sync evaluator — backward compat wrapper around
492/// `eval_with_storage` + `MemoryCtx`. `Value` を所有権で受け取り、 内部で
493/// `MemoryCtx::new(ctx)` を使って storage 版に委譲、 終了後の snapshot を返す。
494///
495/// 既存 caller (= dynamic injection を要求しない、 1-shot pure eval 用途) は
496/// 引き続きこの API で OK。 動的注入が要る場合は `eval_with_storage` を直接
497/// 呼ぶ。
498///
499/// Returns the updated context (= ctx with `Step.out` path written for each step traversed).
500pub fn eval<D: Dispatcher>(node: &Node, ctx: Value, dispatcher: &D) -> Result<Value, EvalError> {
501    let storage = MemoryCtx::new(ctx);
502    eval_with_storage(node, &storage, dispatcher)?;
503    Ok(storage.snapshot())
504}
505
506/// JSON value の truthy 判定 (= flow.ir Branch cond / Loop cond で使う)。
507/// Bool は値そのまま、 null/false 以外は truthy (Lua / JS と整合)。
508pub fn is_truthy(v: &Value) -> bool {
509    match v {
510        Value::Null => false,
511        Value::Bool(b) => *b,
512        _ => true,
513    }
514}
515
516/// Evaluate an `Expr` against a context value, returning the resolved JSON value.
517pub fn eval_expr(expr: &Expr, ctx: &Value) -> Result<Value, EvalError> {
518    match expr {
519        Expr::Lit { value } => Ok(value.clone()),
520        Expr::Path { at } => read_path(at, ctx),
521        Expr::Eq { lhs, rhs } => Ok(Value::Bool(eval_expr(lhs, ctx)? == eval_expr(rhs, ctx)?)),
522        Expr::Ne { lhs, rhs } => Ok(Value::Bool(eval_expr(lhs, ctx)? != eval_expr(rhs, ctx)?)),
523        Expr::Lt { lhs, rhs } => num_cmp(lhs, rhs, ctx, |a, b| a < b),
524        Expr::Le { lhs, rhs } => num_cmp(lhs, rhs, ctx, |a, b| a <= b),
525        Expr::Gt { lhs, rhs } => num_cmp(lhs, rhs, ctx, |a, b| a > b),
526        Expr::Ge { lhs, rhs } => num_cmp(lhs, rhs, ctx, |a, b| a >= b),
527        Expr::Not { operand } => Ok(Value::Bool(!is_truthy(&eval_expr(operand, ctx)?))),
528        Expr::And { operands } => {
529            for op in operands {
530                if !is_truthy(&eval_expr(op, ctx)?) {
531                    return Ok(Value::Bool(false));
532                }
533            }
534            Ok(Value::Bool(true))
535        }
536        Expr::Or { operands } => {
537            for op in operands {
538                if is_truthy(&eval_expr(op, ctx)?) {
539                    return Ok(Value::Bool(true));
540                }
541            }
542            Ok(Value::Bool(false))
543        }
544        Expr::Exists { at } => Ok(Value::Bool(read_path(at, ctx).is_ok())),
545        Expr::Add { lhs, rhs } => num_arith(lhs, rhs, ctx, "add", |a, b| Some(a + b)),
546        Expr::Sub { lhs, rhs } => num_arith(lhs, rhs, ctx, "sub", |a, b| Some(a - b)),
547        Expr::Mul { lhs, rhs } => num_arith(lhs, rhs, ctx, "mul", |a, b| Some(a * b)),
548        Expr::Div { lhs, rhs } => num_arith(lhs, rhs, ctx, "div", |a, b| {
549            if b == 0.0 {
550                None
551            } else {
552                Some(a / b)
553            }
554        }),
555        Expr::Len { of } => {
556            let v = eval_expr(of, ctx)?;
557            let n = match &v {
558                Value::Array(a) => a.len(),
559                Value::String(s) => s.chars().count(),
560                Value::Object(o) => o.len(),
561                other => {
562                    return Err(EvalError::DispatcherError {
563                        ref_: "expr.len".into(),
564                        msg: format!("len: unsupported type {other:?}"),
565                    })
566                }
567            };
568            Ok(Value::Number(serde_json::Number::from(n as u64)))
569        }
570        Expr::In { needle, haystack } => {
571            let n = eval_expr(needle, ctx)?;
572            let h = eval_expr(haystack, ctx)?;
573            match h {
574                Value::Array(a) => Ok(Value::Bool(a.iter().any(|e| e == &n))),
575                other => Err(EvalError::DispatcherError {
576                    ref_: "expr.in".into(),
577                    msg: format!("in: haystack must be array, got {other:?}"),
578                }),
579            }
580        }
581    }
582}
583
584/// Coerce a JSON value to f64 for numeric ops. Bool / null / non-number raise.
585fn to_f64(v: &Value, op: &str) -> Result<f64, EvalError> {
586    match v {
587        Value::Number(n) => n.as_f64().ok_or_else(|| EvalError::DispatcherError {
588            ref_: format!("expr.{op}"),
589            msg: format!("non-f64-representable number: {n}"),
590        }),
591        other => Err(EvalError::DispatcherError {
592            ref_: format!("expr.{op}"),
593            msg: format!("expected number, got {other:?}"),
594        }),
595    }
596}
597
598fn num_cmp<F>(lhs: &Expr, rhs: &Expr, ctx: &Value, cmp: F) -> Result<Value, EvalError>
599where
600    F: Fn(f64, f64) -> bool,
601{
602    let lv = eval_expr(lhs, ctx)?;
603    let rv = eval_expr(rhs, ctx)?;
604    let l = to_f64(&lv, "cmp")?;
605    let r = to_f64(&rv, "cmp")?;
606    Ok(Value::Bool(cmp(l, r)))
607}
608
609fn num_arith<F>(lhs: &Expr, rhs: &Expr, ctx: &Value, op: &str, f: F) -> Result<Value, EvalError>
610where
611    F: Fn(f64, f64) -> Option<f64>,
612{
613    let lv = eval_expr(lhs, ctx)?;
614    let rv = eval_expr(rhs, ctx)?;
615    let l = to_f64(&lv, op)?;
616    let r = to_f64(&rv, op)?;
617    let result = f(l, r).ok_or_else(|| EvalError::DispatcherError {
618        ref_: format!("expr.{op}"),
619        msg: "arithmetic failure (e.g. division by zero)".into(),
620    })?;
621    let n = serde_json::Number::from_f64(result).ok_or_else(|| EvalError::DispatcherError {
622        ref_: format!("expr.{op}"),
623        msg: format!("result not f64-representable: {result}"),
624    })?;
625    Ok(Value::Number(n))
626}
627
628// ──────────────────────────────────────────────────────────────────────────
629// Path helpers (simple `$.a.b.c` form, no array index in MVP)
630// ──────────────────────────────────────────────────────────────────────────
631
632/// Read a path from a JSON value. Supports simple `$.a.b.c` form.
633pub fn read_path(path: &str, ctx: &Value) -> Result<Value, EvalError> {
634    let trimmed = strip_path_prefix(path)?;
635    if trimmed.is_empty() {
636        return Ok(ctx.clone());
637    }
638    let mut cur = ctx;
639    for key in trimmed.split('.') {
640        cur = cur
641            .get(key)
642            .ok_or_else(|| EvalError::PathNotFound(path.to_string()))?;
643    }
644    Ok(cur.clone())
645}
646
647/// Write a value at the path location inside ctx, returning the updated ctx.
648/// `out` must be a `Path` Expr.
649pub fn write_path(out: &Expr, ctx: Value, value: Value) -> Result<Value, EvalError> {
650    let path = match out {
651        Expr::Path { at } => at,
652        _ => {
653            return Err(EvalError::InvalidPath(
654                "Step.out must be a Path expr".into(),
655            ))
656        }
657    };
658    let trimmed = strip_path_prefix(path)?;
659    let keys: Vec<&str> = trimmed.split('.').filter(|s| !s.is_empty()).collect();
660    if keys.is_empty() {
661        return Ok(value);
662    }
663    let mut root = ctx;
664    write_path_recursive(&mut root, &keys, value);
665    Ok(root)
666}
667
668fn strip_path_prefix(path: &str) -> Result<&str, EvalError> {
669    path.strip_prefix("$.")
670        .or_else(|| path.strip_prefix('$'))
671        .ok_or_else(|| EvalError::InvalidPath(format!("path must start with $ or $.: {}", path)))
672}
673
674fn write_path_recursive(node: &mut Value, keys: &[&str], value: Value) {
675    if keys.is_empty() {
676        *node = value;
677        return;
678    }
679    if !node.is_object() {
680        *node = Value::Object(serde_json::Map::new());
681    }
682    let obj = node.as_object_mut().expect("just initialised as object");
683    let key = keys[0];
684    if keys.len() == 1 {
685        obj.insert(key.to_string(), value);
686    } else {
687        let entry = obj
688            .entry(key.to_string())
689            .or_insert(Value::Object(serde_json::Map::new()));
690        write_path_recursive(entry, &keys[1..], value);
691    }
692}