Skip to main content

mlua_flow_ir/
lib.rs

1#![deny(unsafe_code)]
2//! flow.ir async runtime + mlua binding.
3//!
4//! Layer 3 of the 4-layer flow.ir stack:
5//!
6//! 1. `flow-ir-lua` — Pure Lua DSL (separate repo, ecosystem-neutral)
7//! 2. `flow-ir-core` — Pure Rust schema + sync interpreter (no mlua, no async)
8//! 3. `mlua-flow-ir` — **this crate**: re-export of `flow-ir-core` +
9//!    `AsyncDispatcher` + `eval_async` + `fanout_eval` + Lua `module()` binding
10//! 4. `mlua-swarm-engine` — host concerns (Spawner / Worker / Loop /
11//!    AuthzPolicy / cp_state persist)
12//!
13//! All schema types (`Node` / `Expr` / `JoinMode` / `EvalError` / `Dispatcher`)
14//! are re-exported verbatim from `flow-ir-core` so callers can keep a single
15//! import path:
16//!
17//! ```
18//! use mlua_flow_ir::{eval, eval_async, AsyncDispatcher, Dispatcher, EvalError, Expr, Node};
19//! ```
20
21// ──────────────────────────────────────────────────────────────────────────
22// Re-export Pure Rust core (flow-ir-core)
23// ──────────────────────────────────────────────────────────────────────────
24
25pub use flow_ir_core::{
26    eval, eval_expr, eval_expr_with_externs, eval_externs, eval_with_storage,
27    eval_with_storage_externs, is_truthy, read_path, write_path, CtxStorage, Dispatcher, EvalError,
28    Expr, ExternFn, ExternMap, Externs, JoinMode, MemoryCtx, NoExterns, Node,
29};
30
31use serde_json::Value;
32use std::sync::Arc;
33
34// ══════════════════════════════════════════════════════════════════════════
35// v0.0.2 — Async core (eval_async + AsyncDispatcher trait)
36// ══════════════════════════════════════════════════════════════════════════
37
38use async_recursion::async_recursion;
39use async_trait::async_trait;
40
/// Async dispatcher trait — the async counterpart of `Dispatcher`.
///
/// Declared through the `async_trait` macro (Rust 2021 compatible and
/// dyn-safe). Host crates (e.g. the `AsyncSpawner` in `mlua-swarm-engine`)
/// are expected to implement it. This substrate crate deliberately takes no
/// tokio dependency (it stays "pure"); choosing an executor is the caller's
/// (host's) responsibility.
#[async_trait]
pub trait AsyncDispatcher: Send + Sync {
    /// Resolve the step identified by `ref_` for the given `input`, yielding
    /// the step's output value or an `EvalError`.
    async fn dispatch(&self, ref_: &str, input: Value) -> Result<Value, EvalError>;
}
50
51/// Evaluate a `Node` against a context value asynchronously,
52/// using the given async dispatcher for `Step` resolution.
53///
54/// `eval` (sync) と同型 logic、 dispatch を `.await` に置き換え。 Seq / Branch
55/// は recursive async fn (= `async_recursion` macro で `Pin<Box>` wrap)。
56///
57/// # Quick start
58///
59/// ```
60/// use async_trait::async_trait;
61/// use mlua_flow_ir::{eval_async, AsyncDispatcher, EvalError, Expr, Node};
62/// use serde_json::{json, Value};
63///
64/// struct Fixture;
65///
66/// #[async_trait]
67/// impl AsyncDispatcher for Fixture {
68///     async fn dispatch(&self, _r: &str, input: Value) -> Result<Value, EvalError> {
69///         if let Value::String(s) = input {
70///             Ok(Value::String(s.to_uppercase()))
71///         } else {
72///             Ok(input)
73///         }
74///     }
75/// }
76///
77/// let rt = tokio::runtime::Runtime::new().unwrap();
78/// rt.block_on(async {
79///     let node = Node::Step {
80///         ref_: "up".into(),
81///         in_: Expr::Path { at: "$.input".into() },
82///         out: Expr::Path { at: "$.output".into() },
83///     };
84///     let out = eval_async(&node, json!({ "input": "hello" }), &Fixture).await.unwrap();
85///     assert_eq!(out, json!({ "input": "hello", "output": "HELLO" }));
86/// });
87/// ```
88/// Storage-backed async evaluator — canonical entry.
89///
90/// `Arc<dyn CtxStorage>` 経由で ctx を共有することで、 dispatch().await suspend
91/// 中に外部 task が同じ ctx に `write` できる (= dynamic State injection 経路)。
92/// Step 評価の境界で `ctx.snapshot()` を取って Expr eval に渡す。
93pub async fn eval_async_with_storage<D>(
94    node: &Node,
95    ctx: Arc<dyn CtxStorage>,
96    dispatcher: &D,
97) -> Result<(), EvalError>
98where
99    D: AsyncDispatcher + ?Sized,
100{
101    eval_async_with_storage_externs(node, ctx, dispatcher, &NoExterns).await
102}
103
104/// `eval_async_with_storage` + externs registry for `call_extern` Expr
105/// resolution. `externs` must be `Sync` so the recursive future stays `Send`
106/// (host executors spawn it across threads).
107#[async_recursion]
108pub async fn eval_async_with_storage_externs<D>(
109    node: &Node,
110    ctx: Arc<dyn CtxStorage>,
111    dispatcher: &D,
112    externs: &(dyn Externs + Sync),
113) -> Result<(), EvalError>
114where
115    D: AsyncDispatcher + ?Sized,
116{
117    match node {
118        Node::Step { ref_, in_, out } => {
119            // snap は dispatch() **呼出し前** の view。 dispatch().await 中に
120            // 外部 task が ctx.write しても、 ここで取った snap は影響を受けず
121            // input の値は確定。 write_target の `out` path への write は
122            // dispatch 完了後に共有 ctx を直接更新。
123            let snap = ctx.snapshot();
124            let input = eval_expr_with_externs(in_, &snap, externs)?;
125            let output =
126                dispatcher
127                    .dispatch(ref_, input)
128                    .await
129                    .map_err(|e| EvalError::DispatcherError {
130                        ref_: ref_.clone(),
131                        msg: e.to_string(),
132                    })?;
133            ctx.write(path_str_async(out)?, output)
134        }
135        Node::Seq { children } => {
136            for child in children {
137                eval_async_with_storage_externs(child, ctx.clone(), dispatcher, externs).await?;
138            }
139            Ok(())
140        }
141        Node::Branch { cond, then_, else_ } => {
142            let snap = ctx.snapshot();
143            match eval_expr_with_externs(cond, &snap, externs)? {
144                Value::Bool(true) => {
145                    eval_async_with_storage_externs(then_, ctx, dispatcher, externs).await
146                }
147                Value::Bool(false) => {
148                    eval_async_with_storage_externs(else_, ctx, dispatcher, externs).await
149                }
150                other => Err(EvalError::NonBoolCond(other)),
151            }
152        }
153        Node::Fanout {
154            items,
155            bind,
156            body,
157            join,
158            out,
159        } => fanout_eval(items, bind, body, *join, out, ctx, dispatcher, externs).await,
160        Node::Loop {
161            counter,
162            cond,
163            body,
164            max,
165        } => {
166            let counter_path = path_str_async(counter)?.to_string();
167            ctx.write(&counter_path, Value::Number(serde_json::Number::from(0u32)))?;
168            let mut n: u32 = 0;
169            loop {
170                if n >= *max {
171                    break;
172                }
173                let snap = ctx.snapshot();
174                if !is_truthy(&eval_expr_with_externs(cond, &snap, externs)?) {
175                    break;
176                }
177                eval_async_with_storage_externs(body, ctx.clone(), dispatcher, externs).await?;
178                n += 1;
179                ctx.write(&counter_path, Value::Number(serde_json::Number::from(n)))?;
180            }
181            Ok(())
182        }
183        Node::Try {
184            body,
185            catch,
186            err_at,
187        } => {
188            let snap_before = ctx.snapshot();
189            match eval_async_with_storage_externs(body, ctx.clone(), dispatcher, externs).await {
190                Ok(()) => Ok(()),
191                Err(e) => {
192                    ctx.replace(snap_before);
193                    if let Some(at) = err_at {
194                        ctx.write(path_str_async(at)?, Value::String(e.to_string()))?;
195                    }
196                    eval_async_with_storage_externs(catch, ctx, dispatcher, externs).await
197                }
198            }
199        }
200        Node::Assign { at, value } => {
201            let snap = ctx.snapshot();
202            let v = eval_expr_with_externs(value, &snap, externs)?;
203            ctx.write(path_str_async(at)?, v)
204        }
205    }
206}
207
208/// Legacy Value-passing async evaluator — backward compat wrapper around
209/// `eval_async_with_storage` + `MemoryCtx`. 既存 caller (= dynamic injection
210/// を要求しない用途) は引き続きこの API で OK。
211pub async fn eval_async<D>(node: &Node, ctx: Value, dispatcher: &D) -> Result<Value, EvalError>
212where
213    D: AsyncDispatcher + ?Sized,
214{
215    eval_async_externs(node, ctx, dispatcher, &NoExterns).await
216}
217
218/// `eval_async` + externs registry for `call_extern` Expr resolution.
219pub async fn eval_async_externs<D>(
220    node: &Node,
221    ctx: Value,
222    dispatcher: &D,
223    externs: &(dyn Externs + Sync),
224) -> Result<Value, EvalError>
225where
226    D: AsyncDispatcher + ?Sized,
227{
228    let storage: Arc<dyn CtxStorage> = MemoryCtx::shared(ctx);
229    eval_async_with_storage_externs(node, storage.clone(), dispatcher, externs).await?;
230    Ok(storage.snapshot())
231}
232
233/// Resolve `Path` Expr to its literal `$.a.b.c` string (async eval 側 helper).
234fn path_str_async(expr: &Expr) -> Result<&str, EvalError> {
235    match expr {
236        Expr::Path { at } => Ok(at.as_str()),
237        _ => Err(EvalError::InvalidPath(
238            "expected Path expr for write target".into(),
239        )),
240    }
241}
242
243/// Fanout 並列 evaluator (storage-backed)。 各 branch は disjoint MemoryCtx
244/// を持ち、 branch 内で write しても共有 ctx には影響しない (= snapshot 切り出し
245/// semantic)。 集約結果は最後に共有 ctx の `out` path に write。
246#[async_recursion]
247#[allow(clippy::too_many_arguments)]
248async fn fanout_eval<D>(
249    items: &Expr,
250    bind: &Expr,
251    body: &Node,
252    join: JoinMode,
253    out: &Expr,
254    ctx: Arc<dyn CtxStorage>,
255    dispatcher: &D,
256    externs: &(dyn Externs + Sync),
257) -> Result<(), EvalError>
258where
259    D: AsyncDispatcher + ?Sized,
260{
261    use futures::future::{join_all, select_ok, FutureExt};
262
263    let snap = ctx.snapshot();
264    let items_val = eval_expr_with_externs(items, &snap, externs)?;
265    let items_arr = match items_val {
266        Value::Array(a) => a,
267        other => {
268            return Err(EvalError::DispatcherError {
269                ref_: "fanout.items".into(),
270                msg: format!("expected array, got {other:?}"),
271            })
272        }
273    };
274
275    // branch storage を pre-allocate して、 各 branch future と pair で持つ。
276    // 集約時に同じ storage の snapshot を取って結果にする。
277    let branches: Vec<Arc<dyn CtxStorage>> = items_arr
278        .into_iter()
279        .map(|item| -> Result<Arc<dyn CtxStorage>, EvalError> {
280            let branch_ctx = write_path(bind, snap.clone(), item)?;
281            Ok(MemoryCtx::shared(branch_ctx))
282        })
283        .collect::<Result<_, _>>()?;
284
285    // 各 branch を `(idx, future)` で wrap。 future は branch storage と body を
286    // 共有して走る。
287    let branch_futs: Vec<_> = branches
288        .iter()
289        .map(|b| eval_async_with_storage_externs(body, b.clone(), dispatcher, externs))
290        .collect();
291
292    let joined: Value = match join {
293        JoinMode::All => {
294            futures::future::try_join_all(branch_futs).await?;
295            Value::Array(branches.iter().map(|b| b.snapshot()).collect())
296        }
297        JoinMode::Any => {
298            if branch_futs.is_empty() {
299                Value::Array(vec![])
300            } else {
301                let mapped: Vec<_> = branch_futs
302                    .into_iter()
303                    .enumerate()
304                    .map(|(i, f)| f.map(move |r| r.map(|()| i)).boxed())
305                    .collect();
306                let (winner_idx, _rest) = select_ok(mapped).await?;
307                branches[winner_idx].snapshot()
308            }
309        }
310        JoinMode::Race => {
311            if branch_futs.is_empty() {
312                Value::Array(vec![])
313            } else {
314                let mapped: Vec<_> = branch_futs
315                    .into_iter()
316                    .enumerate()
317                    .map(|(i, f)| f.map(move |r| r.map(|()| i)).boxed())
318                    .collect();
319                let (first, _idx, _rest) = futures::future::select_all(mapped).await;
320                let winner_idx = first?;
321                branches[winner_idx].snapshot()
322            }
323        }
324        JoinMode::AllSettled => {
325            let results = join_all(branch_futs).await;
326            let records: Vec<Value> = results
327                .into_iter()
328                .zip(branches.iter())
329                .map(|(r, b)| match r {
330                    Ok(()) => serde_json::json!({"status": "fulfilled", "value": b.snapshot()}),
331                    Err(e) => serde_json::json!({"status": "rejected", "reason": e.to_string()}),
332                })
333                .collect();
334            Value::Array(records)
335        }
336    };
337
338    ctx.write(path_str_async(out)?, joined)
339}
340
341// ══════════════════════════════════════════════════════════════════════════
342// v0.0.3 — mlua bridge full
343// ══════════════════════════════════════════════════════════════════════════
344
345use mlua::LuaSerdeExt;
346
347/// Lua function を Rust `Dispatcher` trait に wrap した adapter。
348///
349/// Lua 側 dispatcher function `function(ref, input) return ... end` を受けて、
350/// Rust `eval(node, ctx, &lua_dispatcher)` から呼び出せるようにする。
351/// 内部で serde Value ↔ Lua value 変換 (= mlua serde feature) を経由。
352struct LuaDispatcher<'a> {
353    lua: &'a mlua::Lua,
354    func: mlua::Function,
355}
356
357impl<'a> Dispatcher for LuaDispatcher<'a> {
358    fn dispatch(&self, ref_: &str, input: Value) -> Result<Value, EvalError> {
359        let lua_input = self
360            .lua
361            .to_value(&input)
362            .map_err(|e| EvalError::DispatcherError {
363                ref_: ref_.into(),
364                msg: format!("to_value: {}", e),
365            })?;
366        let result: mlua::Value = self.func.call((ref_.to_string(), lua_input)).map_err(|e| {
367            EvalError::DispatcherError {
368                ref_: ref_.into(),
369                msg: format!("lua call: {}", e),
370            }
371        })?;
372        let value: Value = self
373            .lua
374            .from_value(result)
375            .map_err(|e| EvalError::DispatcherError {
376                ref_: ref_.into(),
377                msg: format!("from_value: {}", e),
378            })?;
379        Ok(value)
380    }
381}
382
383/// Lua function table を Rust `Externs` trait に wrap した adapter。
384///
385/// canonical `opts.externs` (flow-ir-lua) と同型: table の各 entry は
386/// pure Lua function で、 `call_extern` Expr の ref で引かれ、 評価済み args
387/// を positional に受けて値を返す。 これが「LuaScript 直実行 Hatch」の
388/// Rust 側の受け口 (extern の実体は任意の Lua closure)。
389struct LuaExterns<'a> {
390    lua: &'a mlua::Lua,
391    table: mlua::Table,
392}
393
394impl<'a> flow_ir_core::Externs for LuaExterns<'a> {
395    fn call(&self, ref_: &str, args: &[Value]) -> Result<Value, EvalError> {
396        let func: mlua::Function = self.table.get(ref_).map_err(|_| EvalError::ExternError {
397            ref_: ref_.into(),
398            msg: "not registered in externs (or not a function)".into(),
399        })?;
400        let mut lua_args = mlua::MultiValue::new();
401        for a in args {
402            lua_args.push_back(self.lua.to_value(a).map_err(|e| EvalError::ExternError {
403                ref_: ref_.into(),
404                msg: format!("to_value: {}", e),
405            })?);
406        }
407        let result: mlua::Value = func.call(lua_args).map_err(|e| EvalError::ExternError {
408            ref_: ref_.into(),
409            msg: format!("lua call: {}", e),
410        })?;
411        self.lua
412            .from_value(result)
413            .map_err(|e| EvalError::ExternError {
414                ref_: ref_.into(),
415                msg: format!("from_value: {}", e),
416            })
417    }
418}
419
420/// Register the flow module table with Lua.
421///
422/// Exposes:
423///
424/// - `flow.version` (= string): crate version
425/// - `flow.eval(node_table, ctx_table, dispatcher_fn, externs_table?) ->
426///   result_table`: Lua-side entry to evaluate a flow.ir BluePrint with a
427///   Lua dispatcher fn. Optional 4th arg is a table of pure Lua functions
428///   resolved by `call_extern` Expr (canonical `opts.externs` parity).
429///
430/// # Lua usage
431///
432/// ```lua
433/// local flow = require("flow")  -- or set via lua.globals():set("flow", module(lua))
434///
435/// local node = {
436///   kind = "step",
437///   ref = "uppercase",
438///   ["in"] = { op = "path", at = "$.input" },
439///   out = { op = "path", at = "$.output" },
440/// }
441///
442/// local function dispatcher(ref, input)
443///   if ref == "uppercase" then
444///     return string.upper(input)
445///   end
446/// end
447///
448/// local result = flow.eval(node, { input = "hello" }, dispatcher)
449/// assert(result.output == "HELLO")
450///
451/// -- call_extern: whitelist pure Lua fns via the externs table
452/// local node2 = {
453///   kind = "assign",
454///   at = { op = "path", at = "$.root" },
455///   value = { op = "call_extern", ref = "math.sqrt",
456///             args = { { op = "path", at = "$.n" } } },
457/// }
458/// local result2 = flow.eval(node2, { n = 9 }, dispatcher,
459///                           { ["math.sqrt"] = math.sqrt })
460/// assert(result2.root == 3)
461/// ```
462pub fn module(lua: &mlua::Lua) -> mlua::Result<mlua::Table> {
463    let t = lua.create_table()?;
464    t.set("version", env!("CARGO_PKG_VERSION"))?;
465
466    let eval_fn = lua.create_function(
467        |lua_inner: &mlua::Lua,
468         (node_val, ctx_val, dispatcher_fn, externs_val): (
469            mlua::Value,
470            mlua::Value,
471            mlua::Function,
472            Option<mlua::Table>,
473        )| {
474            let node: Node = lua_inner
475                .from_value(node_val)
476                .map_err(|e| mlua::Error::external(format!("node parse: {}", e)))?;
477            let ctx: Value = lua_inner
478                .from_value(ctx_val)
479                .map_err(|e| mlua::Error::external(format!("ctx parse: {}", e)))?;
480
481            let dispatcher = LuaDispatcher {
482                lua: lua_inner,
483                func: dispatcher_fn,
484            };
485            let result = match externs_val {
486                Some(table) => {
487                    let externs = LuaExterns {
488                        lua: lua_inner,
489                        table,
490                    };
491                    eval_externs(&node, ctx, &dispatcher, &externs)
492                }
493                None => eval(&node, ctx, &dispatcher),
494            }
495            .map_err(|e| mlua::Error::external(format!("eval: {}", e)))?;
496            lua_inner.to_value(&result)
497        },
498    )?;
499    t.set("eval", eval_fn)?;
500
501    Ok(t)
502}