// a3s_flow/runner.rs
1//! Flow execution engine.
2//!
3//! [`FlowRunner`] takes a [`DagGraph`] and a [`NodeRegistry`], executes each
4//! node wave-by-wave, and returns a [`FlowResult`].
5//!
6//! Two execution modes are available:
7//! - [`FlowRunner::run`] — fire-and-forget: run to completion with no external control
8//! - [`FlowRunner::run_controlled`] — used by [`FlowEngine`] to support pause / resume / terminate
9
10use std::collections::{HashMap, HashSet};
11use std::sync::{Arc, RwLock};
12use std::time::Duration;
13
14use tokio::sync::Semaphore;
15
16use serde_json::Value;
17use tokio::sync::watch;
18use tokio::task::JoinSet;
19use tokio_util::sync::CancellationToken;
20use tracing::{debug, info, instrument, Instrument};
21use uuid::Uuid;
22
23use crate::error::{FlowError, Result};
24use crate::event::{EventEmitter, NoopEventEmitter};
25use crate::flow_store::FlowStore;
26use crate::graph::DagGraph;
27use crate::node::{ExecContext, Node, RetryPolicy};
28use crate::registry::NodeRegistry;
29use crate::result::FlowResult;
30
/// Signal used to control a running execution.
///
/// Broadcast over a [`watch`] channel; the runner re-reads the latest value at
/// every wave boundary, so a `Pause` takes effect between waves, never mid-node.
/// `Copy` lets the runner dereference the channel's borrow immediately.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum FlowSignal {
    /// The flow should continue executing.
    Run,
    /// The flow should pause at the next wave boundary.
    Pause,
}
39
/// Executes a [`DagGraph`] using registered [`Node`](crate::node::Node) implementations.
///
/// For lifecycle control (pause / resume / terminate), use [`FlowEngine`](crate::engine::FlowEngine)
/// instead of constructing a `FlowRunner` directly.
///
/// # Example
///
/// ```rust,no_run
/// use a3s_flow::{DagGraph, FlowRunner, NodeRegistry};
/// use serde_json::json;
///
/// #[tokio::main]
/// async fn main() {
///     let def = json!({
///         "nodes": [
///             { "id": "start", "type": "noop" },
///             { "id": "end",   "type": "noop" }
///         ],
///         "edges": [{ "source": "start", "target": "end" }]
///     });
///     let dag = DagGraph::from_json(&def).unwrap();
///     let registry = NodeRegistry::with_defaults();
///     let runner = FlowRunner::new(dag, registry);
///     let result = runner.run(Default::default()).await.unwrap();
///     println!("{:?}", result.outputs);
/// }
/// ```
pub struct FlowRunner {
    /// The validated, acyclic graph whose nodes this runner executes.
    dag: DagGraph,
    /// Node implementations, looked up by `node_type` at execution time.
    registry: Arc<NodeRegistry>,
    /// Receiver of node / flow lifecycle events ([`NoopEventEmitter`] unless
    /// replaced via `with_event_emitter`).
    emitter: Arc<dyn EventEmitter>,
    /// Optional flow-definition store, passed into every `ExecContext` so
    /// nodes such as `"sub-flow"` can load named flows at execution time.
    flow_store: Option<Arc<dyn FlowStore>>,
    /// When set, at most this many nodes execute concurrently within a wave.
    max_concurrency: Option<usize>,
}
75
76impl FlowRunner {
77    /// Create a new runner from a validated DAG and a node registry.
78    ///
79    /// Uses [`NoopEventEmitter`] by default. Call
80    /// [`.with_event_emitter`](Self::with_event_emitter) to register a custom
81    /// listener before executing.
82    pub fn new(dag: DagGraph, registry: NodeRegistry) -> Self {
83        Self {
84            dag,
85            registry: Arc::new(registry),
86            emitter: Arc::new(NoopEventEmitter),
87            flow_store: None,
88            max_concurrency: None,
89        }
90    }
91
92    /// Create a new runner sharing an existing `Arc<NodeRegistry>`.
93    ///
94    /// Used by the `"iteration"` and `"sub-flow"` nodes so that sub-flow
95    /// runners share the same registry without extra `Arc` wrapping.
96    pub fn with_arc_registry(dag: DagGraph, registry: Arc<NodeRegistry>) -> Self {
97        Self {
98            dag,
99            registry,
100            emitter: Arc::new(NoopEventEmitter),
101            flow_store: None,
102            max_concurrency: None,
103        }
104    }
105
106    /// Attach a custom event emitter to this runner.
107    ///
108    /// The emitter receives node and flow lifecycle events during execution.
109    /// Returns `self` for method chaining.
110    pub fn with_event_emitter(mut self, emitter: Arc<dyn EventEmitter>) -> Self {
111        self.emitter = emitter;
112        self
113    }
114
115    /// Attach a flow definition store to this runner.
116    ///
117    /// When set, the store is passed to every [`ExecContext`] so that nodes
118    /// like `"sub-flow"` can load named flow definitions at execution time.
119    /// Returns `self` for method chaining.
120    pub fn with_flow_store(mut self, store: Arc<dyn FlowStore>) -> Self {
121        self.flow_store = Some(store);
122        self
123    }
124
125    /// Limit the number of nodes that may execute concurrently within a single
126    /// wave.
127    ///
128    /// By default all ready nodes in a wave run in parallel. Setting
129    /// `max_concurrency` to `n` caps this using a Tokio [`Semaphore`] so that
130    /// at most `n` nodes are active at the same time. Useful when downstream
131    /// services impose rate limits.
132    ///
133    /// Returns `self` for method chaining.
134    pub fn with_max_concurrency(mut self, n: usize) -> Self {
135        self.max_concurrency = Some(n);
136        self
137    }
138
139    /// Execute the flow to completion with no external control signals.
140    #[instrument(skip(self, variables), fields(execution_id))]
141    pub async fn run(&self, variables: HashMap<String, Value>) -> Result<FlowResult> {
142        let execution_id = Uuid::new_v4();
143        tracing::Span::current().record("execution_id", execution_id.to_string());
144        // No-op signal channel and a token that is never cancelled.
145        let (_tx, rx) = watch::channel(FlowSignal::Run);
146        let cancel = CancellationToken::new();
147        self.run_seeded(
148            execution_id,
149            variables,
150            rx,
151            cancel,
152            HashMap::new(),
153            HashSet::new(),
154            HashSet::new(),
155        )
156        .await
157    }
158
159    /// Resume a flow from a prior (partial or complete) result, skipping nodes
160    /// that already have outputs in `prior`.
161    ///
162    /// A new execution ID is assigned to the resumed run. Nodes listed in
163    /// `prior.completed_nodes` are not re-executed; their outputs from `prior`
164    /// are used directly as inputs for any downstream nodes that still need to run.
165    ///
166    /// # Example
167    ///
168    /// ```rust,no_run
169    /// # use a3s_flow::{DagGraph, FlowRunner, NodeRegistry};
170    /// # use serde_json::json;
171    /// # use std::collections::HashMap;
172    /// # #[tokio::main] async fn main() {
173    /// let def = json!({ "nodes": [{ "id": "a", "type": "noop" }], "edges": [] });
174    /// let dag = DagGraph::from_json(&def).unwrap();
175    /// let runner = FlowRunner::new(dag, NodeRegistry::with_defaults());
176    /// let partial = runner.run(HashMap::new()).await.unwrap();
177    /// // Resume with the partial result — completed nodes are skipped.
178    /// let full = runner.resume_from(&partial, HashMap::new()).await.unwrap();
179    /// # }
180    /// ```
181    pub async fn resume_from(
182        &self,
183        prior: &FlowResult,
184        variables: HashMap<String, Value>,
185    ) -> Result<FlowResult> {
186        let execution_id = Uuid::new_v4();
187        let (_tx, rx) = watch::channel(FlowSignal::Run);
188        let cancel = CancellationToken::new();
189        self.run_seeded(
190            execution_id,
191            variables,
192            rx,
193            cancel,
194            prior.outputs.clone(),
195            prior.completed_nodes.clone(),
196            prior.skipped_nodes.clone(),
197        )
198        .await
199    }
200
201    /// Execute the flow with external pause / resume / terminate control.
202    ///
203    /// This is the method used by [`FlowEngine`](crate::engine::FlowEngine).
204    /// Prefer using `FlowEngine` rather than calling this directly.
205    pub(crate) async fn run_controlled(
206        &self,
207        execution_id: Uuid,
208        variables: HashMap<String, Value>,
209        signal_rx: watch::Receiver<FlowSignal>,
210        cancel: CancellationToken,
211    ) -> Result<FlowResult> {
212        self.run_seeded(
213            execution_id,
214            variables,
215            signal_rx,
216            cancel,
217            HashMap::new(),
218            HashSet::new(),
219            HashSet::new(),
220        )
221        .await
222    }
223
224    // ── Internal implementation ────────────────────────────────────────────
225
226    /// Emits flow lifecycle events around [`execute_waves`].
227    ///
228    /// `initial_*` collections seed execution for partial-resume; pass empty
229    /// collections for a fresh run.
230    #[allow(clippy::too_many_arguments)]
231    async fn run_seeded(
232        &self,
233        execution_id: Uuid,
234        variables: HashMap<String, Value>,
235        signal_rx: watch::Receiver<FlowSignal>,
236        cancel: CancellationToken,
237        initial_outputs: HashMap<String, Value>,
238        initial_completed: HashSet<String>,
239        initial_skipped: HashSet<String>,
240    ) -> Result<FlowResult> {
241        info!(%execution_id, "flow execution started");
242        self.emitter.on_flow_started(execution_id).await;
243
244        let outcome = self
245            .execute_waves(
246                execution_id,
247                variables,
248                signal_rx,
249                cancel,
250                initial_outputs,
251                initial_completed,
252                initial_skipped,
253            )
254            .await;
255
256        match &outcome {
257            Ok(result) => {
258                info!(%execution_id, "flow execution complete");
259                self.emitter.on_flow_completed(execution_id, result).await;
260            }
261            Err(FlowError::Terminated) => {
262                info!(%execution_id, "flow execution terminated");
263                self.emitter.on_flow_terminated(execution_id).await;
264            }
265            Err(e) => {
266                tracing::warn!(%execution_id, error = %e, "flow execution failed");
267                self.emitter
268                    .on_flow_failed(execution_id, &e.to_string())
269                    .await;
270            }
271        }
272
273        outcome
274    }
275
    /// Wave-based execution engine — emits node events, no flow lifecycle events.
    ///
    /// Repeatedly selects the "wave" of nodes whose dependencies are all
    /// completed, runs them concurrently via a [`JoinSet`], and folds their
    /// outputs back in, until no nodes remain. Pause is honored only between
    /// waves; cancellation is honored both between waves and while awaiting a
    /// wave's results.
    ///
    /// The `initial_*` collections seed partial-resume; empty for fresh runs.
    #[allow(clippy::too_many_arguments)]
    async fn execute_waves(
        &self,
        execution_id: Uuid,
        variables: HashMap<String, Value>,
        mut signal_rx: watch::Receiver<FlowSignal>,
        cancel: CancellationToken,
        initial_outputs: HashMap<String, Value>,
        initial_completed: HashSet<String>,
        initial_skipped: HashSet<String>,
    ) -> Result<FlowResult> {
        // `variables` is mutable so that `"assign"` nodes can inject new values
        // into the running variable scope between waves.
        let mut variables = variables;
        // Shared mutable context for cross-node state (Dify-style global context)
        let context = Arc::new(RwLock::new(HashMap::new()));
        let mut outputs = initial_outputs;
        let mut completed = initial_completed;
        // Nodes whose `run_if` evaluated to false — used to propagate skips.
        let mut skipped = initial_skipped;
        // Only include nodes that haven't completed yet.
        let mut remaining: Vec<String> = self
            .dag
            .nodes_in_order()
            .map(|n| n.id.clone())
            .filter(|id| !completed.contains(id))
            .collect();

        while !remaining.is_empty() {
            // ── Pause / cancel checkpoint (between waves) ──────────────────
            loop {
                if cancel.is_cancelled() {
                    return Err(FlowError::Terminated);
                }
                // Copy the signal value before matching so the borrow is
                // released before we call signal_rx.changed() below.
                // `borrow()` does not mark the value as seen, so per the tokio
                // watch contract a change landing between borrow() and
                // changed() still wakes us — no lost-wakeup window.
                let signal = *signal_rx.borrow();
                match signal {
                    FlowSignal::Run => break,
                    FlowSignal::Pause => {
                        // Block until the signal changes or we are cancelled.
                        tokio::select! {
                            _ = signal_rx.changed() => continue,
                            _ = cancel.cancelled()  => return Err(FlowError::Terminated),
                        }
                    }
                }
            }

            // ── Find nodes ready to run ────────────────────────────────────
            // A node is ready when every dependency is in `completed` (which
            // includes skipped nodes — they are marked completed too).
            let (ready, not_ready): (Vec<_>, Vec<_>) = remaining.into_iter().partition(|id| {
                self.dag
                    .dependencies_of(id)
                    .iter()
                    .all(|dep| completed.contains(dep))
            });

            if ready.is_empty() {
                return Err(FlowError::Internal(
                    "execution stalled: no nodes are ready but not all nodes are done".into(),
                ));
            }

            remaining = not_ready;

            // ── Collect assign-node IDs before consuming `ready` ──────────
            // After the wave completes, these nodes' outputs are merged into
            // the live variable map so that downstream nodes see the new values.
            let assign_node_ids: Vec<String> = ready
                .iter()
                .filter(|id| {
                    self.dag
                        .nodes
                        .get(*id)
                        .map(|n| n.write_to_variables)
                        .unwrap_or(false)
                })
                .cloned()
                .collect();

            // ── Concurrency limiter for this wave ─────────────────────────
            // A fresh semaphore per wave; `None` means unbounded parallelism.
            let semaphore = self.max_concurrency.map(|n| Arc::new(Semaphore::new(n)));

            // ── Launch ready nodes concurrently ───────────────────────────
            let mut join_set: JoinSet<(String, Result<Value>)> = JoinSet::new();

            for node_id in ready {
                let node_def = self.dag.nodes[&node_id].clone();

                // Check run_if guard: if the condition fails, skip this node.
                // Skipped nodes record a Null output and count as completed so
                // downstream readiness checks still pass.
                if let Some(ref cond) = node_def.run_if {
                    if !cond.evaluate(&outputs, &skipped) {
                        debug!(%node_id, "node skipped (run_if condition false)");
                        self.emitter.on_node_skipped(execution_id, &node_id).await;
                        outputs.insert(node_id.clone(), Value::Null);
                        skipped.insert(node_id.clone());
                        completed.insert(node_id);
                        continue;
                    }
                }

                // An unknown node type fails the whole flow here; tasks already
                // spawned this wave are aborted when `join_set` is dropped.
                let node = self.registry.get(&node_def.node_type)?;

                // Inputs are the outputs of this node's dependencies, keyed by
                // dependency id (skipped deps contribute Null).
                let inputs: HashMap<String, Value> = self
                    .dag
                    .dependencies_of(&node_id)
                    .iter()
                    .filter_map(|dep| outputs.get(dep).map(|v| (dep.clone(), v.clone())))
                    .collect();

                let ctx = ExecContext {
                    data: node_def.data.clone(),
                    inputs,
                    // Snapshot of variables as of this wave.
                    variables: variables.clone(),
                    context: Arc::clone(&context),
                    registry: Arc::clone(&self.registry),
                    flow_store: self.flow_store.clone(),
                };

                let retry = node_def.retry.clone();
                let timeout_ms = node_def.timeout_ms;
                let continue_on_error = node_def.continue_on_error;
                let emitter = Arc::clone(&self.emitter);
                let sem = semaphore.clone();

                debug!(
                    %node_id,
                    node_type = %node_def.node_type,
                    retry = ?retry.as_ref().map(|r| r.max_attempts),
                    timeout_ms,
                    continue_on_error,
                    "executing node"
                );

                // ── Per-node OTel-compatible span ──────────────────────────
                let span = tracing::info_span!(
                    "node.execute",
                    node_id = node_id.as_str(),
                    node_type = node_def.node_type.as_str(),
                    %execution_id,
                );

                join_set.spawn(
                    async move {
                        // Acquire concurrency permit inside the task so all
                        // tasks are spawned immediately but only `max_concurrency`
                        // run at the same time. The permit is released on drop.
                        // NOTE(review): `.ok()` discards an acquire error (only
                        // possible if the semaphore were closed, which this code
                        // never does) — in that case the permit is silently None
                        // and the cap would not apply; confirm this is intended.
                        let _permit = if let Some(ref s) = sem {
                            Some(Arc::clone(s).acquire_owned().await.ok())
                        } else {
                            None
                        };

                        emitter
                            .on_node_started(execution_id, &node_id, &node_def.node_type)
                            .await;

                        let result: Result<Value> =
                            execute_with_policy(node, ctx, retry, timeout_ms)
                                .await
                                .map_err(|e| FlowError::NodeFailed {
                                    node_id: node_id.clone(),
                                    execution_id,
                                    reason: e.to_string(),
                                });

                        // If continue_on_error is set, absorb failure and emit
                        // a completed event with an `__error__` sentinel output.
                        let result: Result<Value> = if continue_on_error {
                            result
                                .or_else(|e| Ok(serde_json::json!({ "__error__": e.to_string() })))
                        } else {
                            result
                        };

                        match &result {
                            Ok(v) => {
                                emitter.on_node_completed(execution_id, &node_id, v).await;
                            }
                            Err(e) => {
                                emitter
                                    .on_node_failed(execution_id, &node_id, &e.to_string())
                                    .await;
                            }
                        }

                        (node_id, result)
                    }
                    .instrument(span),
                );
            }

            // ── Collect results (cancel-aware) ─────────────────────────────
            loop {
                tokio::select! {
                    // Termination signal takes priority over pending node results.
                    _ = cancel.cancelled() => {
                        // Remaining tasks are aborted when join_set is dropped.
                        return Err(FlowError::Terminated);
                    }
                    maybe = join_set.join_next() => {
                        match maybe {
                            None => break, // all nodes in this wave done
                            Some(Ok((node_id, Ok(value)))) => {
                                debug!(%node_id, "node completed");
                                outputs.insert(node_id.clone(), value);
                                completed.insert(node_id);
                            }
                            // First node failure aborts the wave (and the flow).
                            Some(Ok((_, Err(e)))) => return Err(e),
                            Some(Err(join_err)) if join_err.is_cancelled() => {
                                return Err(FlowError::Terminated);
                            }
                            // A panicking node surfaces as a JoinError here.
                            Some(Err(e)) => return Err(FlowError::Internal(e.to_string())),
                        }
                    }
                }
            }

            // ── Merge assign-node outputs into the live variable map ───────
            // Only non-error outputs are merged (skip `continue_on_error` sentinels).
            for node_id in &assign_node_ids {
                if let Some(Value::Object(obj)) = outputs.get(node_id) {
                    if !obj.contains_key("__error__") {
                        for (k, v) in obj {
                            variables.insert(k.clone(), v.clone());
                        }
                    }
                }
            }
        }

        Ok(FlowResult {
            execution_id,
            outputs,
            completed_nodes: completed,
            skipped_nodes: skipped,
        })
    }
515}
516
517// ── Node execution helper ──────────────────────────────────────────────────
518
519/// Execute a node with optional retry and timeout policies.
520///
521/// - Retries up to `retry.max_attempts` times (first attempt included).
522/// - Each retry waits `backoff_ms * 2^(attempt-1)` ms (capped at 64× base).
523/// - Each individual attempt is bounded by `timeout_ms` if set.
524async fn execute_with_policy(
525    node: Arc<dyn Node>,
526    ctx: ExecContext,
527    retry: Option<RetryPolicy>,
528    timeout_ms: Option<u64>,
529) -> Result<Value> {
530    let max_attempts = retry.as_ref().map(|r| r.max_attempts.max(1)).unwrap_or(1);
531    let backoff_ms = retry.as_ref().map(|r| r.backoff_ms).unwrap_or(0);
532
533    let mut last_err = FlowError::Internal("no attempts made".into());
534
535    for attempt in 0..max_attempts {
536        if attempt > 0 && backoff_ms > 0 {
537            // Exponential backoff: base * 2^(attempt-1), capped at base * 64.
538            let multiplier = 1u64 << (attempt - 1).min(6);
539            let delay = backoff_ms.saturating_mul(multiplier);
540            tokio::time::sleep(Duration::from_millis(delay)).await;
541        }
542
543        let fut = node.execute(ctx.clone());
544
545        let result = if let Some(ms) = timeout_ms {
546            tokio::time::timeout(Duration::from_millis(ms), fut)
547                .await
548                .unwrap_or_else(|_| Err(FlowError::Internal(format!("timed out after {ms}ms"))))
549        } else {
550            fut.await
551        };
552
553        match result {
554            Ok(v) => return Ok(v),
555            Err(e) => last_err = e,
556        }
557    }
558
559    Err(last_err)
560}
561
562#[cfg(test)]
563mod tests {
564    use super::*;
565    use crate::graph::DagGraph;
566    use crate::registry::NodeRegistry;
567    use serde_json::json;
568
    #[tokio::test]
    async fn runs_linear_flow() {
        // Three noop nodes chained a → b → c: every node must execute and
        // record an output keyed by its node id.
        let def = json!({
            "nodes": [
                { "id": "a", "type": "noop" },
                { "id": "b", "type": "noop" },
                { "id": "c", "type": "noop" }
            ],
            "edges": [
                { "source": "a", "target": "b" },
                { "source": "b", "target": "c" }
            ]
        });
        let dag = DagGraph::from_json(&def).unwrap();
        let registry = NodeRegistry::with_defaults();
        let runner = FlowRunner::new(dag, registry);
        let result = runner.run(HashMap::new()).await.unwrap();

        assert!(result.outputs.contains_key("a"));
        assert!(result.outputs.contains_key("b"));
        assert!(result.outputs.contains_key("c"));
    }
591
    #[tokio::test]
    async fn runs_parallel_fan_out() {
        // Diamond shape: start fans out to b and c (same wave), which fan
        // back in to end. All four nodes must produce an output.
        let def = json!({
            "nodes": [
                { "id": "start", "type": "noop" },
                { "id": "b",     "type": "noop" },
                { "id": "c",     "type": "noop" },
                { "id": "end",   "type": "noop" }
            ],
            "edges": [
                { "source": "start", "target": "b" },
                { "source": "start", "target": "c" },
                { "source": "b",     "target": "end" },
                { "source": "c",     "target": "end" }
            ]
        });
        let dag = DagGraph::from_json(&def).unwrap();
        let registry = NodeRegistry::with_defaults();
        let runner = FlowRunner::new(dag, registry);
        let result = runner.run(HashMap::new()).await.unwrap();
        assert_eq!(result.outputs.len(), 4);
    }
614
    #[tokio::test]
    async fn variables_available_in_context() {
        // Passing initial variables must not break execution: the single
        // node still runs and records its output.
        let def = json!({ "nodes": [{ "id": "only", "type": "noop" }], "edges": [] });
        let dag = DagGraph::from_json(&def).unwrap();
        let registry = NodeRegistry::with_defaults();
        let runner = FlowRunner::new(dag, registry);

        let vars = HashMap::from([("env".into(), json!("production"))]);
        let result = runner.run(vars).await.unwrap();
        assert!(result.outputs.contains_key("only"));
    }
626
    #[tokio::test]
    async fn run_if_skips_node_when_if_else_falls_to_else() {
        // "route" if-else: data == 999 → no match → branch = "else"
        // "process" run_if checks branch == "hit" → skipped
        let def = json!({
            "nodes": [
                { "id": "data", "type": "noop" },
                {
                    "id": "route", "type": "if-else",
                    "data": { "cases": [{ "id": "hit", "conditions": [{ "from": "data", "path": "", "op": "eq", "value": 999 }] }] }
                },
                {
                    "id": "process", "type": "noop",
                    "data": { "run_if": { "from": "route", "path": "branch", "op": "eq", "value": "hit" } }
                }
            ],
            "edges": [
                { "source": "data",  "target": "route" },
                { "source": "route", "target": "process" }
            ]
        });
        let dag = DagGraph::from_json(&def).unwrap();
        let runner = FlowRunner::new(dag, NodeRegistry::with_defaults());
        let result = runner.run(HashMap::new()).await.unwrap();

        // A skipped node records a `null` output placeholder.
        assert_eq!(result.outputs["process"], json!(null));
    }
654
    #[tokio::test]
    async fn run_if_executes_node_when_if_else_matches() {
        // noop outputs {} — if-else matches {} == {} → branch = "hit"
        let def = json!({
            "nodes": [
                { "id": "src", "type": "noop" },
                {
                    "id": "gate", "type": "if-else",
                    "data": { "cases": [{ "id": "hit", "conditions": [{ "from": "src", "path": "", "op": "eq", "value": {} }] }] }
                },
                {
                    "id": "sink", "type": "noop",
                    "data": { "run_if": { "from": "gate", "path": "branch", "op": "eq", "value": "hit" } }
                }
            ],
            "edges": [
                { "source": "src",  "target": "gate" },
                { "source": "gate", "target": "sink" }
            ]
        });
        let dag = DagGraph::from_json(&def).unwrap();
        let runner = FlowRunner::new(dag, NodeRegistry::with_defaults());
        let result = runner.run(HashMap::new()).await.unwrap();

        // "sink" actually ran: it produced a real object, not the skip marker.
        assert!(result.outputs["sink"].is_object());
        assert_ne!(result.outputs["sink"], json!(null));
    }
682
    #[tokio::test]
    async fn skip_propagates_through_chain() {
        // A → B (run_if fails on missing field) → C (run_if on B which is in skipped set)
        let def = json!({
            "nodes": [
                { "id": "a", "type": "noop" },
                {
                    "id": "b", "type": "noop",
                    "data": { "run_if": { "from": "a", "path": "nonexistent_field", "op": "eq", "value": true } }
                },
                {
                    "id": "c", "type": "noop",
                    "data": { "run_if": { "from": "b", "path": "x", "op": "eq", "value": 1 } }
                }
            ],
            "edges": [
                { "source": "a", "target": "b" },
                { "source": "b", "target": "c" }
            ]
        });
        let dag = DagGraph::from_json(&def).unwrap();
        let runner = FlowRunner::new(dag, NodeRegistry::with_defaults());
        let result = runner.run(HashMap::new()).await.unwrap();

        // Both b and c are skipped — each recorded only a null placeholder.
        assert_eq!(result.outputs["b"], json!(null));
        assert_eq!(result.outputs["c"], json!(null));
    }
710
    #[tokio::test]
    async fn if_else_with_variable_aggregator_fan_in() {
        // route → path_ok (run if "ok") / path_err (run if "else") → merge
        let def = json!({
            "nodes": [
                { "id": "src", "type": "noop" },
                {
                    "id": "route", "type": "if-else",
                    "data": { "cases": [{ "id": "ok", "conditions": [{ "from": "src", "path": "", "op": "eq", "value": {} }] }] }
                },
                {
                    "id": "path_ok", "type": "noop",
                    "data": { "run_if": { "from": "route", "path": "branch", "op": "eq", "value": "ok" } }
                },
                {
                    "id": "path_err", "type": "noop",
                    "data": { "run_if": { "from": "route", "path": "branch", "op": "eq", "value": "else" } }
                },
                {
                    "id": "merge", "type": "variable-aggregator",
                    "data": { "inputs": ["path_ok", "path_err"] }
                }
            ],
            "edges": [
                { "source": "src",      "target": "route" },
                { "source": "route",    "target": "path_ok" },
                { "source": "route",    "target": "path_err" },
                { "source": "path_ok",  "target": "merge" },
                { "source": "path_err", "target": "merge" }
            ]
        });
        let dag = DagGraph::from_json(&def).unwrap();
        let runner = FlowRunner::new(dag, NodeRegistry::with_defaults());
        let result = runner.run(HashMap::new()).await.unwrap();

        // path_ok ran (src == {}), path_err was skipped → merge returns path_ok's output
        assert!(!result.outputs["merge"]["output"].is_null());
        assert_eq!(result.outputs["path_err"], json!(null));
    }
750
751    // ── completed_nodes / skipped_nodes tracking ───────────────────────────
752
    #[tokio::test]
    async fn completed_nodes_tracks_all_executed_nodes() {
        // Every node that runs (no run_if guards here) must land in
        // completed_nodes, and nothing in skipped_nodes.
        let def = json!({
            "nodes": [
                { "id": "a", "type": "noop" },
                { "id": "b", "type": "noop" }
            ],
            "edges": [{ "source": "a", "target": "b" }]
        });
        let dag = DagGraph::from_json(&def).unwrap();
        let runner = FlowRunner::new(dag, NodeRegistry::with_defaults());
        let result = runner.run(HashMap::new()).await.unwrap();

        assert!(result.completed_nodes.contains("a"));
        assert!(result.completed_nodes.contains("b"));
        assert!(result.skipped_nodes.is_empty());
    }
770
    #[tokio::test]
    async fn skipped_nodes_tracks_run_if_skipped_nodes() {
        // "a" → "b" with run_if that always fails → "b" is skipped
        let def = json!({
            "nodes": [
                { "id": "a", "type": "noop" },
                {
                    "id": "b", "type": "noop",
                    "data": { "run_if": { "from": "a", "path": "nonexistent", "op": "eq", "value": true } }
                }
            ],
            "edges": [{ "source": "a", "target": "b" }]
        });
        let dag = DagGraph::from_json(&def).unwrap();
        let runner = FlowRunner::new(dag, NodeRegistry::with_defaults());
        let result = runner.run(HashMap::new()).await.unwrap();

        // A skipped node counts as completed (for readiness) AND as skipped.
        assert!(result.completed_nodes.contains("a"));
        assert!(result.completed_nodes.contains("b"));
        assert!(result.skipped_nodes.contains("b"));
        assert!(!result.skipped_nodes.contains("a"));
    }
793
794    // ── retry policy ───────────────────────────────────────────────────────
795
796    #[tokio::test]
797    async fn retry_succeeds_after_transient_failures() {
798        use crate::node::{ExecContext, Node};
799        use async_trait::async_trait;
800        use std::sync::atomic::{AtomicU32, Ordering};
801
802        // Fails twice, succeeds on the third attempt.
803        struct FlakyNode {
804            call_count: Arc<AtomicU32>,
805        }
806
807        #[async_trait]
808        impl Node for FlakyNode {
809            fn node_type(&self) -> &str {
810                "flaky"
811            }
812
813            async fn execute(&self, _ctx: ExecContext) -> Result<Value> {
814                let n = self.call_count.fetch_add(1, Ordering::SeqCst) + 1;
815                if n < 3 {
816                    Err(FlowError::Internal(format!("transient failure #{n}")))
817                } else {
818                    Ok(json!({ "ok": true }))
819                }
820            }
821        }
822
823        let call_count = Arc::new(AtomicU32::new(0));
824        let mut registry = NodeRegistry::with_defaults();
825        registry.register(Arc::new(FlakyNode {
826            call_count: Arc::clone(&call_count),
827        }));
828
829        let def = json!({
830            "nodes": [{
831                "id": "step",
832                "type": "flaky",
833                "data": { "retry": { "max_attempts": 3, "backoff_ms": 0 } }
834            }],
835            "edges": []
836        });
837        let dag = DagGraph::from_json(&def).unwrap();
838        let runner = FlowRunner::new(dag, registry);
839        let result = runner.run(HashMap::new()).await.unwrap();
840
841        assert_eq!(result.outputs["step"]["ok"], json!(true));
842        assert_eq!(call_count.load(Ordering::SeqCst), 3);
843    }
844
845    #[tokio::test]
846    async fn retry_exhausted_returns_last_error() {
847        use crate::node::{ExecContext, Node};
848        use async_trait::async_trait;
849
850        // Always fails.
851        struct AlwaysFailNode;
852
853        #[async_trait]
854        impl Node for AlwaysFailNode {
855            fn node_type(&self) -> &str {
856                "always-fail"
857            }
858
859            async fn execute(&self, _ctx: ExecContext) -> Result<Value> {
860                Err(FlowError::Internal("permanent failure".into()))
861            }
862        }
863
864        let mut registry = NodeRegistry::with_defaults();
865        registry.register(Arc::new(AlwaysFailNode));
866
867        let def = json!({
868            "nodes": [{
869                "id": "step",
870                "type": "always-fail",
871                "data": { "retry": { "max_attempts": 2, "backoff_ms": 0 } }
872            }],
873            "edges": []
874        });
875        let dag = DagGraph::from_json(&def).unwrap();
876        let runner = FlowRunner::new(dag, registry);
877        let err = runner.run(HashMap::new()).await.unwrap_err();
878
879        assert!(matches!(err, FlowError::NodeFailed { .. }));
880        let msg = err.to_string();
881        assert!(msg.contains("permanent failure"));
882    }
883
884    // ── timeout ────────────────────────────────────────────────────────────
885
886    #[tokio::test]
887    async fn timeout_kills_slow_node() {
888        use crate::node::{ExecContext, Node};
889        use async_trait::async_trait;
890
891        struct SlowNode;
892
893        #[async_trait]
894        impl Node for SlowNode {
895            fn node_type(&self) -> &str {
896                "slow-timeout"
897            }
898
899            async fn execute(&self, _ctx: ExecContext) -> Result<Value> {
900                tokio::time::sleep(Duration::from_millis(500)).await;
901                Ok(json!({}))
902            }
903        }
904
905        let mut registry = NodeRegistry::with_defaults();
906        registry.register(Arc::new(SlowNode));
907
908        // timeout_ms (50ms) is well below node sleep (500ms).
909        let def = json!({
910            "nodes": [{
911                "id": "step",
912                "type": "slow-timeout",
913                "data": { "timeout_ms": 50 }
914            }],
915            "edges": []
916        });
917        let dag = DagGraph::from_json(&def).unwrap();
918        let runner = FlowRunner::new(dag, registry);
919        let err = runner.run(HashMap::new()).await.unwrap_err();
920
921        assert!(matches!(err, FlowError::NodeFailed { .. }));
922        assert!(err.to_string().contains("timed out"));
923    }
924
925    #[tokio::test]
926    async fn timeout_does_not_affect_fast_node() {
927        // noop is instant — a 200ms timeout should never trigger.
928        let def = json!({
929            "nodes": [{
930                "id": "step",
931                "type": "noop",
932                "data": { "timeout_ms": 200 }
933            }],
934            "edges": []
935        });
936        let dag = DagGraph::from_json(&def).unwrap();
937        let runner = FlowRunner::new(dag, NodeRegistry::with_defaults());
938        let result = runner.run(HashMap::new()).await.unwrap();
939        assert!(result.outputs.contains_key("step"));
940    }
941
942    // ── partial execution resume ────────────────────────────────────────────
943
944    #[tokio::test]
945    async fn resume_from_skips_already_completed_nodes() {
946        use crate::node::{ExecContext, Node};
947        use async_trait::async_trait;
948        use std::sync::atomic::{AtomicU32, Ordering};
949
950        // Counts how many times it is called.
951        struct CountingNode {
952            call_count: Arc<AtomicU32>,
953        }
954
955        #[async_trait]
956        impl Node for CountingNode {
957            fn node_type(&self) -> &str {
958                "counting"
959            }
960
961            async fn execute(&self, _ctx: ExecContext) -> Result<Value> {
962                self.call_count.fetch_add(1, Ordering::SeqCst);
963                Ok(json!({ "counted": true }))
964            }
965        }
966
967        let count_a = Arc::new(AtomicU32::new(0));
968        let count_b = Arc::new(AtomicU32::new(0));
969        let mut registry = NodeRegistry::with_defaults();
970        registry.register(Arc::new(CountingNode {
971            call_count: Arc::clone(&count_a),
972        }));
973
974        // We can't register two distinct "counting" nodes, so use noop for b.
975        let def = json!({
976            "nodes": [
977                { "id": "a", "type": "counting" },
978                { "id": "b", "type": "noop" }
979            ],
980            "edges": [{ "source": "a", "target": "b" }]
981        });
982
983        let dag = DagGraph::from_json(&def).unwrap();
984        let _ = count_b; // unused — b is noop
985        let runner = FlowRunner::new(dag, registry);
986
987        // Full first run — counting node executes once.
988        let first = runner.run(HashMap::new()).await.unwrap();
989        assert_eq!(count_a.load(Ordering::SeqCst), 1);
990
991        // Resume: "a" is already completed — should NOT re-execute.
992        let resumed = runner.resume_from(&first, HashMap::new()).await.unwrap();
993        assert_eq!(count_a.load(Ordering::SeqCst), 1); // still 1
994        assert!(resumed.outputs.contains_key("a"));
995        assert!(resumed.outputs.contains_key("b"));
996    }
997
998    #[tokio::test]
999    async fn resume_from_runs_only_pending_nodes() {
1000        // Simulate a partial result where only "a" has completed.
1001        // "b" has not run yet.  resume_from should run "b" only.
1002        use crate::node::{ExecContext, Node};
1003        use async_trait::async_trait;
1004        use std::sync::atomic::{AtomicU32, Ordering};
1005
1006        struct CountNode(Arc<AtomicU32>);
1007
1008        #[async_trait]
1009        impl Node for CountNode {
1010            fn node_type(&self) -> &str {
1011                "count-b"
1012            }
1013            async fn execute(&self, _ctx: ExecContext) -> Result<Value> {
1014                self.0.fetch_add(1, Ordering::SeqCst);
1015                Ok(json!({ "ran": true }))
1016            }
1017        }
1018
1019        let count_b = Arc::new(AtomicU32::new(0));
1020        let mut registry = NodeRegistry::with_defaults();
1021        registry.register(Arc::new(CountNode(Arc::clone(&count_b))));
1022
1023        let def = json!({
1024            "nodes": [
1025                { "id": "a", "type": "noop" },
1026                { "id": "b", "type": "count-b" }
1027            ],
1028            "edges": [{ "source": "a", "target": "b" }]
1029        });
1030        let dag = DagGraph::from_json(&def).unwrap();
1031        let runner = FlowRunner::new(dag, registry);
1032
1033        // Build a partial result where only "a" is done.
1034        let partial = FlowResult {
1035            execution_id: uuid::Uuid::new_v4(),
1036            outputs: HashMap::from([("a".into(), json!({}))]),
1037            completed_nodes: HashSet::from(["a".into()]),
1038            skipped_nodes: HashSet::new(),
1039        };
1040
1041        let result = runner.resume_from(&partial, HashMap::new()).await.unwrap();
1042        assert_eq!(count_b.load(Ordering::SeqCst), 1);
1043        assert!(result.outputs["b"]["ran"].as_bool().unwrap());
1044
1045        // Resuming a fully-completed result should not re-run any node.
1046        let full = runner.run(HashMap::new()).await.unwrap();
1047        count_b.store(0, Ordering::SeqCst);
1048        let _ = runner.resume_from(&full, HashMap::new()).await.unwrap();
1049        assert_eq!(count_b.load(Ordering::SeqCst), 0);
1050
1051        let _ = partial; // suppress unused warning
1052    }
1053
1054    // ── continue_on_error ──────────────────────────────────────────────────
1055
1056    #[tokio::test]
1057    async fn continue_on_error_keeps_flow_running_after_node_failure() {
1058        use crate::node::{ExecContext, Node};
1059        use async_trait::async_trait;
1060
1061        struct FailNode;
1062
1063        #[async_trait]
1064        impl Node for FailNode {
1065            fn node_type(&self) -> &str {
1066                "always-fail-coe"
1067            }
1068            async fn execute(&self, _: ExecContext) -> Result<Value> {
1069                Err(FlowError::Internal("boom".into()))
1070            }
1071        }
1072
1073        let mut registry = NodeRegistry::with_defaults();
1074        registry.register(Arc::new(FailNode));
1075
1076        let def = json!({
1077            "nodes": [
1078                {
1079                    "id": "fail",
1080                    "type": "always-fail-coe",
1081                    "data": { "continue_on_error": true }
1082                },
1083                { "id": "after", "type": "noop" }
1084            ],
1085            "edges": [{ "source": "fail", "target": "after" }]
1086        });
1087
1088        let dag = DagGraph::from_json(&def).unwrap();
1089        let result = FlowRunner::new(dag, registry)
1090            .run(HashMap::new())
1091            .await
1092            .unwrap();
1093
1094        // "fail" should have an __error__ key in its output.
1095        assert!(result.outputs["fail"]["__error__"].is_string());
1096        // "after" should still have run.
1097        assert!(result.completed_nodes.contains("after"));
1098    }
1099
1100    #[tokio::test]
1101    async fn continue_on_error_false_halts_flow_on_failure() {
1102        use crate::node::{ExecContext, Node};
1103        use async_trait::async_trait;
1104
1105        struct FailNode2;
1106
1107        #[async_trait]
1108        impl Node for FailNode2 {
1109            fn node_type(&self) -> &str {
1110                "always-fail-halt"
1111            }
1112            async fn execute(&self, _: ExecContext) -> Result<Value> {
1113                Err(FlowError::Internal("halt".into()))
1114            }
1115        }
1116
1117        let mut registry = NodeRegistry::with_defaults();
1118        registry.register(Arc::new(FailNode2));
1119
1120        let def = json!({
1121            "nodes": [
1122                { "id": "fail", "type": "always-fail-halt" },
1123                { "id": "after", "type": "noop" }
1124            ],
1125            "edges": [{ "source": "fail", "target": "after" }]
1126        });
1127
1128        let dag = DagGraph::from_json(&def).unwrap();
1129        let err = FlowRunner::new(dag, registry)
1130            .run(HashMap::new())
1131            .await
1132            .unwrap_err();
1133
1134        assert!(matches!(err, FlowError::NodeFailed { .. }));
1135    }
1136
1137    // ── max_concurrency ────────────────────────────────────────────────────
1138
1139    #[tokio::test]
1140    async fn max_concurrency_limits_parallel_execution() {
1141        use crate::node::{ExecContext, Node};
1142        use async_trait::async_trait;
1143        use std::sync::atomic::{AtomicU32, Ordering};
1144
1145        // Tracks the peak number of concurrently-running nodes.
1146        let active = Arc::new(AtomicU32::new(0));
1147        let peak = Arc::new(AtomicU32::new(0));
1148
1149        struct PeakNode {
1150            active: Arc<AtomicU32>,
1151            peak: Arc<AtomicU32>,
1152        }
1153
1154        #[async_trait]
1155        impl Node for PeakNode {
1156            fn node_type(&self) -> &str {
1157                "peak-tracker"
1158            }
1159            async fn execute(&self, _: ExecContext) -> Result<Value> {
1160                let current = self.active.fetch_add(1, Ordering::SeqCst) + 1;
1161                // Update peak.
1162                let mut prev = self.peak.load(Ordering::SeqCst);
1163                while current > prev {
1164                    match self.peak.compare_exchange_weak(
1165                        prev,
1166                        current,
1167                        Ordering::SeqCst,
1168                        Ordering::SeqCst,
1169                    ) {
1170                        Ok(_) => break,
1171                        Err(actual) => prev = actual,
1172                    }
1173                }
1174                tokio::time::sleep(Duration::from_millis(20)).await;
1175                self.active.fetch_sub(1, Ordering::SeqCst);
1176                Ok(json!({}))
1177            }
1178        }
1179
1180        let mut registry = NodeRegistry::with_defaults();
1181        registry.register(Arc::new(PeakNode {
1182            active: Arc::clone(&active),
1183            peak: Arc::clone(&peak),
1184        }));
1185
1186        // 5 independent nodes, max_concurrency = 2.
1187        let def = json!({
1188            "nodes": [
1189                { "id": "n1", "type": "peak-tracker" },
1190                { "id": "n2", "type": "peak-tracker" },
1191                { "id": "n3", "type": "peak-tracker" },
1192                { "id": "n4", "type": "peak-tracker" },
1193                { "id": "n5", "type": "peak-tracker" }
1194            ],
1195            "edges": []
1196        });
1197
1198        let dag = DagGraph::from_json(&def).unwrap();
1199        let runner = FlowRunner::new(dag, registry).with_max_concurrency(2);
1200        let result = runner.run(HashMap::new()).await.unwrap();
1201
1202        assert_eq!(result.completed_nodes.len(), 5);
1203        assert!(
1204            peak.load(Ordering::SeqCst) <= 2,
1205            "peak concurrency {} exceeded max of 2",
1206            peak.load(Ordering::SeqCst)
1207        );
1208    }
1209
1210    #[tokio::test]
1211    async fn max_concurrency_unlimited_by_default() {
1212        // With no max_concurrency, all 5 independent nodes should be able to
1213        // run concurrently (peak may be ≤ 5, just verify flow completes).
1214        let def = json!({
1215            "nodes": [
1216                { "id": "a", "type": "noop" },
1217                { "id": "b", "type": "noop" },
1218                { "id": "c", "type": "noop" }
1219            ],
1220            "edges": []
1221        });
1222        let dag = DagGraph::from_json(&def).unwrap();
1223        let result = FlowRunner::new(dag, NodeRegistry::with_defaults())
1224            .run(HashMap::new())
1225            .await
1226            .unwrap();
1227        assert_eq!(result.completed_nodes.len(), 3);
1228    }
1229
1230    // ── start / end nodes ──────────────────────────────────────────────────
1231
1232    #[tokio::test]
1233    async fn start_node_resolves_variables_and_end_node_gathers_output() {
1234        let def = json!({
1235            "nodes": [
1236                {
1237                    "id": "start",
1238                    "type": "start",
1239                    "data": {
1240                        "inputs": [
1241                            { "name": "greeting", "type": "string" },
1242                            { "name": "repeat",   "type": "number", "default": 1 }
1243                        ]
1244                    }
1245                },
1246                {
1247                    "id": "end",
1248                    "type": "end",
1249                    "data": {
1250                        "outputs": {
1251                            "greeting": "/start/greeting",
1252                            "repeat":   "/start/repeat"
1253                        }
1254                    }
1255                }
1256            ],
1257            "edges": [{ "source": "start", "target": "end" }]
1258        });
1259        let dag = DagGraph::from_json(&def).unwrap();
1260        let mut vars = HashMap::new();
1261        vars.insert("greeting".to_string(), json!("hello"));
1262        let result = FlowRunner::new(dag, NodeRegistry::with_defaults())
1263            .run(vars)
1264            .await
1265            .unwrap();
1266
1267        // start node resolves greeting and applies default for repeat.
1268        assert_eq!(result.outputs["start"]["greeting"], json!("hello"));
1269        assert_eq!(result.outputs["start"]["repeat"], json!(1));
1270
1271        // end node gathers via JSON pointer.
1272        assert_eq!(result.outputs["end"]["greeting"], json!("hello"));
1273        assert_eq!(result.outputs["end"]["repeat"], json!(1));
1274    }
1275
1276    // ── assign node — variable scope mutation ──────────────────────────────
1277
1278    #[tokio::test]
1279    async fn assign_node_makes_value_visible_to_downstream_nodes() {
1280        // "init" assigns greeting; "read" is a code node that reads it from variables.
1281        let def = json!({
1282            "nodes": [
1283                {
1284                    "id": "init",
1285                    "type": "assign",
1286                    "data": { "assigns": { "greeting": "hello from assign" } }
1287                },
1288                {
1289                    "id": "read",
1290                    "type": "code",
1291                    "data": { "language": "rhai", "code": "variables.greeting" }
1292                }
1293            ],
1294            "edges": [{ "source": "init", "target": "read" }]
1295        });
1296        let dag = DagGraph::from_json(&def).unwrap();
1297        let runner = FlowRunner::new(dag, NodeRegistry::with_defaults());
1298        let result = runner.run(HashMap::new()).await.unwrap();
1299
1300        assert_eq!(result.outputs["read"]["output"], json!("hello from assign"));
1301    }
1302
1303    #[tokio::test]
1304    async fn assign_node_overwrites_existing_variable() {
1305        let def = json!({
1306            "nodes": [
1307                {
1308                    "id": "overwrite",
1309                    "type": "assign",
1310                    "data": { "assigns": { "x": "new_value" } }
1311                },
1312                {
1313                    "id": "read",
1314                    "type": "code",
1315                    "data": { "language": "rhai", "code": "variables.x" }
1316                }
1317            ],
1318            "edges": [{ "source": "overwrite", "target": "read" }]
1319        });
1320        let dag = DagGraph::from_json(&def).unwrap();
1321        let runner = FlowRunner::new(dag, NodeRegistry::with_defaults());
1322        let mut vars = HashMap::new();
1323        vars.insert("x".to_string(), json!("old_value"));
1324        let result = runner.run(vars).await.unwrap();
1325
1326        assert_eq!(result.outputs["read"]["output"], json!("new_value"));
1327    }
1328
1329    #[tokio::test]
1330    async fn assign_node_does_not_affect_parallel_siblings() {
1331        // "assign_a" and "noop_b" run in the same wave (no edges between them).
1332        // "read" runs after both — sees the assigned value.
1333        let def = json!({
1334            "nodes": [
1335                {
1336                    "id": "assign_a",
1337                    "type": "assign",
1338                    "data": { "assigns": { "flag": "set" } }
1339                },
1340                { "id": "noop_b", "type": "noop" },
1341                {
1342                    "id": "read",
1343                    "type": "code",
1344                    "data": { "language": "rhai", "code": "variables.flag" }
1345                }
1346            ],
1347            "edges": [
1348                { "source": "assign_a", "target": "read" },
1349                { "source": "noop_b",   "target": "read" }
1350            ]
1351        });
1352        let dag = DagGraph::from_json(&def).unwrap();
1353        let runner = FlowRunner::new(dag, NodeRegistry::with_defaults());
1354        let result = runner.run(HashMap::new()).await.unwrap();
1355
1356        // "read" runs in wave 2; the assign happened in wave 1, so it's visible.
1357        assert_eq!(result.outputs["read"]["output"], json!("set"));
1358    }
1359}