Skip to main content

flow_ir_core/
lib.rs

1#![deny(unsafe_code)]
2//! flow.ir Pure Rust schema + sync interpreter.
3//!
4//! Node kinds (Step / Seq / Branch / Fanout / Loop / Try / Assign) + Expr ops
5//! (canonical wire format — comparison / boolean / existence / arithmetic /
6//! aggregate / `call_extern`) + sync `eval` + `Dispatcher` trait + `Externs`
7//! registry + Path read/write.
8//!
//! Zero dependency on mlua / futures / async. The async runtime and the mlua
//! binding live in the upstream `mlua-flow-ir` crate; this crate is the core
//! layer of the 4-layer stack.
11//!
12//! # Quick start
13//!
14//! ```
15//! use flow_ir_core::{eval, Dispatcher, EvalError, Expr, Node};
16//! use serde_json::{json, Value};
17//!
18//! let node: Node = serde_json::from_value(json!({
19//!     "kind": "step",
20//!     "ref": "uppercase",
21//!     "in": { "op": "path", "at": "$.input" },
22//!     "out": { "op": "path", "at": "$.output" },
23//! })).unwrap();
24//!
25//! struct Fixture;
26//! impl Dispatcher for Fixture {
27//!     fn dispatch(&self, _r: &str, input: Value) -> Result<Value, EvalError> {
28//!         if let Value::String(s) = input {
29//!             Ok(Value::String(s.to_uppercase()))
30//!         } else {
31//!             Ok(input)
32//!         }
33//!     }
34//! }
35//!
36//! let out = eval(&node, json!({ "input": "hello" }), &Fixture).unwrap();
37//! assert_eq!(out, json!({ "input": "hello", "output": "HELLO" }));
38//! ```
39
40use serde::{Deserialize, Serialize};
41use serde_json::Value;
42use thiserror::Error;
43
// ──────────────────────────────────────────────────────────────────────────
// IR: 7 Node kinds + 20 Expr ops
// ──────────────────────────────────────────────────────────────────────────
47
48/// flow.ir Node kind.
49///
50/// Discriminated with `kind` tag, `deny_unknown_fields` (open=false),
51/// `rename_all = "snake_case"`. Parser-side coverage: Step / Seq / Branch +
52/// Fanout (canonical schema の `fanout` Node、 4 join mode)。 残り Node kind
53/// (let / loop / call / switch / try / map / reduce / etc) は別 turn carry。
54#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
55#[serde(tag = "kind", deny_unknown_fields, rename_all = "snake_case")]
56pub enum Node {
57    /// `Step` — dispatch a referenced operation with `in` input, write result to `out`.
58    Step {
59        #[serde(rename = "ref")]
60        ref_: String,
61        #[serde(rename = "in")]
62        in_: Expr,
63        out: Expr,
64    },
65    /// `Seq` — evaluate children in order, threading the context value through.
66    Seq { children: Vec<Node> },
67    /// `Branch` — eval `cond`; if `true` run `then`, else run `else`.
68    Branch {
69        cond: Expr,
70        #[serde(rename = "then")]
71        then_: Box<Node>,
72        #[serde(rename = "else")]
73        else_: Box<Node>,
74    },
75    /// `Fanout` — eval `items` to an array, run `body` per item against a
76    /// branch-local ctx (caller ctx + item written to `bind`), join results
77    /// per `join` mode into `out`. Async parallel runner uses
78    /// `futures::future::{try_join_all|select_ok|join_all}` (executor-agnostic).
79    Fanout {
80        items: Expr,
81        bind: Expr,
82        body: Box<Node>,
83        join: JoinMode,
84        out: Expr,
85    },
86    /// `Loop` — counter を 0 から、 `cond` が truthy かつ `counter < max` の間
87    /// `body` を eval。 各 iter 後 counter を increment して `counter` path に書く。
88    /// VerdictLoop 等の retry/poll パターン primitive (canonical schema 整合)。
89    Loop {
90        counter: Expr,
91        cond: Expr,
92        body: Box<Node>,
93        max: u32,
94    },
95    /// `Try` — `body` を eval、 raise した場合 `catch` を eval。
96    /// `err_at` が Some なら catch 開始前に error message を ctx に書く。
97    Try {
98        body: Box<Node>,
99        catch: Box<Node>,
100        #[serde(default)]
101        err_at: Option<Expr>,
102    },
103    /// `Assign` — pure transform Node。 `value` Expr を ctx snapshot 上で評価し、
104    /// 結果を `at` (Path Expr) に write する。 dispatcher 不要、 副作用は
105    /// `CtxStorage.write` 1 回のみ。 `Seq` の中で Step 間の Adhoc update 表現に
106    /// 使う (= IR primitive、 Command 履歴は CtxStorage の write hook 経由で取得)。
107    Assign { at: Expr, value: Expr },
108}
109
110/// Fanout join semantics (Promise / futures combinators).
111#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
112#[serde(rename_all = "snake_case")]
113pub enum JoinMode {
114    /// every branch runs; out is an array of per-branch final ctx
115    /// (Promise.all / `futures::try_join_all`).
116    All,
117    /// first non-raising branch's ctx wins; all-fail raises
118    /// (Promise.any / `futures::future::select_ok`).
119    Any,
120    /// first branch to settle wins, success OR raise
121    /// (Promise.race / `futures::future::select`).
122    Race,
123    /// every branch runs, never raises; per-item record
124    /// `{status: fulfilled|rejected, value|reason}` (Promise.allSettled).
125    AllSettled,
126}
127
128/// flow.ir Expr op.
129///
130/// Discriminated with `op` tag, `deny_unknown_fields`, `rename_all = "snake_case"`.
131/// Wire format (op tag / field names) follows the canonical `flow-ir-lua`
132/// schema (`flow/ir/schema.lua`) verbatim: `gte`/`lte` (not `ge`/`le`),
133/// `args` on `and`/`or`, `arg` on `not`/`len`/`exists`.
134///
135/// Ops:
136/// - read / literal: `Path` / `Lit`
137/// - comparison: `Eq` / `Ne` / `Lt` / `Lte` / `Gt` / `Gte` (numbers or strings)
138/// - boolean: `Not` / `And` / `Or`
139/// - existence: `Exists` (truthy iff `arg` evaluates to a non-null value)
140/// - arithmetic: `Add` / `Sub` / `Mul` / `Div` / `Mod`
141/// - aggregate: `Len` (length of array / string / object) / `In` (membership in array)
142/// - hatch: `CallExtern` (host-registered pure function, resolved via `Externs`)
143#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
144#[serde(tag = "op", deny_unknown_fields, rename_all = "snake_case")]
145pub enum Expr {
146    /// `Path` — read a value from ctx by simple `$.a.b.c` form.
147    Path { at: String },
148    /// `Lit` — literal JSON value.
149    Lit { value: Value },
150    /// `Eq` — boolean equality of two sub-expressions.
151    Eq { lhs: Box<Expr>, rhs: Box<Expr> },
152    /// `Ne` — boolean inequality.
153    Ne { lhs: Box<Expr>, rhs: Box<Expr> },
154    /// `Lt` — `lhs < rhs`. Both numbers (f64) or both strings (lexicographic),
155    /// mirroring canonical Lua `<` semantics. Mixed / other types raise.
156    Lt { lhs: Box<Expr>, rhs: Box<Expr> },
157    /// `Lte` — `lhs <= rhs` (canonical wire tag `lte`).
158    Lte { lhs: Box<Expr>, rhs: Box<Expr> },
159    /// `Gt` — `lhs > rhs`.
160    Gt { lhs: Box<Expr>, rhs: Box<Expr> },
161    /// `Gte` — `lhs >= rhs` (canonical wire tag `gte`).
162    Gte { lhs: Box<Expr>, rhs: Box<Expr> },
163    /// `Not` — boolean negation of `arg` (truthy-based; null/false → true).
164    Not { arg: Box<Expr> },
165    /// `And` — variadic boolean conjunction (short-circuit). Empty list → true.
166    And { args: Vec<Expr> },
167    /// `Or` — variadic boolean disjunction (short-circuit). Empty list → false.
168    Or { args: Vec<Expr> },
169    /// `Exists` — evaluate `arg`; `true` iff it resolves to a non-null value.
170    /// A `Path` arg that raises `PathNotFound` yields `false` (canonical
171    /// `arg ~= nil` semantics — JSON null maps to Lua nil).
172    Exists { arg: Box<Expr> },
173    /// `Add` — numeric `lhs + rhs` (f64).
174    Add { lhs: Box<Expr>, rhs: Box<Expr> },
175    /// `Sub` — numeric `lhs - rhs`.
176    Sub { lhs: Box<Expr>, rhs: Box<Expr> },
177    /// `Mul` — numeric `lhs * rhs`.
178    Mul { lhs: Box<Expr>, rhs: Box<Expr> },
179    /// `Div` — numeric `lhs / rhs`. Division by zero raises `DispatcherError`.
180    Div { lhs: Box<Expr>, rhs: Box<Expr> },
181    /// `Mod` — numeric `lhs % rhs` (Lua `%` semantics: result takes the sign
182    /// of `rhs`). Modulo by zero raises `DispatcherError`.
183    Mod { lhs: Box<Expr>, rhs: Box<Expr> },
184    /// `Len` — length of `arg`: array → element count, string → char count,
185    /// object → key count. Other types raise `DispatcherError`.
186    Len { arg: Box<Expr> },
187    /// `In` — `true` if `needle` equals any element of `haystack` (which must
188    /// evaluate to an array). Rust-side extension (not in canonical schema).
189    In {
190        needle: Box<Expr>,
191        haystack: Box<Expr>,
192    },
193    /// `CallExtern` — value-shape Hatch: resolve a host-injected pure function
194    /// by opaque key via the `Externs` registry, apply it to evaluated args,
195    /// return the value. The registered function MUST be pure (no side
196    /// effects, no flow control) — see canonical `doc/ir.md §call_extern`.
197    CallExtern {
198        #[serde(rename = "ref")]
199        ref_: String,
200        args: Vec<Expr>,
201    },
202}
203
204// ──────────────────────────────────────────────────────────────────────────
205// Dispatcher trait + EvalError
206// ──────────────────────────────────────────────────────────────────────────
207
208/// Dispatcher callback: resolves a `Step.ref` against the provided input,
209/// returns the step's raw output value.
210///
211/// Host crates (e.g. `mlua-swarm-engine`) provide concrete implementations:
212/// agent-block process spawn, mlua callback, MCP call, direct LLM, etc.
213/// `Fn(&str, Value) -> Result<Value, EvalError>` closures also implement this
214/// trait via the blanket impl below.
215pub trait Dispatcher {
216    fn dispatch(&self, ref_: &str, input: Value) -> Result<Value, EvalError>;
217}
218
219impl<F> Dispatcher for F
220where
221    F: Fn(&str, Value) -> Result<Value, EvalError>,
222{
223    fn dispatch(&self, ref_: &str, input: Value) -> Result<Value, EvalError> {
224        self(ref_, input)
225    }
226}
227
228/// Evaluation error.
229#[derive(Debug, Error)]
230pub enum EvalError {
231    #[error("path not found: {0}")]
232    PathNotFound(String),
233    #[error("invalid path syntax: {0}")]
234    InvalidPath(String),
235    #[error("branch cond must be boolean, got: {0}")]
236    NonBoolCond(Value),
237    #[error("dispatcher error for ref '{ref_}': {msg}")]
238    DispatcherError { ref_: String, msg: String },
239    #[error("extern error for ref '{ref_}': {msg}")]
240    ExternError { ref_: String, msg: String },
241}
242
243// ──────────────────────────────────────────────────────────────────────────
244// Externs — whitelist registry for `call_extern` Expr (canonical opts.externs)
245// ──────────────────────────────────────────────────────────────────────────
246
247/// Extern registry: resolves a `call_extern.ref` against evaluated args and
248/// returns the value. Mirror of canonical `opts.externs` (flow-ir-lua
249/// `interpreter.lua`): each entry MUST be a pure function — no side effects,
250/// no flow control, value-shape manipulation only.
251///
252/// Same DI pattern as [`Dispatcher`]: host crates provide concrete
253/// implementations ([`ExternMap`] for plain Rust closures, mlua bridge for
254/// Lua functions upstream).
255pub trait Externs {
256    /// Invoke the extern registered under `ref_` with already-evaluated args.
257    /// Unregistered refs raise [`EvalError::ExternError`].
258    fn call(&self, ref_: &str, args: &[Value]) -> Result<Value, EvalError>;
259}
260
/// Empty registry — every `call_extern` raises `ExternError` (parity with
/// the canonical "requires opts.externs" error). Used by the externs-less
/// compat wrappers (`eval` / `eval_expr` / `eval_with_storage`).
///
/// A zero-sized marker type, so it derives the usual value-type traits:
/// it can be debug-printed, copied, compared and default-constructed.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct NoExterns;
265
266impl Externs for NoExterns {
267    fn call(&self, ref_: &str, _args: &[Value]) -> Result<Value, EvalError> {
268        Err(EvalError::ExternError {
269            ref_: ref_.into(),
270            msg: "no externs registry configured".into(),
271        })
272    }
273}
274
275/// Boxed pure extern function stored in [`ExternMap`].
276pub type ExternFn = Box<dyn Fn(&[Value]) -> Result<Value, EvalError> + Send + Sync>;
277
278/// `HashMap`-backed [`Externs`] impl for host-side Rust closures.
279///
280/// ```
281/// use flow_ir_core::{eval_expr_with_externs, EvalError, Expr, ExternMap};
282/// use serde_json::{json, Value};
283///
284/// let mut externs = ExternMap::new();
285/// externs.register("math.sqrt", |args: &[Value]| {
286///     let x = args[0].as_f64().ok_or_else(|| EvalError::ExternError {
287///         ref_: "math.sqrt".into(),
288///         msg: "expected number".into(),
289///     })?;
290///     Ok(json!(x.sqrt()))
291/// });
292///
293/// let expr: Expr = serde_json::from_value(json!({
294///     "op": "call_extern", "ref": "math.sqrt",
295///     "args": [{ "op": "lit", "value": 9.0 }],
296/// })).unwrap();
297/// let out = eval_expr_with_externs(&expr, &json!({}), &externs).unwrap();
298/// assert_eq!(out, json!(3.0));
299/// ```
300#[derive(Default)]
301pub struct ExternMap {
302    fns: std::collections::HashMap<String, ExternFn>,
303}
304
305impl ExternMap {
306    /// Create an empty registry.
307    pub fn new() -> Self {
308        Self::default()
309    }
310
311    /// Register a pure function under `name` (overwrites an existing entry).
312    pub fn register<F>(&mut self, name: impl Into<String>, f: F)
313    where
314        F: Fn(&[Value]) -> Result<Value, EvalError> + Send + Sync + 'static,
315    {
316        self.fns.insert(name.into(), Box::new(f));
317    }
318
319    /// Whether `name` is registered (compile-time whitelist check parity).
320    pub fn contains(&self, name: &str) -> bool {
321        self.fns.contains_key(name)
322    }
323}
324
325impl Externs for ExternMap {
326    fn call(&self, ref_: &str, args: &[Value]) -> Result<Value, EvalError> {
327        let f = self.fns.get(ref_).ok_or_else(|| EvalError::ExternError {
328            ref_: ref_.into(),
329            msg: "not registered in externs".into(),
330        })?;
331        f(args)
332    }
333}
334
335// ──────────────────────────────────────────────────────────────────────────
336// CtxStorage — Backend DI for ctx state
337// ──────────────────────────────────────────────────────────────────────────
338
339/// Ctx backend trait — `eval(_with_storage)` 系が ctx state を touch する
340/// 唯一の経路。 `&self` write (interior mutability) で **走行中の Flow と
341/// 外部 task が同じ ctx を共有** できる (= dispatch().await suspend 中に外部
342/// task が `ctx.write` で State 注入 → resume 後 Step が read で観測、 という
343/// dynamic injection 経路を成立させる)。
344///
345/// Default impl は `MemoryCtx` (`Arc<Mutex<Value>>` wrapper、 既存
346/// `serde_json::Value` 直保持と挙動互換)。 consumer は typed struct / KV /
347/// 外部 store / observer wrap / event log 等を custom impl で持ち込める。
348pub trait CtxStorage: Send + Sync {
349    /// Read a single path (`$.a.b.c` 形式) from ctx.
350    fn read(&self, path: &str) -> Result<Value, EvalError>;
351    /// Write `value` to `path` (`$.a.b.c` 形式).
352    fn write(&self, path: &str, value: Value) -> Result<(), EvalError>;
353    /// Take a snapshot of the entire ctx (= Expr eval / Fanout fork で使う pure read view).
354    fn snapshot(&self) -> Value;
355    /// Replace the entire ctx with the given value (= Fanout branch restore 等).
356    fn replace(&self, value: Value);
357}
358
359/// Default `CtxStorage` impl — `Arc<Mutex<Value>>` wrapper。
360///
361/// Send + Sync かつ `&self` write OK = `Arc<MemoryCtx>` で外部 task と共有可能。
362pub struct MemoryCtx {
363    inner: std::sync::Mutex<Value>,
364}
365
366impl MemoryCtx {
367    /// Create a new MemoryCtx initialised with `ctx`.
368    pub fn new(ctx: Value) -> Self {
369        Self {
370            inner: std::sync::Mutex::new(ctx),
371        }
372    }
373
374    /// Convenience: wrap in `Arc<dyn CtxStorage>`.
375    pub fn shared(ctx: Value) -> std::sync::Arc<dyn CtxStorage> {
376        std::sync::Arc::new(Self::new(ctx))
377    }
378}
379
380impl CtxStorage for MemoryCtx {
381    fn read(&self, path: &str) -> Result<Value, EvalError> {
382        let guard = self.inner.lock().expect("ctx mutex poisoned");
383        read_path(path, &guard)
384    }
385
386    fn write(&self, path: &str, value: Value) -> Result<(), EvalError> {
387        let mut guard = self.inner.lock().expect("ctx mutex poisoned");
388        let cur = std::mem::take(&mut *guard);
389        let updated = write_path(
390            &Expr::Path {
391                at: path.to_string(),
392            },
393            cur,
394            value,
395        )?;
396        *guard = updated;
397        Ok(())
398    }
399
400    fn snapshot(&self) -> Value {
401        let guard = self.inner.lock().expect("ctx mutex poisoned");
402        guard.clone()
403    }
404
405    fn replace(&self, value: Value) {
406        let mut guard = self.inner.lock().expect("ctx mutex poisoned");
407        *guard = value;
408    }
409}
410
411/// Resolve `Path` Expr to its literal `$.a.b.c` string, or `InvalidPath` error.
412fn path_str(expr: &Expr) -> Result<&str, EvalError> {
413    match expr {
414        Expr::Path { at } => Ok(at.as_str()),
415        _ => Err(EvalError::InvalidPath(
416            "expected Path expr for write target".into(),
417        )),
418    }
419}
420
421// ──────────────────────────────────────────────────────────────────────────
422// Evaluator — storage-backed (canonical) + legacy Value-passing wrapper
423// ──────────────────────────────────────────────────────────────────────────
424
425/// Storage-backed sync evaluator — `CtxStorage` 経由で ctx を touch する正本。
426///
427/// 各 Node 評価開始時に `ctx.snapshot()` で Expr eval 用の pure view を取り、
428/// write は `ctx.write(path, value)` 経由。 これにより同じ `Arc<dyn CtxStorage>`
429/// を共有する外部 task が、 Step 間 (sync の場合は 1 Step 評価内では touch
430/// しないが) や eval 間で ctx state を変更できる。
431pub fn eval_with_storage<D: Dispatcher>(
432    node: &Node,
433    ctx: &dyn CtxStorage,
434    dispatcher: &D,
435) -> Result<(), EvalError> {
436    eval_with_storage_externs(node, ctx, dispatcher, &NoExterns)
437}
438
439/// `eval_with_storage` + externs registry for `call_extern` Expr resolution.
440pub fn eval_with_storage_externs<D: Dispatcher>(
441    node: &Node,
442    ctx: &dyn CtxStorage,
443    dispatcher: &D,
444    externs: &dyn Externs,
445) -> Result<(), EvalError> {
446    match node {
447        Node::Step { ref_, in_, out } => {
448            let snap = ctx.snapshot();
449            let input = eval_expr_with_externs(in_, &snap, externs)?;
450            let output =
451                dispatcher
452                    .dispatch(ref_, input)
453                    .map_err(|e| EvalError::DispatcherError {
454                        ref_: ref_.clone(),
455                        msg: e.to_string(),
456                    })?;
457            ctx.write(path_str(out)?, output)
458        }
459        Node::Seq { children } => {
460            for child in children {
461                eval_with_storage_externs(child, ctx, dispatcher, externs)?;
462            }
463            Ok(())
464        }
465        Node::Branch { cond, then_, else_ } => {
466            let snap = ctx.snapshot();
467            match eval_expr_with_externs(cond, &snap, externs)? {
468                Value::Bool(true) => eval_with_storage_externs(then_, ctx, dispatcher, externs),
469                Value::Bool(false) => eval_with_storage_externs(else_, ctx, dispatcher, externs),
470                other => Err(EvalError::NonBoolCond(other)),
471            }
472        }
473        Node::Fanout {
474            items,
475            bind,
476            body,
477            join,
478            out,
479        } => {
480            // Fanout fork = 各 branch を disjoint MemoryCtx に切り出して逐次
481            // (sync) evaluate、 集約結果を共有 ctx の `out` path に書く。
482            let snap = ctx.snapshot();
483            let items_val = eval_expr_with_externs(items, &snap, externs)?;
484            let items_arr = match items_val {
485                Value::Array(a) => a,
486                other => {
487                    return Err(EvalError::DispatcherError {
488                        ref_: "fanout.items".into(),
489                        msg: format!("expected array, got {other:?}"),
490                    })
491                }
492            };
493            let joined =
494                fanout_eval_sync(bind, body, *join, &snap, items_arr, dispatcher, externs)?;
495            ctx.write(path_str(out)?, joined)
496        }
497        Node::Loop {
498            counter,
499            cond,
500            body,
501            max,
502        } => {
503            let counter_path = path_str(counter)?;
504            ctx.write(counter_path, Value::Number(serde_json::Number::from(0u32)))?;
505            let mut n: u32 = 0;
506            loop {
507                if n >= *max {
508                    break;
509                }
510                let snap = ctx.snapshot();
511                if !is_truthy(&eval_expr_with_externs(cond, &snap, externs)?) {
512                    break;
513                }
514                eval_with_storage_externs(body, ctx, dispatcher, externs)?;
515                n += 1;
516                ctx.write(counter_path, Value::Number(serde_json::Number::from(n)))?;
517            }
518            Ok(())
519        }
520        Node::Try {
521            body,
522            catch,
523            err_at,
524        } => {
525            // body 失敗時の rollback 用 snapshot
526            let snap_before = ctx.snapshot();
527            match eval_with_storage_externs(body, ctx, dispatcher, externs) {
528                Ok(()) => Ok(()),
529                Err(e) => {
530                    // body の途中 write を破棄 (Try semantic: rollback)
531                    ctx.replace(snap_before);
532                    if let Some(at) = err_at {
533                        ctx.write(path_str(at)?, Value::String(e.to_string()))?;
534                    }
535                    eval_with_storage_externs(catch, ctx, dispatcher, externs)
536                }
537            }
538        }
539        Node::Assign { at, value } => {
540            let snap = ctx.snapshot();
541            let v = eval_expr_with_externs(value, &snap, externs)?;
542            ctx.write(path_str(at)?, v)
543        }
544    }
545}
546
547/// Internal: fanout per-item sync evaluator (disjoint branch ctx).
548fn fanout_eval_sync<D: Dispatcher>(
549    bind: &Expr,
550    body: &Node,
551    join: JoinMode,
552    base_snap: &Value,
553    items_arr: Vec<Value>,
554    dispatcher: &D,
555    externs: &dyn Externs,
556) -> Result<Value, EvalError> {
557    match join {
558        JoinMode::All => {
559            let mut results = Vec::with_capacity(items_arr.len());
560            for item in items_arr {
561                let branch_ctx = write_path(bind, base_snap.clone(), item)?;
562                let storage = MemoryCtx::new(branch_ctx);
563                eval_with_storage_externs(body, &storage, dispatcher, externs)?;
564                results.push(storage.snapshot());
565            }
566            Ok(Value::Array(results))
567        }
568        JoinMode::Any => {
569            let mut winner: Option<Value> = None;
570            let mut last_err: Option<EvalError> = None;
571            for item in items_arr {
572                let branch_ctx = write_path(bind, base_snap.clone(), item)?;
573                let storage = MemoryCtx::new(branch_ctx);
574                match eval_with_storage_externs(body, &storage, dispatcher, externs) {
575                    Ok(()) => {
576                        winner = Some(storage.snapshot());
577                        last_err = None;
578                        break;
579                    }
580                    Err(e) => last_err = Some(e),
581                }
582            }
583            if let Some(e) = last_err {
584                return Err(e);
585            }
586            Ok(winner.unwrap_or(Value::Array(vec![])))
587        }
588        JoinMode::Race => {
589            if let Some(first) = items_arr.into_iter().next() {
590                let branch_ctx = write_path(bind, base_snap.clone(), first)?;
591                let storage = MemoryCtx::new(branch_ctx);
592                eval_with_storage_externs(body, &storage, dispatcher, externs)?;
593                Ok(storage.snapshot())
594            } else {
595                Ok(Value::Array(vec![]))
596            }
597        }
598        JoinMode::AllSettled => {
599            let mut records = Vec::with_capacity(items_arr.len());
600            for item in items_arr {
601                let branch_ctx = write_path(bind, base_snap.clone(), item)?;
602                let storage = MemoryCtx::new(branch_ctx);
603                match eval_with_storage_externs(body, &storage, dispatcher, externs) {
604                    Ok(()) => records.push(
605                        serde_json::json!({"status": "fulfilled", "value": storage.snapshot()}),
606                    ),
607                    Err(e) => records
608                        .push(serde_json::json!({"status": "rejected", "reason": e.to_string()})),
609                }
610            }
611            Ok(Value::Array(records))
612        }
613    }
614}
615
616/// Legacy Value-passing sync evaluator — backward compat wrapper around
617/// `eval_with_storage` + `MemoryCtx`. `Value` を所有権で受け取り、 内部で
618/// `MemoryCtx::new(ctx)` を使って storage 版に委譲、 終了後の snapshot を返す。
619///
620/// 既存 caller (= dynamic injection を要求しない、 1-shot pure eval 用途) は
621/// 引き続きこの API で OK。 動的注入が要る場合は `eval_with_storage` を直接
622/// 呼ぶ。
623///
624/// Returns the updated context (= ctx with `Step.out` path written for each step traversed).
625pub fn eval<D: Dispatcher>(node: &Node, ctx: Value, dispatcher: &D) -> Result<Value, EvalError> {
626    eval_externs(node, ctx, dispatcher, &NoExterns)
627}
628
629/// `eval` + externs registry for `call_extern` Expr resolution.
630pub fn eval_externs<D: Dispatcher>(
631    node: &Node,
632    ctx: Value,
633    dispatcher: &D,
634    externs: &dyn Externs,
635) -> Result<Value, EvalError> {
636    let storage = MemoryCtx::new(ctx);
637    eval_with_storage_externs(node, &storage, dispatcher, externs)?;
638    Ok(storage.snapshot())
639}
640
641/// JSON value の truthy 判定 (= flow.ir Branch cond / Loop cond で使う)。
642/// Bool は値そのまま、 null/false 以外は truthy (Lua / JS と整合)。
643pub fn is_truthy(v: &Value) -> bool {
644    match v {
645        Value::Null => false,
646        Value::Bool(b) => *b,
647        _ => true,
648    }
649}
650
651/// Evaluate an `Expr` against a context value, returning the resolved JSON
652/// value. Externs-less compat wrapper — `call_extern` raises `ExternError`.
653pub fn eval_expr(expr: &Expr, ctx: &Value) -> Result<Value, EvalError> {
654    eval_expr_with_externs(expr, ctx, &NoExterns)
655}
656
657/// `eval_expr` + externs registry for `call_extern` Expr resolution.
658pub fn eval_expr_with_externs(
659    expr: &Expr,
660    ctx: &Value,
661    externs: &dyn Externs,
662) -> Result<Value, EvalError> {
663    let ev = |e: &Expr| eval_expr_with_externs(e, ctx, externs);
664    match expr {
665        Expr::Lit { value } => Ok(value.clone()),
666        Expr::Path { at } => read_path(at, ctx),
667        Expr::Eq { lhs, rhs } => Ok(Value::Bool(ev(lhs)? == ev(rhs)?)),
668        Expr::Ne { lhs, rhs } => Ok(Value::Bool(ev(lhs)? != ev(rhs)?)),
669        Expr::Lt { lhs, rhs } => ord_cmp(&ev(lhs)?, &ev(rhs)?, |o| o.is_lt()),
670        Expr::Lte { lhs, rhs } => ord_cmp(&ev(lhs)?, &ev(rhs)?, |o| o.is_le()),
671        Expr::Gt { lhs, rhs } => ord_cmp(&ev(lhs)?, &ev(rhs)?, |o| o.is_gt()),
672        Expr::Gte { lhs, rhs } => ord_cmp(&ev(lhs)?, &ev(rhs)?, |o| o.is_ge()),
673        Expr::Not { arg } => Ok(Value::Bool(!is_truthy(&ev(arg)?))),
674        Expr::And { args } => {
675            for a in args {
676                if !is_truthy(&ev(a)?) {
677                    return Ok(Value::Bool(false));
678                }
679            }
680            Ok(Value::Bool(true))
681        }
682        Expr::Or { args } => {
683            for a in args {
684                if is_truthy(&ev(a)?) {
685                    return Ok(Value::Bool(true));
686                }
687            }
688            Ok(Value::Bool(false))
689        }
690        Expr::Exists { arg } => match ev(arg) {
691            Ok(Value::Null) => Ok(Value::Bool(false)),
692            Ok(_) => Ok(Value::Bool(true)),
693            // canonical: a path to a missing key reads as nil → exists=false
694            Err(EvalError::PathNotFound(_)) => Ok(Value::Bool(false)),
695            Err(e) => Err(e),
696        },
697        Expr::Add { lhs, rhs } => num_arith(&ev(lhs)?, &ev(rhs)?, "add", |a, b| Some(a + b)),
698        Expr::Sub { lhs, rhs } => num_arith(&ev(lhs)?, &ev(rhs)?, "sub", |a, b| Some(a - b)),
699        Expr::Mul { lhs, rhs } => num_arith(&ev(lhs)?, &ev(rhs)?, "mul", |a, b| Some(a * b)),
700        Expr::Div { lhs, rhs } => num_arith(&ev(lhs)?, &ev(rhs)?, "div", |a, b| {
701            if b == 0.0 {
702                None
703            } else {
704                Some(a / b)
705            }
706        }),
707        // Lua `%` semantics (canonical): a - floor(a/b)*b, sign follows rhs.
708        Expr::Mod { lhs, rhs } => num_arith(&ev(lhs)?, &ev(rhs)?, "mod", |a, b| {
709            if b == 0.0 {
710                None
711            } else {
712                Some(a - (a / b).floor() * b)
713            }
714        }),
715        Expr::Len { arg } => {
716            let v = ev(arg)?;
717            let n = match &v {
718                Value::Array(a) => a.len(),
719                Value::String(s) => s.chars().count(),
720                Value::Object(o) => o.len(),
721                other => {
722                    return Err(EvalError::DispatcherError {
723                        ref_: "expr.len".into(),
724                        msg: format!("len: unsupported type {other:?}"),
725                    })
726                }
727            };
728            Ok(Value::Number(serde_json::Number::from(n as u64)))
729        }
730        Expr::In { needle, haystack } => {
731            let n = ev(needle)?;
732            let h = ev(haystack)?;
733            match h {
734                Value::Array(a) => Ok(Value::Bool(a.iter().any(|e| e == &n))),
735                other => Err(EvalError::DispatcherError {
736                    ref_: "expr.in".into(),
737                    msg: format!("in: haystack must be array, got {other:?}"),
738                }),
739            }
740        }
741        Expr::CallExtern { ref_, args } => {
742            let mut vals = Vec::with_capacity(args.len());
743            for a in args {
744                vals.push(ev(a)?);
745            }
746            externs.call(ref_, &vals)
747        }
748    }
749}
750
751/// Coerce a JSON value to f64 for numeric ops. Bool / null / non-number raise.
752fn to_f64(v: &Value, op: &str) -> Result<f64, EvalError> {
753    match v {
754        Value::Number(n) => n.as_f64().ok_or_else(|| EvalError::DispatcherError {
755            ref_: format!("expr.{op}"),
756            msg: format!("non-f64-representable number: {n}"),
757        }),
758        other => Err(EvalError::DispatcherError {
759            ref_: format!("expr.{op}"),
760            msg: format!("expected number, got {other:?}"),
761        }),
762    }
763}
764
765/// Ordering comparison over two evaluated values. Mirrors canonical Lua
766/// `< / <= / > / >=`: both numbers (f64) or both strings (lexicographic
767/// byte order, same as Lua's string comparison for UTF-8); anything else
768/// raises.
769fn ord_cmp<F>(lv: &Value, rv: &Value, f: F) -> Result<Value, EvalError>
770where
771    F: Fn(std::cmp::Ordering) -> bool,
772{
773    let ord = match (lv, rv) {
774        (Value::Number(_), Value::Number(_)) => {
775            let l = to_f64(lv, "cmp")?;
776            let r = to_f64(rv, "cmp")?;
777            l.partial_cmp(&r)
778                .ok_or_else(|| EvalError::DispatcherError {
779                    ref_: "expr.cmp".into(),
780                    msg: "non-comparable numbers (NaN)".into(),
781                })?
782        }
783        (Value::String(l), Value::String(r)) => l.cmp(r),
784        (l, r) => {
785            return Err(EvalError::DispatcherError {
786                ref_: "expr.cmp".into(),
787                msg: format!("cmp: both sides must be numbers or strings, got {l:?} vs {r:?}"),
788            })
789        }
790    };
791    Ok(Value::Bool(f(ord)))
792}
793
794fn num_arith<F>(lv: &Value, rv: &Value, op: &str, f: F) -> Result<Value, EvalError>
795where
796    F: Fn(f64, f64) -> Option<f64>,
797{
798    let l = to_f64(lv, op)?;
799    let r = to_f64(rv, op)?;
800    let result = f(l, r).ok_or_else(|| EvalError::DispatcherError {
801        ref_: format!("expr.{op}"),
802        msg: "arithmetic failure (e.g. division by zero)".into(),
803    })?;
804    let n = serde_json::Number::from_f64(result).ok_or_else(|| EvalError::DispatcherError {
805        ref_: format!("expr.{op}"),
806        msg: format!("result not f64-representable: {result}"),
807    })?;
808    Ok(Value::Number(n))
809}
810
811// ──────────────────────────────────────────────────────────────────────────
812// Path helpers (simple `$.a.b.c` form, no array index in MVP)
813// ──────────────────────────────────────────────────────────────────────────
814
815/// Read a path from a JSON value. Supports simple `$.a.b.c` form.
816pub fn read_path(path: &str, ctx: &Value) -> Result<Value, EvalError> {
817    let trimmed = strip_path_prefix(path)?;
818    if trimmed.is_empty() {
819        return Ok(ctx.clone());
820    }
821    let mut cur = ctx;
822    for key in trimmed.split('.') {
823        cur = cur
824            .get(key)
825            .ok_or_else(|| EvalError::PathNotFound(path.to_string()))?;
826    }
827    Ok(cur.clone())
828}
829
830/// Write a value at the path location inside ctx, returning the updated ctx.
831/// `out` must be a `Path` Expr.
832pub fn write_path(out: &Expr, ctx: Value, value: Value) -> Result<Value, EvalError> {
833    let path = match out {
834        Expr::Path { at } => at,
835        _ => {
836            return Err(EvalError::InvalidPath(
837                "Step.out must be a Path expr".into(),
838            ))
839        }
840    };
841    let trimmed = strip_path_prefix(path)?;
842    let keys: Vec<&str> = trimmed.split('.').filter(|s| !s.is_empty()).collect();
843    if keys.is_empty() {
844        return Ok(value);
845    }
846    let mut root = ctx;
847    write_path_recursive(&mut root, &keys, value);
848    Ok(root)
849}
850
851fn strip_path_prefix(path: &str) -> Result<&str, EvalError> {
852    path.strip_prefix("$.")
853        .or_else(|| path.strip_prefix('$'))
854        .ok_or_else(|| EvalError::InvalidPath(format!("path must start with $ or $.: {}", path)))
855}
856
857fn write_path_recursive(node: &mut Value, keys: &[&str], value: Value) {
858    if keys.is_empty() {
859        *node = value;
860        return;
861    }
862    if !node.is_object() {
863        *node = Value::Object(serde_json::Map::new());
864    }
865    let obj = node.as_object_mut().expect("just initialised as object");
866    let key = keys[0];
867    if keys.len() == 1 {
868        obj.insert(key.to_string(), value);
869    } else {
870        let entry = obj
871            .entry(key.to_string())
872            .or_insert(Value::Object(serde_json::Map::new()));
873        write_path_recursive(entry, &keys[1..], value);
874    }
875}