1#![deny(unsafe_code)]
2pub use flow_ir_core::{
26 eval, eval_expr, eval_expr_with_externs, eval_externs, eval_with_storage,
27 eval_with_storage_externs, is_truthy, read_path, write_path, CtxStorage, Dispatcher, EvalError,
28 Expr, ExternFn, ExternMap, Externs, JoinMode, MemoryCtx, NoExterns, Node,
29};
30
31use serde_json::Value;
32use std::sync::Arc;
33
34use async_recursion::async_recursion;
39use async_trait::async_trait;
40
41#[async_trait]
47pub trait AsyncDispatcher: Send + Sync {
48 async fn dispatch(&self, ref_: &str, input: Value) -> Result<Value, EvalError>;
49}
50
51pub async fn eval_async_with_storage<D>(
94 node: &Node,
95 ctx: Arc<dyn CtxStorage>,
96 dispatcher: &D,
97) -> Result<(), EvalError>
98where
99 D: AsyncDispatcher + ?Sized,
100{
101 eval_async_with_storage_externs(node, ctx, dispatcher, &NoExterns).await
102}
103
104#[async_recursion]
108pub async fn eval_async_with_storage_externs<D>(
109 node: &Node,
110 ctx: Arc<dyn CtxStorage>,
111 dispatcher: &D,
112 externs: &(dyn Externs + Sync),
113) -> Result<(), EvalError>
114where
115 D: AsyncDispatcher + ?Sized,
116{
117 match node {
118 Node::Step { ref_, in_, out } => {
119 let snap = ctx.snapshot();
124 let input = eval_expr_with_externs(in_, &snap, externs)?;
125 let output =
126 dispatcher
127 .dispatch(ref_, input)
128 .await
129 .map_err(|e| EvalError::DispatcherError {
130 ref_: ref_.clone(),
131 msg: e.to_string(),
132 })?;
133 ctx.write(path_str_async(out)?, output)
134 }
135 Node::Seq { children } => {
136 for child in children {
137 eval_async_with_storage_externs(child, ctx.clone(), dispatcher, externs).await?;
138 }
139 Ok(())
140 }
141 Node::Branch { cond, then_, else_ } => {
142 let snap = ctx.snapshot();
143 match eval_expr_with_externs(cond, &snap, externs)? {
144 Value::Bool(true) => {
145 eval_async_with_storage_externs(then_, ctx, dispatcher, externs).await
146 }
147 Value::Bool(false) => {
148 eval_async_with_storage_externs(else_, ctx, dispatcher, externs).await
149 }
150 other => Err(EvalError::NonBoolCond(other)),
151 }
152 }
153 Node::Fanout {
154 items,
155 bind,
156 body,
157 join,
158 out,
159 } => fanout_eval(items, bind, body, *join, out, ctx, dispatcher, externs).await,
160 Node::Loop {
161 counter,
162 cond,
163 body,
164 max,
165 } => {
166 let counter_path = path_str_async(counter)?.to_string();
167 ctx.write(&counter_path, Value::Number(serde_json::Number::from(0u32)))?;
168 let mut n: u32 = 0;
169 loop {
170 if n >= *max {
171 break;
172 }
173 let snap = ctx.snapshot();
174 if !is_truthy(&eval_expr_with_externs(cond, &snap, externs)?) {
175 break;
176 }
177 eval_async_with_storage_externs(body, ctx.clone(), dispatcher, externs).await?;
178 n += 1;
179 ctx.write(&counter_path, Value::Number(serde_json::Number::from(n)))?;
180 }
181 Ok(())
182 }
183 Node::Try {
184 body,
185 catch,
186 err_at,
187 } => {
188 let snap_before = ctx.snapshot();
189 match eval_async_with_storage_externs(body, ctx.clone(), dispatcher, externs).await {
190 Ok(()) => Ok(()),
191 Err(e) => {
192 ctx.replace(snap_before);
193 if let Some(at) = err_at {
194 ctx.write(path_str_async(at)?, Value::String(e.to_string()))?;
195 }
196 eval_async_with_storage_externs(catch, ctx, dispatcher, externs).await
197 }
198 }
199 }
200 Node::Assign { at, value } => {
201 let snap = ctx.snapshot();
202 let v = eval_expr_with_externs(value, &snap, externs)?;
203 ctx.write(path_str_async(at)?, v)
204 }
205 }
206}
207
208pub async fn eval_async<D>(node: &Node, ctx: Value, dispatcher: &D) -> Result<Value, EvalError>
212where
213 D: AsyncDispatcher + ?Sized,
214{
215 eval_async_externs(node, ctx, dispatcher, &NoExterns).await
216}
217
218pub async fn eval_async_externs<D>(
220 node: &Node,
221 ctx: Value,
222 dispatcher: &D,
223 externs: &(dyn Externs + Sync),
224) -> Result<Value, EvalError>
225where
226 D: AsyncDispatcher + ?Sized,
227{
228 let storage: Arc<dyn CtxStorage> = MemoryCtx::shared(ctx);
229 eval_async_with_storage_externs(node, storage.clone(), dispatcher, externs).await?;
230 Ok(storage.snapshot())
231}
232
233fn path_str_async(expr: &Expr) -> Result<&str, EvalError> {
235 match expr {
236 Expr::Path { at } => Ok(at.as_str()),
237 _ => Err(EvalError::InvalidPath(
238 "expected Path expr for write target".into(),
239 )),
240 }
241}
242
243#[async_recursion]
247#[allow(clippy::too_many_arguments)]
248async fn fanout_eval<D>(
249 items: &Expr,
250 bind: &Expr,
251 body: &Node,
252 join: JoinMode,
253 out: &Expr,
254 ctx: Arc<dyn CtxStorage>,
255 dispatcher: &D,
256 externs: &(dyn Externs + Sync),
257) -> Result<(), EvalError>
258where
259 D: AsyncDispatcher + ?Sized,
260{
261 use futures::future::{join_all, select_ok, FutureExt};
262
263 let snap = ctx.snapshot();
264 let items_val = eval_expr_with_externs(items, &snap, externs)?;
265 let items_arr = match items_val {
266 Value::Array(a) => a,
267 other => {
268 return Err(EvalError::DispatcherError {
269 ref_: "fanout.items".into(),
270 msg: format!("expected array, got {other:?}"),
271 })
272 }
273 };
274
275 let branches: Vec<Arc<dyn CtxStorage>> = items_arr
278 .into_iter()
279 .map(|item| -> Result<Arc<dyn CtxStorage>, EvalError> {
280 let branch_ctx = write_path(bind, snap.clone(), item)?;
281 Ok(MemoryCtx::shared(branch_ctx))
282 })
283 .collect::<Result<_, _>>()?;
284
285 let branch_futs: Vec<_> = branches
288 .iter()
289 .map(|b| eval_async_with_storage_externs(body, b.clone(), dispatcher, externs))
290 .collect();
291
292 let joined: Value = match join {
293 JoinMode::All => {
294 futures::future::try_join_all(branch_futs).await?;
295 Value::Array(branches.iter().map(|b| b.snapshot()).collect())
296 }
297 JoinMode::Any => {
298 if branch_futs.is_empty() {
299 Value::Array(vec![])
300 } else {
301 let mapped: Vec<_> = branch_futs
302 .into_iter()
303 .enumerate()
304 .map(|(i, f)| f.map(move |r| r.map(|()| i)).boxed())
305 .collect();
306 let (winner_idx, _rest) = select_ok(mapped).await?;
307 branches[winner_idx].snapshot()
308 }
309 }
310 JoinMode::Race => {
311 if branch_futs.is_empty() {
312 Value::Array(vec![])
313 } else {
314 let mapped: Vec<_> = branch_futs
315 .into_iter()
316 .enumerate()
317 .map(|(i, f)| f.map(move |r| r.map(|()| i)).boxed())
318 .collect();
319 let (first, _idx, _rest) = futures::future::select_all(mapped).await;
320 let winner_idx = first?;
321 branches[winner_idx].snapshot()
322 }
323 }
324 JoinMode::AllSettled => {
325 let results = join_all(branch_futs).await;
326 let records: Vec<Value> = results
327 .into_iter()
328 .zip(branches.iter())
329 .map(|(r, b)| match r {
330 Ok(()) => serde_json::json!({"status": "fulfilled", "value": b.snapshot()}),
331 Err(e) => serde_json::json!({"status": "rejected", "reason": e.to_string()}),
332 })
333 .collect();
334 Value::Array(records)
335 }
336 };
337
338 ctx.write(path_str_async(out)?, joined)
339}
340
341use mlua::LuaSerdeExt;
346
347struct LuaDispatcher<'a> {
353 lua: &'a mlua::Lua,
354 func: mlua::Function,
355}
356
357impl<'a> Dispatcher for LuaDispatcher<'a> {
358 fn dispatch(&self, ref_: &str, input: Value) -> Result<Value, EvalError> {
359 let lua_input = self
360 .lua
361 .to_value(&input)
362 .map_err(|e| EvalError::DispatcherError {
363 ref_: ref_.into(),
364 msg: format!("to_value: {}", e),
365 })?;
366 let result: mlua::Value = self.func.call((ref_.to_string(), lua_input)).map_err(|e| {
367 EvalError::DispatcherError {
368 ref_: ref_.into(),
369 msg: format!("lua call: {}", e),
370 }
371 })?;
372 let value: Value = self
373 .lua
374 .from_value(result)
375 .map_err(|e| EvalError::DispatcherError {
376 ref_: ref_.into(),
377 msg: format!("from_value: {}", e),
378 })?;
379 Ok(value)
380 }
381}
382
383struct LuaExterns<'a> {
390 lua: &'a mlua::Lua,
391 table: mlua::Table,
392}
393
394impl<'a> flow_ir_core::Externs for LuaExterns<'a> {
395 fn call(&self, ref_: &str, args: &[Value]) -> Result<Value, EvalError> {
396 let func: mlua::Function = self.table.get(ref_).map_err(|_| EvalError::ExternError {
397 ref_: ref_.into(),
398 msg: "not registered in externs (or not a function)".into(),
399 })?;
400 let mut lua_args = mlua::MultiValue::new();
401 for a in args {
402 lua_args.push_back(self.lua.to_value(a).map_err(|e| EvalError::ExternError {
403 ref_: ref_.into(),
404 msg: format!("to_value: {}", e),
405 })?);
406 }
407 let result: mlua::Value = func.call(lua_args).map_err(|e| EvalError::ExternError {
408 ref_: ref_.into(),
409 msg: format!("lua call: {}", e),
410 })?;
411 self.lua
412 .from_value(result)
413 .map_err(|e| EvalError::ExternError {
414 ref_: ref_.into(),
415 msg: format!("from_value: {}", e),
416 })
417 }
418}
419
420pub fn module(lua: &mlua::Lua) -> mlua::Result<mlua::Table> {
463 let t = lua.create_table()?;
464 t.set("version", env!("CARGO_PKG_VERSION"))?;
465
466 let eval_fn = lua.create_function(
467 |lua_inner: &mlua::Lua,
468 (node_val, ctx_val, dispatcher_fn, externs_val): (
469 mlua::Value,
470 mlua::Value,
471 mlua::Function,
472 Option<mlua::Table>,
473 )| {
474 let node: Node = lua_inner
475 .from_value(node_val)
476 .map_err(|e| mlua::Error::external(format!("node parse: {}", e)))?;
477 let ctx: Value = lua_inner
478 .from_value(ctx_val)
479 .map_err(|e| mlua::Error::external(format!("ctx parse: {}", e)))?;
480
481 let dispatcher = LuaDispatcher {
482 lua: lua_inner,
483 func: dispatcher_fn,
484 };
485 let result = match externs_val {
486 Some(table) => {
487 let externs = LuaExterns {
488 lua: lua_inner,
489 table,
490 };
491 eval_externs(&node, ctx, &dispatcher, &externs)
492 }
493 None => eval(&node, ctx, &dispatcher),
494 }
495 .map_err(|e| mlua::Error::external(format!("eval: {}", e)))?;
496 lua_inner.to_value(&result)
497 },
498 )?;
499 t.set("eval", eval_fn)?;
500
501 Ok(t)
502}