flow_ir_core/lib.rs
1#![deny(unsafe_code)]
2//! flow.ir Pure Rust schema + sync interpreter.
3//!
4//! Node kinds (Step / Seq / Branch / Fanout / Loop / Try / Assign) + Expr ops
5//! (canonical wire format — comparison / boolean / existence / arithmetic /
6//! aggregate / `call_extern`) + sync `eval` + `Dispatcher` trait + `Externs`
7//! registry + Path read/write.
8//!
9//! mlua / futures / async 依存ゼロ。 async runtime + mlua binding は上流
10//! `mlua-flow-ir` crate が担当する 4 層 stack の core 層。
11//!
12//! # Quick start
13//!
14//! ```
15//! use flow_ir_core::{eval, Dispatcher, EvalError, Expr, Node};
16//! use serde_json::{json, Value};
17//!
18//! let node: Node = serde_json::from_value(json!({
19//! "kind": "step",
20//! "ref": "uppercase",
21//! "in": { "op": "path", "at": "$.input" },
22//! "out": { "op": "path", "at": "$.output" },
23//! })).unwrap();
24//!
25//! struct Fixture;
26//! impl Dispatcher for Fixture {
27//! fn dispatch(&self, _r: &str, input: Value) -> Result<Value, EvalError> {
28//! if let Value::String(s) = input {
29//! Ok(Value::String(s.to_uppercase()))
30//! } else {
31//! Ok(input)
32//! }
33//! }
34//! }
35//!
36//! let out = eval(&node, json!({ "input": "hello" }), &Fixture).unwrap();
37//! assert_eq!(out, json!({ "input": "hello", "output": "HELLO" }));
38//! ```
39
40use serde::{Deserialize, Serialize};
41use serde_json::Value;
42use thiserror::Error;
43
44// ──────────────────────────────────────────────────────────────────────────
45// IR: 3 Node kinds + 3 Expr ops
46// ──────────────────────────────────────────────────────────────────────────
47
48/// flow.ir Node kind.
49///
50/// Discriminated with `kind` tag, `deny_unknown_fields` (open=false),
51/// `rename_all = "snake_case"`. Parser-side coverage: Step / Seq / Branch +
52/// Fanout (canonical schema の `fanout` Node、 4 join mode)。 残り Node kind
53/// (let / loop / call / switch / try / map / reduce / etc) は別 turn carry。
54#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
55#[serde(tag = "kind", deny_unknown_fields, rename_all = "snake_case")]
56pub enum Node {
57 /// `Step` — dispatch a referenced operation with `in` input, write result to `out`.
58 Step {
59 #[serde(rename = "ref")]
60 ref_: String,
61 #[serde(rename = "in")]
62 in_: Expr,
63 out: Expr,
64 },
65 /// `Seq` — evaluate children in order, threading the context value through.
66 Seq { children: Vec<Node> },
67 /// `Branch` — eval `cond`; if `true` run `then`, else run `else`.
68 Branch {
69 cond: Expr,
70 #[serde(rename = "then")]
71 then_: Box<Node>,
72 #[serde(rename = "else")]
73 else_: Box<Node>,
74 },
75 /// `Fanout` — eval `items` to an array, run `body` per item against a
76 /// branch-local ctx (caller ctx + item written to `bind`), join results
77 /// per `join` mode into `out`. Async parallel runner uses
78 /// `futures::future::{try_join_all|select_ok|join_all}` (executor-agnostic).
79 Fanout {
80 items: Expr,
81 bind: Expr,
82 body: Box<Node>,
83 join: JoinMode,
84 out: Expr,
85 },
86 /// `Loop` — counter を 0 から、 `cond` が truthy かつ `counter < max` の間
87 /// `body` を eval。 各 iter 後 counter を increment して `counter` path に書く。
88 /// VerdictLoop 等の retry/poll パターン primitive (canonical schema 整合)。
89 Loop {
90 counter: Expr,
91 cond: Expr,
92 body: Box<Node>,
93 max: u32,
94 },
95 /// `Try` — `body` を eval、 raise した場合 `catch` を eval。
96 /// `err_at` が Some なら catch 開始前に error message を ctx に書く。
97 Try {
98 body: Box<Node>,
99 catch: Box<Node>,
100 #[serde(default)]
101 err_at: Option<Expr>,
102 },
103 /// `Assign` — pure transform Node。 `value` Expr を ctx snapshot 上で評価し、
104 /// 結果を `at` (Path Expr) に write する。 dispatcher 不要、 副作用は
105 /// `CtxStorage.write` 1 回のみ。 `Seq` の中で Step 間の Adhoc update 表現に
106 /// 使う (= IR primitive、 Command 履歴は CtxStorage の write hook 経由で取得)。
107 Assign { at: Expr, value: Expr },
108}
109
110/// Fanout join semantics (Promise / futures combinators).
111#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
112#[serde(rename_all = "snake_case")]
113pub enum JoinMode {
114 /// every branch runs; out is an array of per-branch final ctx
115 /// (Promise.all / `futures::try_join_all`).
116 All,
117 /// first non-raising branch's ctx wins; all-fail raises
118 /// (Promise.any / `futures::future::select_ok`).
119 Any,
120 /// first branch to settle wins, success OR raise
121 /// (Promise.race / `futures::future::select`).
122 Race,
123 /// every branch runs, never raises; per-item record
124 /// `{status: fulfilled|rejected, value|reason}` (Promise.allSettled).
125 AllSettled,
126}
127
128/// flow.ir Expr op.
129///
130/// Discriminated with `op` tag, `deny_unknown_fields`, `rename_all = "snake_case"`.
131/// Wire format (op tag / field names) follows the canonical `flow-ir-lua`
132/// schema (`flow/ir/schema.lua`) verbatim: `gte`/`lte` (not `ge`/`le`),
133/// `args` on `and`/`or`, `arg` on `not`/`len`/`exists`.
134///
135/// Ops:
136/// - read / literal: `Path` / `Lit`
137/// - comparison: `Eq` / `Ne` / `Lt` / `Lte` / `Gt` / `Gte` (numbers or strings)
138/// - boolean: `Not` / `And` / `Or`
139/// - existence: `Exists` (truthy iff `arg` evaluates to a non-null value)
140/// - arithmetic: `Add` / `Sub` / `Mul` / `Div` / `Mod`
141/// - aggregate: `Len` (length of array / string / object) / `In` (membership in array)
142/// - hatch: `CallExtern` (host-registered pure function, resolved via `Externs`)
143#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
144#[serde(tag = "op", deny_unknown_fields, rename_all = "snake_case")]
145pub enum Expr {
146 /// `Path` — read a value from ctx by simple `$.a.b.c` form.
147 Path { at: String },
148 /// `Lit` — literal JSON value.
149 Lit { value: Value },
150 /// `Eq` — boolean equality of two sub-expressions.
151 Eq { lhs: Box<Expr>, rhs: Box<Expr> },
152 /// `Ne` — boolean inequality.
153 Ne { lhs: Box<Expr>, rhs: Box<Expr> },
154 /// `Lt` — `lhs < rhs`. Both numbers (f64) or both strings (lexicographic),
155 /// mirroring canonical Lua `<` semantics. Mixed / other types raise.
156 Lt { lhs: Box<Expr>, rhs: Box<Expr> },
157 /// `Lte` — `lhs <= rhs` (canonical wire tag `lte`).
158 Lte { lhs: Box<Expr>, rhs: Box<Expr> },
159 /// `Gt` — `lhs > rhs`.
160 Gt { lhs: Box<Expr>, rhs: Box<Expr> },
161 /// `Gte` — `lhs >= rhs` (canonical wire tag `gte`).
162 Gte { lhs: Box<Expr>, rhs: Box<Expr> },
163 /// `Not` — boolean negation of `arg` (truthy-based; null/false → true).
164 Not { arg: Box<Expr> },
165 /// `And` — variadic boolean conjunction (short-circuit). Empty list → true.
166 And { args: Vec<Expr> },
167 /// `Or` — variadic boolean disjunction (short-circuit). Empty list → false.
168 Or { args: Vec<Expr> },
169 /// `Exists` — evaluate `arg`; `true` iff it resolves to a non-null value.
170 /// A `Path` arg that raises `PathNotFound` yields `false` (canonical
171 /// `arg ~= nil` semantics — JSON null maps to Lua nil).
172 Exists { arg: Box<Expr> },
173 /// `Add` — numeric `lhs + rhs` (f64).
174 Add { lhs: Box<Expr>, rhs: Box<Expr> },
175 /// `Sub` — numeric `lhs - rhs`.
176 Sub { lhs: Box<Expr>, rhs: Box<Expr> },
177 /// `Mul` — numeric `lhs * rhs`.
178 Mul { lhs: Box<Expr>, rhs: Box<Expr> },
179 /// `Div` — numeric `lhs / rhs`. Division by zero raises `DispatcherError`.
180 Div { lhs: Box<Expr>, rhs: Box<Expr> },
181 /// `Mod` — numeric `lhs % rhs` (Lua `%` semantics: result takes the sign
182 /// of `rhs`). Modulo by zero raises `DispatcherError`.
183 Mod { lhs: Box<Expr>, rhs: Box<Expr> },
184 /// `Len` — length of `arg`: array → element count, string → char count,
185 /// object → key count. Other types raise `DispatcherError`.
186 Len { arg: Box<Expr> },
187 /// `In` — `true` if `needle` equals any element of `haystack` (which must
188 /// evaluate to an array). Rust-side extension (not in canonical schema).
189 In {
190 needle: Box<Expr>,
191 haystack: Box<Expr>,
192 },
193 /// `CallExtern` — value-shape Hatch: resolve a host-injected pure function
194 /// by opaque key via the `Externs` registry, apply it to evaluated args,
195 /// return the value. The registered function MUST be pure (no side
196 /// effects, no flow control) — see canonical `doc/ir.md §call_extern`.
197 CallExtern {
198 #[serde(rename = "ref")]
199 ref_: String,
200 args: Vec<Expr>,
201 },
202}
203
204// ──────────────────────────────────────────────────────────────────────────
205// Dispatcher trait + EvalError
206// ──────────────────────────────────────────────────────────────────────────
207
208/// Dispatcher callback: resolves a `Step.ref` against the provided input,
209/// returns the step's raw output value.
210///
211/// Host crates (e.g. `mlua-swarm-engine`) provide concrete implementations:
212/// agent-block process spawn, mlua callback, MCP call, direct LLM, etc.
213/// `Fn(&str, Value) -> Result<Value, EvalError>` closures also implement this
214/// trait via the blanket impl below.
215pub trait Dispatcher {
216 fn dispatch(&self, ref_: &str, input: Value) -> Result<Value, EvalError>;
217}
218
219impl<F> Dispatcher for F
220where
221 F: Fn(&str, Value) -> Result<Value, EvalError>,
222{
223 fn dispatch(&self, ref_: &str, input: Value) -> Result<Value, EvalError> {
224 self(ref_, input)
225 }
226}
227
228/// Evaluation error.
229#[derive(Debug, Error)]
230pub enum EvalError {
231 #[error("path not found: {0}")]
232 PathNotFound(String),
233 #[error("invalid path syntax: {0}")]
234 InvalidPath(String),
235 #[error("branch cond must be boolean, got: {0}")]
236 NonBoolCond(Value),
237 #[error("dispatcher error for ref '{ref_}': {msg}")]
238 DispatcherError { ref_: String, msg: String },
239 #[error("extern error for ref '{ref_}': {msg}")]
240 ExternError { ref_: String, msg: String },
241}
242
243// ──────────────────────────────────────────────────────────────────────────
244// Externs — whitelist registry for `call_extern` Expr (canonical opts.externs)
245// ──────────────────────────────────────────────────────────────────────────
246
247/// Extern registry: resolves a `call_extern.ref` against evaluated args and
248/// returns the value. Mirror of canonical `opts.externs` (flow-ir-lua
249/// `interpreter.lua`): each entry MUST be a pure function — no side effects,
250/// no flow control, value-shape manipulation only.
251///
252/// Same DI pattern as [`Dispatcher`]: host crates provide concrete
253/// implementations ([`ExternMap`] for plain Rust closures, mlua bridge for
254/// Lua functions upstream).
255pub trait Externs {
256 /// Invoke the extern registered under `ref_` with already-evaluated args.
257 /// Unregistered refs raise [`EvalError::ExternError`].
258 fn call(&self, ref_: &str, args: &[Value]) -> Result<Value, EvalError>;
259}
260
261/// Empty registry — every `call_extern` raises `ExternError` (parity with
262/// canonical "requires opts.externs" error). Used by the externs-less
263/// compat wrappers (`eval` / `eval_expr` / `eval_with_storage`).
264pub struct NoExterns;
265
266impl Externs for NoExterns {
267 fn call(&self, ref_: &str, _args: &[Value]) -> Result<Value, EvalError> {
268 Err(EvalError::ExternError {
269 ref_: ref_.into(),
270 msg: "no externs registry configured".into(),
271 })
272 }
273}
274
275/// Boxed pure extern function stored in [`ExternMap`].
276pub type ExternFn = Box<dyn Fn(&[Value]) -> Result<Value, EvalError> + Send + Sync>;
277
278/// `HashMap`-backed [`Externs`] impl for host-side Rust closures.
279///
280/// ```
281/// use flow_ir_core::{eval_expr_with_externs, EvalError, Expr, ExternMap};
282/// use serde_json::{json, Value};
283///
284/// let mut externs = ExternMap::new();
285/// externs.register("math.sqrt", |args: &[Value]| {
286/// let x = args[0].as_f64().ok_or_else(|| EvalError::ExternError {
287/// ref_: "math.sqrt".into(),
288/// msg: "expected number".into(),
289/// })?;
290/// Ok(json!(x.sqrt()))
291/// });
292///
293/// let expr: Expr = serde_json::from_value(json!({
294/// "op": "call_extern", "ref": "math.sqrt",
295/// "args": [{ "op": "lit", "value": 9.0 }],
296/// })).unwrap();
297/// let out = eval_expr_with_externs(&expr, &json!({}), &externs).unwrap();
298/// assert_eq!(out, json!(3.0));
299/// ```
300#[derive(Default)]
301pub struct ExternMap {
302 fns: std::collections::HashMap<String, ExternFn>,
303}
304
305impl ExternMap {
306 /// Create an empty registry.
307 pub fn new() -> Self {
308 Self::default()
309 }
310
311 /// Register a pure function under `name` (overwrites an existing entry).
312 pub fn register<F>(&mut self, name: impl Into<String>, f: F)
313 where
314 F: Fn(&[Value]) -> Result<Value, EvalError> + Send + Sync + 'static,
315 {
316 self.fns.insert(name.into(), Box::new(f));
317 }
318
319 /// Whether `name` is registered (compile-time whitelist check parity).
320 pub fn contains(&self, name: &str) -> bool {
321 self.fns.contains_key(name)
322 }
323}
324
325impl Externs for ExternMap {
326 fn call(&self, ref_: &str, args: &[Value]) -> Result<Value, EvalError> {
327 let f = self.fns.get(ref_).ok_or_else(|| EvalError::ExternError {
328 ref_: ref_.into(),
329 msg: "not registered in externs".into(),
330 })?;
331 f(args)
332 }
333}
334
335// ──────────────────────────────────────────────────────────────────────────
336// CtxStorage — Backend DI for ctx state
337// ──────────────────────────────────────────────────────────────────────────
338
339/// Ctx backend trait — `eval(_with_storage)` 系が ctx state を touch する
340/// 唯一の経路。 `&self` write (interior mutability) で **走行中の Flow と
341/// 外部 task が同じ ctx を共有** できる (= dispatch().await suspend 中に外部
342/// task が `ctx.write` で State 注入 → resume 後 Step が read で観測、 という
343/// dynamic injection 経路を成立させる)。
344///
345/// Default impl は `MemoryCtx` (`Arc<Mutex<Value>>` wrapper、 既存
346/// `serde_json::Value` 直保持と挙動互換)。 consumer は typed struct / KV /
347/// 外部 store / observer wrap / event log 等を custom impl で持ち込める。
348pub trait CtxStorage: Send + Sync {
349 /// Read a single path (`$.a.b.c` 形式) from ctx.
350 fn read(&self, path: &str) -> Result<Value, EvalError>;
351 /// Write `value` to `path` (`$.a.b.c` 形式).
352 fn write(&self, path: &str, value: Value) -> Result<(), EvalError>;
353 /// Take a snapshot of the entire ctx (= Expr eval / Fanout fork で使う pure read view).
354 fn snapshot(&self) -> Value;
355 /// Replace the entire ctx with the given value (= Fanout branch restore 等).
356 fn replace(&self, value: Value);
357}
358
359/// Default `CtxStorage` impl — `Arc<Mutex<Value>>` wrapper。
360///
361/// Send + Sync かつ `&self` write OK = `Arc<MemoryCtx>` で外部 task と共有可能。
362pub struct MemoryCtx {
363 inner: std::sync::Mutex<Value>,
364}
365
366impl MemoryCtx {
367 /// Create a new MemoryCtx initialised with `ctx`.
368 pub fn new(ctx: Value) -> Self {
369 Self {
370 inner: std::sync::Mutex::new(ctx),
371 }
372 }
373
374 /// Convenience: wrap in `Arc<dyn CtxStorage>`.
375 pub fn shared(ctx: Value) -> std::sync::Arc<dyn CtxStorage> {
376 std::sync::Arc::new(Self::new(ctx))
377 }
378}
379
380impl CtxStorage for MemoryCtx {
381 fn read(&self, path: &str) -> Result<Value, EvalError> {
382 let guard = self.inner.lock().expect("ctx mutex poisoned");
383 read_path(path, &guard)
384 }
385
386 fn write(&self, path: &str, value: Value) -> Result<(), EvalError> {
387 let mut guard = self.inner.lock().expect("ctx mutex poisoned");
388 let cur = std::mem::take(&mut *guard);
389 let updated = write_path(
390 &Expr::Path {
391 at: path.to_string(),
392 },
393 cur,
394 value,
395 )?;
396 *guard = updated;
397 Ok(())
398 }
399
400 fn snapshot(&self) -> Value {
401 let guard = self.inner.lock().expect("ctx mutex poisoned");
402 guard.clone()
403 }
404
405 fn replace(&self, value: Value) {
406 let mut guard = self.inner.lock().expect("ctx mutex poisoned");
407 *guard = value;
408 }
409}
410
411/// Resolve `Path` Expr to its literal `$.a.b.c` string, or `InvalidPath` error.
412fn path_str(expr: &Expr) -> Result<&str, EvalError> {
413 match expr {
414 Expr::Path { at } => Ok(at.as_str()),
415 _ => Err(EvalError::InvalidPath(
416 "expected Path expr for write target".into(),
417 )),
418 }
419}
420
421// ──────────────────────────────────────────────────────────────────────────
422// Evaluator — storage-backed (canonical) + legacy Value-passing wrapper
423// ──────────────────────────────────────────────────────────────────────────
424
425/// Storage-backed sync evaluator — `CtxStorage` 経由で ctx を touch する正本。
426///
427/// 各 Node 評価開始時に `ctx.snapshot()` で Expr eval 用の pure view を取り、
428/// write は `ctx.write(path, value)` 経由。 これにより同じ `Arc<dyn CtxStorage>`
429/// を共有する外部 task が、 Step 間 (sync の場合は 1 Step 評価内では touch
430/// しないが) や eval 間で ctx state を変更できる。
431pub fn eval_with_storage<D: Dispatcher>(
432 node: &Node,
433 ctx: &dyn CtxStorage,
434 dispatcher: &D,
435) -> Result<(), EvalError> {
436 eval_with_storage_externs(node, ctx, dispatcher, &NoExterns)
437}
438
439/// `eval_with_storage` + externs registry for `call_extern` Expr resolution.
440pub fn eval_with_storage_externs<D: Dispatcher>(
441 node: &Node,
442 ctx: &dyn CtxStorage,
443 dispatcher: &D,
444 externs: &dyn Externs,
445) -> Result<(), EvalError> {
446 match node {
447 Node::Step { ref_, in_, out } => {
448 let snap = ctx.snapshot();
449 let input = eval_expr_with_externs(in_, &snap, externs)?;
450 let output =
451 dispatcher
452 .dispatch(ref_, input)
453 .map_err(|e| EvalError::DispatcherError {
454 ref_: ref_.clone(),
455 msg: e.to_string(),
456 })?;
457 ctx.write(path_str(out)?, output)
458 }
459 Node::Seq { children } => {
460 for child in children {
461 eval_with_storage_externs(child, ctx, dispatcher, externs)?;
462 }
463 Ok(())
464 }
465 Node::Branch { cond, then_, else_ } => {
466 let snap = ctx.snapshot();
467 match eval_expr_with_externs(cond, &snap, externs)? {
468 Value::Bool(true) => eval_with_storage_externs(then_, ctx, dispatcher, externs),
469 Value::Bool(false) => eval_with_storage_externs(else_, ctx, dispatcher, externs),
470 other => Err(EvalError::NonBoolCond(other)),
471 }
472 }
473 Node::Fanout {
474 items,
475 bind,
476 body,
477 join,
478 out,
479 } => {
480 // Fanout fork = 各 branch を disjoint MemoryCtx に切り出して逐次
481 // (sync) evaluate、 集約結果を共有 ctx の `out` path に書く。
482 let snap = ctx.snapshot();
483 let items_val = eval_expr_with_externs(items, &snap, externs)?;
484 let items_arr = match items_val {
485 Value::Array(a) => a,
486 other => {
487 return Err(EvalError::DispatcherError {
488 ref_: "fanout.items".into(),
489 msg: format!("expected array, got {other:?}"),
490 })
491 }
492 };
493 let joined =
494 fanout_eval_sync(bind, body, *join, &snap, items_arr, dispatcher, externs)?;
495 ctx.write(path_str(out)?, joined)
496 }
497 Node::Loop {
498 counter,
499 cond,
500 body,
501 max,
502 } => {
503 let counter_path = path_str(counter)?;
504 ctx.write(counter_path, Value::Number(serde_json::Number::from(0u32)))?;
505 let mut n: u32 = 0;
506 loop {
507 if n >= *max {
508 break;
509 }
510 let snap = ctx.snapshot();
511 if !is_truthy(&eval_expr_with_externs(cond, &snap, externs)?) {
512 break;
513 }
514 eval_with_storage_externs(body, ctx, dispatcher, externs)?;
515 n += 1;
516 ctx.write(counter_path, Value::Number(serde_json::Number::from(n)))?;
517 }
518 Ok(())
519 }
520 Node::Try {
521 body,
522 catch,
523 err_at,
524 } => {
525 // body 失敗時の rollback 用 snapshot
526 let snap_before = ctx.snapshot();
527 match eval_with_storage_externs(body, ctx, dispatcher, externs) {
528 Ok(()) => Ok(()),
529 Err(e) => {
530 // body の途中 write を破棄 (Try semantic: rollback)
531 ctx.replace(snap_before);
532 if let Some(at) = err_at {
533 ctx.write(path_str(at)?, Value::String(e.to_string()))?;
534 }
535 eval_with_storage_externs(catch, ctx, dispatcher, externs)
536 }
537 }
538 }
539 Node::Assign { at, value } => {
540 let snap = ctx.snapshot();
541 let v = eval_expr_with_externs(value, &snap, externs)?;
542 ctx.write(path_str(at)?, v)
543 }
544 }
545}
546
547/// Internal: fanout per-item sync evaluator (disjoint branch ctx).
548fn fanout_eval_sync<D: Dispatcher>(
549 bind: &Expr,
550 body: &Node,
551 join: JoinMode,
552 base_snap: &Value,
553 items_arr: Vec<Value>,
554 dispatcher: &D,
555 externs: &dyn Externs,
556) -> Result<Value, EvalError> {
557 match join {
558 JoinMode::All => {
559 let mut results = Vec::with_capacity(items_arr.len());
560 for item in items_arr {
561 let branch_ctx = write_path(bind, base_snap.clone(), item)?;
562 let storage = MemoryCtx::new(branch_ctx);
563 eval_with_storage_externs(body, &storage, dispatcher, externs)?;
564 results.push(storage.snapshot());
565 }
566 Ok(Value::Array(results))
567 }
568 JoinMode::Any => {
569 let mut winner: Option<Value> = None;
570 let mut last_err: Option<EvalError> = None;
571 for item in items_arr {
572 let branch_ctx = write_path(bind, base_snap.clone(), item)?;
573 let storage = MemoryCtx::new(branch_ctx);
574 match eval_with_storage_externs(body, &storage, dispatcher, externs) {
575 Ok(()) => {
576 winner = Some(storage.snapshot());
577 last_err = None;
578 break;
579 }
580 Err(e) => last_err = Some(e),
581 }
582 }
583 if let Some(e) = last_err {
584 return Err(e);
585 }
586 Ok(winner.unwrap_or(Value::Array(vec![])))
587 }
588 JoinMode::Race => {
589 if let Some(first) = items_arr.into_iter().next() {
590 let branch_ctx = write_path(bind, base_snap.clone(), first)?;
591 let storage = MemoryCtx::new(branch_ctx);
592 eval_with_storage_externs(body, &storage, dispatcher, externs)?;
593 Ok(storage.snapshot())
594 } else {
595 Ok(Value::Array(vec![]))
596 }
597 }
598 JoinMode::AllSettled => {
599 let mut records = Vec::with_capacity(items_arr.len());
600 for item in items_arr {
601 let branch_ctx = write_path(bind, base_snap.clone(), item)?;
602 let storage = MemoryCtx::new(branch_ctx);
603 match eval_with_storage_externs(body, &storage, dispatcher, externs) {
604 Ok(()) => records.push(
605 serde_json::json!({"status": "fulfilled", "value": storage.snapshot()}),
606 ),
607 Err(e) => records
608 .push(serde_json::json!({"status": "rejected", "reason": e.to_string()})),
609 }
610 }
611 Ok(Value::Array(records))
612 }
613 }
614}
615
616/// Legacy Value-passing sync evaluator — backward compat wrapper around
617/// `eval_with_storage` + `MemoryCtx`. `Value` を所有権で受け取り、 内部で
618/// `MemoryCtx::new(ctx)` を使って storage 版に委譲、 終了後の snapshot を返す。
619///
620/// 既存 caller (= dynamic injection を要求しない、 1-shot pure eval 用途) は
621/// 引き続きこの API で OK。 動的注入が要る場合は `eval_with_storage` を直接
622/// 呼ぶ。
623///
624/// Returns the updated context (= ctx with `Step.out` path written for each step traversed).
625pub fn eval<D: Dispatcher>(node: &Node, ctx: Value, dispatcher: &D) -> Result<Value, EvalError> {
626 eval_externs(node, ctx, dispatcher, &NoExterns)
627}
628
629/// `eval` + externs registry for `call_extern` Expr resolution.
630pub fn eval_externs<D: Dispatcher>(
631 node: &Node,
632 ctx: Value,
633 dispatcher: &D,
634 externs: &dyn Externs,
635) -> Result<Value, EvalError> {
636 let storage = MemoryCtx::new(ctx);
637 eval_with_storage_externs(node, &storage, dispatcher, externs)?;
638 Ok(storage.snapshot())
639}
640
641/// JSON value の truthy 判定 (= flow.ir Branch cond / Loop cond で使う)。
642/// Bool は値そのまま、 null/false 以外は truthy (Lua / JS と整合)。
643pub fn is_truthy(v: &Value) -> bool {
644 match v {
645 Value::Null => false,
646 Value::Bool(b) => *b,
647 _ => true,
648 }
649}
650
651/// Evaluate an `Expr` against a context value, returning the resolved JSON
652/// value. Externs-less compat wrapper — `call_extern` raises `ExternError`.
653pub fn eval_expr(expr: &Expr, ctx: &Value) -> Result<Value, EvalError> {
654 eval_expr_with_externs(expr, ctx, &NoExterns)
655}
656
657/// `eval_expr` + externs registry for `call_extern` Expr resolution.
658pub fn eval_expr_with_externs(
659 expr: &Expr,
660 ctx: &Value,
661 externs: &dyn Externs,
662) -> Result<Value, EvalError> {
663 let ev = |e: &Expr| eval_expr_with_externs(e, ctx, externs);
664 match expr {
665 Expr::Lit { value } => Ok(value.clone()),
666 Expr::Path { at } => read_path(at, ctx),
667 Expr::Eq { lhs, rhs } => Ok(Value::Bool(ev(lhs)? == ev(rhs)?)),
668 Expr::Ne { lhs, rhs } => Ok(Value::Bool(ev(lhs)? != ev(rhs)?)),
669 Expr::Lt { lhs, rhs } => ord_cmp(&ev(lhs)?, &ev(rhs)?, |o| o.is_lt()),
670 Expr::Lte { lhs, rhs } => ord_cmp(&ev(lhs)?, &ev(rhs)?, |o| o.is_le()),
671 Expr::Gt { lhs, rhs } => ord_cmp(&ev(lhs)?, &ev(rhs)?, |o| o.is_gt()),
672 Expr::Gte { lhs, rhs } => ord_cmp(&ev(lhs)?, &ev(rhs)?, |o| o.is_ge()),
673 Expr::Not { arg } => Ok(Value::Bool(!is_truthy(&ev(arg)?))),
674 Expr::And { args } => {
675 for a in args {
676 if !is_truthy(&ev(a)?) {
677 return Ok(Value::Bool(false));
678 }
679 }
680 Ok(Value::Bool(true))
681 }
682 Expr::Or { args } => {
683 for a in args {
684 if is_truthy(&ev(a)?) {
685 return Ok(Value::Bool(true));
686 }
687 }
688 Ok(Value::Bool(false))
689 }
690 Expr::Exists { arg } => match ev(arg) {
691 Ok(Value::Null) => Ok(Value::Bool(false)),
692 Ok(_) => Ok(Value::Bool(true)),
693 // canonical: a path to a missing key reads as nil → exists=false
694 Err(EvalError::PathNotFound(_)) => Ok(Value::Bool(false)),
695 Err(e) => Err(e),
696 },
697 Expr::Add { lhs, rhs } => num_arith(&ev(lhs)?, &ev(rhs)?, "add", |a, b| Some(a + b)),
698 Expr::Sub { lhs, rhs } => num_arith(&ev(lhs)?, &ev(rhs)?, "sub", |a, b| Some(a - b)),
699 Expr::Mul { lhs, rhs } => num_arith(&ev(lhs)?, &ev(rhs)?, "mul", |a, b| Some(a * b)),
700 Expr::Div { lhs, rhs } => num_arith(&ev(lhs)?, &ev(rhs)?, "div", |a, b| {
701 if b == 0.0 {
702 None
703 } else {
704 Some(a / b)
705 }
706 }),
707 // Lua `%` semantics (canonical): a - floor(a/b)*b, sign follows rhs.
708 Expr::Mod { lhs, rhs } => num_arith(&ev(lhs)?, &ev(rhs)?, "mod", |a, b| {
709 if b == 0.0 {
710 None
711 } else {
712 Some(a - (a / b).floor() * b)
713 }
714 }),
715 Expr::Len { arg } => {
716 let v = ev(arg)?;
717 let n = match &v {
718 Value::Array(a) => a.len(),
719 Value::String(s) => s.chars().count(),
720 Value::Object(o) => o.len(),
721 other => {
722 return Err(EvalError::DispatcherError {
723 ref_: "expr.len".into(),
724 msg: format!("len: unsupported type {other:?}"),
725 })
726 }
727 };
728 Ok(Value::Number(serde_json::Number::from(n as u64)))
729 }
730 Expr::In { needle, haystack } => {
731 let n = ev(needle)?;
732 let h = ev(haystack)?;
733 match h {
734 Value::Array(a) => Ok(Value::Bool(a.iter().any(|e| e == &n))),
735 other => Err(EvalError::DispatcherError {
736 ref_: "expr.in".into(),
737 msg: format!("in: haystack must be array, got {other:?}"),
738 }),
739 }
740 }
741 Expr::CallExtern { ref_, args } => {
742 let mut vals = Vec::with_capacity(args.len());
743 for a in args {
744 vals.push(ev(a)?);
745 }
746 externs.call(ref_, &vals)
747 }
748 }
749}
750
751/// Coerce a JSON value to f64 for numeric ops. Bool / null / non-number raise.
752fn to_f64(v: &Value, op: &str) -> Result<f64, EvalError> {
753 match v {
754 Value::Number(n) => n.as_f64().ok_or_else(|| EvalError::DispatcherError {
755 ref_: format!("expr.{op}"),
756 msg: format!("non-f64-representable number: {n}"),
757 }),
758 other => Err(EvalError::DispatcherError {
759 ref_: format!("expr.{op}"),
760 msg: format!("expected number, got {other:?}"),
761 }),
762 }
763}
764
765/// Ordering comparison over two evaluated values. Mirrors canonical Lua
766/// `< / <= / > / >=`: both numbers (f64) or both strings (lexicographic
767/// byte order, same as Lua's string comparison for UTF-8); anything else
768/// raises.
769fn ord_cmp<F>(lv: &Value, rv: &Value, f: F) -> Result<Value, EvalError>
770where
771 F: Fn(std::cmp::Ordering) -> bool,
772{
773 let ord = match (lv, rv) {
774 (Value::Number(_), Value::Number(_)) => {
775 let l = to_f64(lv, "cmp")?;
776 let r = to_f64(rv, "cmp")?;
777 l.partial_cmp(&r)
778 .ok_or_else(|| EvalError::DispatcherError {
779 ref_: "expr.cmp".into(),
780 msg: "non-comparable numbers (NaN)".into(),
781 })?
782 }
783 (Value::String(l), Value::String(r)) => l.cmp(r),
784 (l, r) => {
785 return Err(EvalError::DispatcherError {
786 ref_: "expr.cmp".into(),
787 msg: format!("cmp: both sides must be numbers or strings, got {l:?} vs {r:?}"),
788 })
789 }
790 };
791 Ok(Value::Bool(f(ord)))
792}
793
794fn num_arith<F>(lv: &Value, rv: &Value, op: &str, f: F) -> Result<Value, EvalError>
795where
796 F: Fn(f64, f64) -> Option<f64>,
797{
798 let l = to_f64(lv, op)?;
799 let r = to_f64(rv, op)?;
800 let result = f(l, r).ok_or_else(|| EvalError::DispatcherError {
801 ref_: format!("expr.{op}"),
802 msg: "arithmetic failure (e.g. division by zero)".into(),
803 })?;
804 let n = serde_json::Number::from_f64(result).ok_or_else(|| EvalError::DispatcherError {
805 ref_: format!("expr.{op}"),
806 msg: format!("result not f64-representable: {result}"),
807 })?;
808 Ok(Value::Number(n))
809}
810
811// ──────────────────────────────────────────────────────────────────────────
812// Path helpers (simple `$.a.b.c` form, no array index in MVP)
813// ──────────────────────────────────────────────────────────────────────────
814
815/// Read a path from a JSON value. Supports simple `$.a.b.c` form.
816pub fn read_path(path: &str, ctx: &Value) -> Result<Value, EvalError> {
817 let trimmed = strip_path_prefix(path)?;
818 if trimmed.is_empty() {
819 return Ok(ctx.clone());
820 }
821 let mut cur = ctx;
822 for key in trimmed.split('.') {
823 cur = cur
824 .get(key)
825 .ok_or_else(|| EvalError::PathNotFound(path.to_string()))?;
826 }
827 Ok(cur.clone())
828}
829
830/// Write a value at the path location inside ctx, returning the updated ctx.
831/// `out` must be a `Path` Expr.
832pub fn write_path(out: &Expr, ctx: Value, value: Value) -> Result<Value, EvalError> {
833 let path = match out {
834 Expr::Path { at } => at,
835 _ => {
836 return Err(EvalError::InvalidPath(
837 "Step.out must be a Path expr".into(),
838 ))
839 }
840 };
841 let trimmed = strip_path_prefix(path)?;
842 let keys: Vec<&str> = trimmed.split('.').filter(|s| !s.is_empty()).collect();
843 if keys.is_empty() {
844 return Ok(value);
845 }
846 let mut root = ctx;
847 write_path_recursive(&mut root, &keys, value);
848 Ok(root)
849}
850
851fn strip_path_prefix(path: &str) -> Result<&str, EvalError> {
852 path.strip_prefix("$.")
853 .or_else(|| path.strip_prefix('$'))
854 .ok_or_else(|| EvalError::InvalidPath(format!("path must start with $ or $.: {}", path)))
855}
856
857fn write_path_recursive(node: &mut Value, keys: &[&str], value: Value) {
858 if keys.is_empty() {
859 *node = value;
860 return;
861 }
862 if !node.is_object() {
863 *node = Value::Object(serde_json::Map::new());
864 }
865 let obj = node.as_object_mut().expect("just initialised as object");
866 let key = keys[0];
867 if keys.len() == 1 {
868 obj.insert(key.to_string(), value);
869 } else {
870 let entry = obj
871 .entry(key.to_string())
872 .or_insert(Value::Object(serde_json::Map::new()));
873 write_path_recursive(entry, &keys[1..], value);
874 }
875}