1use std::collections::{HashMap, HashSet};
11use std::sync::{Arc, RwLock};
12use std::time::Duration;
13
14use tokio::sync::Semaphore;
15
16use serde_json::Value;
17use tokio::sync::watch;
18use tokio::task::JoinSet;
19use tokio_util::sync::CancellationToken;
20use tracing::{debug, info, instrument, Instrument};
21use uuid::Uuid;
22
23use crate::error::{FlowError, Result};
24use crate::event::{EventEmitter, NoopEventEmitter};
25use crate::flow_store::FlowStore;
26use crate::graph::DagGraph;
27use crate::node::{ExecContext, Node, RetryPolicy};
28use crate::registry::NodeRegistry;
29use crate::result::FlowResult;
30
/// Control signal consumed by a running flow between waves.
///
/// The runner reads the latest value from a `watch` channel before
/// scheduling each wave; it never interrupts nodes that are already running.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum FlowSignal {
    /// Proceed: schedule the next wave of ready nodes.
    Run,
    /// Hold before the next wave until the signal changes back to `Run`
    /// (or the cancellation token fires).
    Pause,
}
39
/// Executes a [`DagGraph`] wave-by-wave: in each wave every node whose
/// dependencies are complete is spawned concurrently onto the tokio runtime.
pub struct FlowRunner {
    /// The dependency graph to execute.
    dag: DagGraph,
    /// Maps node type names to node implementations (shared with spawned tasks).
    registry: Arc<NodeRegistry>,
    /// Receives lifecycle events; defaults to [`NoopEventEmitter`].
    emitter: Arc<dyn EventEmitter>,
    /// Optional persistence backend handed to nodes through `ExecContext`.
    flow_store: Option<Arc<dyn FlowStore>>,
    /// Optional cap on how many nodes run simultaneously within a wave;
    /// `None` means unlimited.
    max_concurrency: Option<usize>,
}
75
impl FlowRunner {
    /// Builds a runner that takes ownership of `registry`, with no-op
    /// eventing, no flow store, and unlimited concurrency.
    pub fn new(dag: DagGraph, registry: NodeRegistry) -> Self {
        Self {
            dag,
            registry: Arc::new(registry),
            emitter: Arc::new(NoopEventEmitter),
            flow_store: None,
            max_concurrency: None,
        }
    }

    /// Like [`FlowRunner::new`], but shares an already-`Arc`ed registry
    /// instead of wrapping a fresh one.
    pub fn with_arc_registry(dag: DagGraph, registry: Arc<NodeRegistry>) -> Self {
        Self {
            dag,
            registry,
            emitter: Arc::new(NoopEventEmitter),
            flow_store: None,
            max_concurrency: None,
        }
    }

    /// Builder: replaces the default no-op event emitter.
    pub fn with_event_emitter(mut self, emitter: Arc<dyn EventEmitter>) -> Self {
        self.emitter = emitter;
        self
    }

    /// Builder: attaches a flow store that nodes can reach via `ExecContext`.
    pub fn with_flow_store(mut self, store: Arc<dyn FlowStore>) -> Self {
        self.flow_store = Some(store);
        self
    }

    /// Builder: caps how many nodes may execute concurrently within a wave.
    pub fn with_max_concurrency(mut self, n: usize) -> Self {
        self.max_concurrency = Some(n);
        self
    }

    /// Runs the flow from scratch under a fresh execution id.
    ///
    /// The pause channel sender and the cancellation token are created
    /// locally and never exposed, so a run started here can be neither
    /// paused nor terminated externally.
    #[instrument(skip(self, variables), fields(execution_id))]
    pub async fn run(&self, variables: HashMap<String, Value>) -> Result<FlowResult> {
        let execution_id = Uuid::new_v4();
        tracing::Span::current().record("execution_id", execution_id.to_string());
        // The sender is dropped immediately; the receiver keeps the last
        // value (`Run`), so the wave loop's signal gate always falls through.
        let (_tx, rx) = watch::channel(FlowSignal::Run);
        let cancel = CancellationToken::new();
        self.run_seeded(
            execution_id,
            variables,
            rx,
            cancel,
            HashMap::new(),
            HashSet::new(),
            HashSet::new(),
        )
        .await
    }

    /// Re-runs the flow under a new execution id, seeding outputs and the
    /// completed/skipped sets from `prior` so nodes that already finished
    /// are not executed again.
    pub async fn resume_from(
        &self,
        prior: &FlowResult,
        variables: HashMap<String, Value>,
    ) -> Result<FlowResult> {
        let execution_id = Uuid::new_v4();
        let (_tx, rx) = watch::channel(FlowSignal::Run);
        let cancel = CancellationToken::new();
        self.run_seeded(
            execution_id,
            variables,
            rx,
            cancel,
            prior.outputs.clone(),
            prior.completed_nodes.clone(),
            prior.skipped_nodes.clone(),
        )
        .await
    }

    /// Crate-internal entry point that lets the caller supply the execution
    /// id, a live pause/run signal receiver, and a cancellation token.
    pub(crate) async fn run_controlled(
        &self,
        execution_id: Uuid,
        variables: HashMap<String, Value>,
        signal_rx: watch::Receiver<FlowSignal>,
        cancel: CancellationToken,
    ) -> Result<FlowResult> {
        self.run_seeded(
            execution_id,
            variables,
            signal_rx,
            cancel,
            HashMap::new(),
            HashSet::new(),
            HashSet::new(),
        )
        .await
    }

    /// Shared driver for all public entry points: emits the flow-level
    /// started/completed/terminated/failed events around `execute_waves`
    /// and returns its outcome unchanged.
    #[allow(clippy::too_many_arguments)]
    async fn run_seeded(
        &self,
        execution_id: Uuid,
        variables: HashMap<String, Value>,
        signal_rx: watch::Receiver<FlowSignal>,
        cancel: CancellationToken,
        initial_outputs: HashMap<String, Value>,
        initial_completed: HashSet<String>,
        initial_skipped: HashSet<String>,
    ) -> Result<FlowResult> {
        info!(%execution_id, "flow execution started");
        self.emitter.on_flow_started(execution_id).await;

        let outcome = self
            .execute_waves(
                execution_id,
                variables,
                signal_rx,
                cancel,
                initial_outputs,
                initial_completed,
                initial_skipped,
            )
            .await;

        // Map the outcome onto exactly one terminal flow event.
        match &outcome {
            Ok(result) => {
                info!(%execution_id, "flow execution complete");
                self.emitter.on_flow_completed(execution_id, result).await;
            }
            Err(FlowError::Terminated) => {
                info!(%execution_id, "flow execution terminated");
                self.emitter.on_flow_terminated(execution_id).await;
            }
            Err(e) => {
                tracing::warn!(%execution_id, error = %e, "flow execution failed");
                self.emitter
                    .on_flow_failed(execution_id, &e.to_string())
                    .await;
            }
        }

        outcome
    }

    /// Core scheduling loop.
    ///
    /// Repeatedly partitions the remaining nodes into a "ready" wave (all
    /// dependencies complete) and the rest, spawns the ready nodes
    /// concurrently, and folds their results into `outputs`/`completed`/
    /// `skipped`. Pause/cancel signals are honored *between* waves only;
    /// cancellation during a wave aborts the in-flight tasks via the
    /// `JoinSet` being dropped on early return.
    #[allow(clippy::too_many_arguments)]
    async fn execute_waves(
        &self,
        execution_id: Uuid,
        variables: HashMap<String, Value>,
        mut signal_rx: watch::Receiver<FlowSignal>,
        cancel: CancellationToken,
        initial_outputs: HashMap<String, Value>,
        initial_completed: HashSet<String>,
        initial_skipped: HashSet<String>,
    ) -> Result<FlowResult> {
        let mut variables = variables;
        // Shared scratch space available to every node in this run.
        let context = Arc::new(RwLock::new(HashMap::new()));
        let mut outputs = initial_outputs;
        let mut completed = initial_completed;
        let mut skipped = initial_skipped;
        // Nodes still to run, in graph order, minus any seeded as complete
        // (resume path).
        let mut remaining: Vec<String> = self
            .dag
            .nodes_in_order()
            .map(|n| n.id.clone())
            .filter(|id| !completed.contains(id))
            .collect();

        while !remaining.is_empty() {
            // Gate before each wave: bail on cancellation, park while
            // paused, proceed on Run.
            loop {
                if cancel.is_cancelled() {
                    return Err(FlowError::Terminated);
                }
                let signal = *signal_rx.borrow();
                match signal {
                    FlowSignal::Run => break,
                    FlowSignal::Pause => {
                        tokio::select! {
                            _ = signal_rx.changed() => continue,
                            _ = cancel.cancelled() => return Err(FlowError::Terminated),
                        }
                    }
                }
            }

            // A node is ready when every dependency is in `completed`
            // (skipped nodes count as completed).
            let (ready, not_ready): (Vec<_>, Vec<_>) = remaining.into_iter().partition(|id| {
                self.dag
                    .dependencies_of(id)
                    .iter()
                    .all(|dep| completed.contains(dep))
            });

            // Nothing ready but work remains: dependencies can never be
            // satisfied (e.g. inconsistent resume seed), so fail loudly
            // instead of looping forever.
            if ready.is_empty() {
                return Err(FlowError::Internal(
                    "execution stalled: no nodes are ready but not all nodes are done".into(),
                ));
            }

            remaining = not_ready;

            // Nodes flagged `write_to_variables`: after the wave, their
            // object outputs are merged into the flow variables.
            let assign_node_ids: Vec<String> = ready
                .iter()
                .filter(|id| {
                    self.dag
                        .nodes
                        .get(*id)
                        .map(|n| n.write_to_variables)
                        .unwrap_or(false)
                })
                .cloned()
                .collect();

            // Fresh semaphore per wave; `None` means no concurrency cap.
            let semaphore = self.max_concurrency.map(|n| Arc::new(Semaphore::new(n)));

            let mut join_set: JoinSet<(String, Result<Value>)> = JoinSet::new();

            for node_id in ready {
                let node_def = self.dag.nodes[&node_id].clone();

                // run_if=false: record a Null output, mark skipped AND
                // completed so dependents unblock, and move on without
                // spawning a task.
                if let Some(ref cond) = node_def.run_if {
                    if !cond.evaluate(&outputs, &skipped) {
                        debug!(%node_id, "node skipped (run_if condition false)");
                        self.emitter.on_node_skipped(execution_id, &node_id).await;
                        outputs.insert(node_id.clone(), Value::Null);
                        skipped.insert(node_id.clone());
                        completed.insert(node_id);
                        continue;
                    }
                }

                let node = self.registry.get(&node_def.node_type)?;

                // Snapshot of dependency outputs, keyed by dependency id.
                let inputs: HashMap<String, Value> = self
                    .dag
                    .dependencies_of(&node_id)
                    .iter()
                    .filter_map(|dep| outputs.get(dep).map(|v| (dep.clone(), v.clone())))
                    .collect();

                let ctx = ExecContext {
                    data: node_def.data.clone(),
                    inputs,
                    variables: variables.clone(),
                    context: Arc::clone(&context),
                    registry: Arc::clone(&self.registry),
                    flow_store: self.flow_store.clone(),
                };

                let retry = node_def.retry.clone();
                let timeout_ms = node_def.timeout_ms;
                let continue_on_error = node_def.continue_on_error;
                let emitter = Arc::clone(&self.emitter);
                let sem = semaphore.clone();

                debug!(
                    %node_id,
                    node_type = %node_def.node_type,
                    retry = ?retry.as_ref().map(|r| r.max_attempts),
                    timeout_ms,
                    continue_on_error,
                    "executing node"
                );

                let span = tracing::info_span!(
                    "node.execute",
                    node_id = node_id.as_str(),
                    node_type = node_def.node_type.as_str(),
                    %execution_id,
                );

                join_set.spawn(
                    async move {
                        // Acquire the concurrency permit *inside* the task so
                        // the semaphore caps running bodies, not spawns. The
                        // permit is held (via RAII) until the task finishes.
                        // `acquire_owned` only errs if the semaphore is
                        // closed, which never happens here.
                        let _permit = if let Some(ref s) = sem {
                            Some(Arc::clone(s).acquire_owned().await.ok())
                        } else {
                            None
                        };

                        emitter
                            .on_node_started(execution_id, &node_id, &node_def.node_type)
                            .await;

                        let result: Result<Value> =
                            execute_with_policy(node, ctx, retry, timeout_ms)
                                .await
                                .map_err(|e| FlowError::NodeFailed {
                                    node_id: node_id.clone(),
                                    execution_id,
                                    reason: e.to_string(),
                                });

                        // continue_on_error turns a failure into a success
                        // payload carrying the error text under "__error__",
                        // so downstream nodes still run.
                        let result: Result<Value> = if continue_on_error {
                            result
                                .or_else(|e| Ok(serde_json::json!({ "__error__": e.to_string() })))
                        } else {
                            result
                        };

                        match &result {
                            Ok(v) => {
                                emitter.on_node_completed(execution_id, &node_id, v).await;
                            }
                            Err(e) => {
                                emitter
                                    .on_node_failed(execution_id, &node_id, &e.to_string())
                                    .await;
                            }
                        }

                        (node_id, result)
                    }
                    .instrument(span),
                );
            }

            // Drain the wave. Early returns drop `join_set`, which aborts
            // any still-running tasks (tokio `JoinSet` drop semantics).
            loop {
                tokio::select! {
                    _ = cancel.cancelled() => {
                        return Err(FlowError::Terminated);
                    }
                    maybe = join_set.join_next() => {
                        match maybe {
                            None => break,
                            Some(Ok((node_id, Ok(value)))) => {
                                debug!(%node_id, "node completed");
                                outputs.insert(node_id.clone(), value);
                                completed.insert(node_id);
                            }
                            // First node failure ends the flow.
                            Some(Ok((_, Err(e)))) => return Err(e),
                            Some(Err(join_err)) if join_err.is_cancelled() => {
                                return Err(FlowError::Terminated);
                            }
                            // Task panicked or otherwise failed to join.
                            Some(Err(e)) => return Err(FlowError::Internal(e.to_string())),
                        }
                    }
                }
            }

            // Merge object outputs of write_to_variables nodes into the
            // flow variables — but only successful ones (a continue_on_error
            // payload carries "__error__" and is deliberately excluded).
            for node_id in &assign_node_ids {
                if let Some(Value::Object(obj)) = outputs.get(node_id) {
                    if !obj.contains_key("__error__") {
                        for (k, v) in obj {
                            variables.insert(k.clone(), v.clone());
                        }
                    }
                }
            }
        }

        Ok(FlowResult {
            execution_id,
            outputs,
            completed_nodes: completed,
            skipped_nodes: skipped,
        })
    }
}
516
517async fn execute_with_policy(
525 node: Arc<dyn Node>,
526 ctx: ExecContext,
527 retry: Option<RetryPolicy>,
528 timeout_ms: Option<u64>,
529) -> Result<Value> {
530 let max_attempts = retry.as_ref().map(|r| r.max_attempts.max(1)).unwrap_or(1);
531 let backoff_ms = retry.as_ref().map(|r| r.backoff_ms).unwrap_or(0);
532
533 let mut last_err = FlowError::Internal("no attempts made".into());
534
535 for attempt in 0..max_attempts {
536 if attempt > 0 && backoff_ms > 0 {
537 let multiplier = 1u64 << (attempt - 1).min(6);
539 let delay = backoff_ms.saturating_mul(multiplier);
540 tokio::time::sleep(Duration::from_millis(delay)).await;
541 }
542
543 let fut = node.execute(ctx.clone());
544
545 let result = if let Some(ms) = timeout_ms {
546 tokio::time::timeout(Duration::from_millis(ms), fut)
547 .await
548 .unwrap_or_else(|_| Err(FlowError::Internal(format!("timed out after {ms}ms"))))
549 } else {
550 fut.await
551 };
552
553 match result {
554 Ok(v) => return Ok(v),
555 Err(e) => last_err = e,
556 }
557 }
558
559 Err(last_err)
560}
561
#[cfg(test)]
mod tests {
    use super::*;
    use crate::graph::DagGraph;
    use crate::registry::NodeRegistry;
    use serde_json::json;

    // a -> b -> c: every node in a linear chain produces an output.
    #[tokio::test]
    async fn runs_linear_flow() {
        let def = json!({
            "nodes": [
                { "id": "a", "type": "noop" },
                { "id": "b", "type": "noop" },
                { "id": "c", "type": "noop" }
            ],
            "edges": [
                { "source": "a", "target": "b" },
                { "source": "b", "target": "c" }
            ]
        });
        let dag = DagGraph::from_json(&def).unwrap();
        let registry = NodeRegistry::with_defaults();
        let runner = FlowRunner::new(dag, registry);
        let result = runner.run(HashMap::new()).await.unwrap();

        assert!(result.outputs.contains_key("a"));
        assert!(result.outputs.contains_key("b"));
        assert!(result.outputs.contains_key("c"));
    }

    // Diamond (fan-out then fan-in): all four nodes complete.
    #[tokio::test]
    async fn runs_parallel_fan_out() {
        let def = json!({
            "nodes": [
                { "id": "start", "type": "noop" },
                { "id": "b", "type": "noop" },
                { "id": "c", "type": "noop" },
                { "id": "end", "type": "noop" }
            ],
            "edges": [
                { "source": "start", "target": "b" },
                { "source": "start", "target": "c" },
                { "source": "b", "target": "end" },
                { "source": "c", "target": "end" }
            ]
        });
        let dag = DagGraph::from_json(&def).unwrap();
        let registry = NodeRegistry::with_defaults();
        let runner = FlowRunner::new(dag, registry);
        let result = runner.run(HashMap::new()).await.unwrap();
        assert_eq!(result.outputs.len(), 4);
    }

    // Passing variables into `run` does not break execution.
    #[tokio::test]
    async fn variables_available_in_context() {
        let def = json!({ "nodes": [{ "id": "only", "type": "noop" }], "edges": [] });
        let dag = DagGraph::from_json(&def).unwrap();
        let registry = NodeRegistry::with_defaults();
        let runner = FlowRunner::new(dag, registry);

        let vars = HashMap::from([("env".into(), json!("production"))]);
        let result = runner.run(vars).await.unwrap();
        assert!(result.outputs.contains_key("only"));
    }

    // if-else routes to "else"; the run_if on "process" fails, so its
    // output is recorded as null (the skip marker).
    #[tokio::test]
    async fn run_if_skips_node_when_if_else_falls_to_else() {
        let def = json!({
            "nodes": [
                { "id": "data", "type": "noop" },
                {
                    "id": "route", "type": "if-else",
                    "data": { "cases": [{ "id": "hit", "conditions": [{ "from": "data", "path": "", "op": "eq", "value": 999 }] }] }
                },
                {
                    "id": "process", "type": "noop",
                    "data": { "run_if": { "from": "route", "path": "branch", "op": "eq", "value": "hit" } }
                }
            ],
            "edges": [
                { "source": "data", "target": "route" },
                { "source": "route", "target": "process" }
            ]
        });
        let dag = DagGraph::from_json(&def).unwrap();
        let runner = FlowRunner::new(dag, NodeRegistry::with_defaults());
        let result = runner.run(HashMap::new()).await.unwrap();

        assert_eq!(result.outputs["process"], json!(null));
    }

    // if-else matches the "hit" case, so the run_if-gated node executes
    // and produces a real (non-null) output.
    #[tokio::test]
    async fn run_if_executes_node_when_if_else_matches() {
        let def = json!({
            "nodes": [
                { "id": "src", "type": "noop" },
                {
                    "id": "gate", "type": "if-else",
                    "data": { "cases": [{ "id": "hit", "conditions": [{ "from": "src", "path": "", "op": "eq", "value": {} }] }] }
                },
                {
                    "id": "sink", "type": "noop",
                    "data": { "run_if": { "from": "gate", "path": "branch", "op": "eq", "value": "hit" } }
                }
            ],
            "edges": [
                { "source": "src", "target": "gate" },
                { "source": "gate", "target": "sink" }
            ]
        });
        let dag = DagGraph::from_json(&def).unwrap();
        let runner = FlowRunner::new(dag, NodeRegistry::with_defaults());
        let result = runner.run(HashMap::new()).await.unwrap();

        assert!(result.outputs["sink"].is_object());
        assert_ne!(result.outputs["sink"], json!(null));
    }

    // b is skipped (condition on a missing field), and c's condition on
    // b's null output also fails, so the skip cascades down the chain.
    #[tokio::test]
    async fn skip_propagates_through_chain() {
        let def = json!({
            "nodes": [
                { "id": "a", "type": "noop" },
                {
                    "id": "b", "type": "noop",
                    "data": { "run_if": { "from": "a", "path": "nonexistent_field", "op": "eq", "value": true } }
                },
                {
                    "id": "c", "type": "noop",
                    "data": { "run_if": { "from": "b", "path": "x", "op": "eq", "value": 1 } }
                }
            ],
            "edges": [
                { "source": "a", "target": "b" },
                { "source": "b", "target": "c" }
            ]
        });
        let dag = DagGraph::from_json(&def).unwrap();
        let runner = FlowRunner::new(dag, NodeRegistry::with_defaults());
        let result = runner.run(HashMap::new()).await.unwrap();

        assert_eq!(result.outputs["b"], json!(null));
        assert_eq!(result.outputs["c"], json!(null));
    }

    // Branching with a variable-aggregator fan-in: the merge node still
    // produces output even though one branch was skipped.
    #[tokio::test]
    async fn if_else_with_variable_aggregator_fan_in() {
        let def = json!({
            "nodes": [
                { "id": "src", "type": "noop" },
                {
                    "id": "route", "type": "if-else",
                    "data": { "cases": [{ "id": "ok", "conditions": [{ "from": "src", "path": "", "op": "eq", "value": {} }] }] }
                },
                {
                    "id": "path_ok", "type": "noop",
                    "data": { "run_if": { "from": "route", "path": "branch", "op": "eq", "value": "ok" } }
                },
                {
                    "id": "path_err", "type": "noop",
                    "data": { "run_if": { "from": "route", "path": "branch", "op": "eq", "value": "else" } }
                },
                {
                    "id": "merge", "type": "variable-aggregator",
                    "data": { "inputs": ["path_ok", "path_err"] }
                }
            ],
            "edges": [
                { "source": "src", "target": "route" },
                { "source": "route", "target": "path_ok" },
                { "source": "route", "target": "path_err" },
                { "source": "path_ok", "target": "merge" },
                { "source": "path_err", "target": "merge" }
            ]
        });
        let dag = DagGraph::from_json(&def).unwrap();
        let runner = FlowRunner::new(dag, NodeRegistry::with_defaults());
        let result = runner.run(HashMap::new()).await.unwrap();

        assert!(!result.outputs["merge"]["output"].is_null());
        assert_eq!(result.outputs["path_err"], json!(null));
    }

    // completed_nodes contains every executed node; nothing was skipped.
    #[tokio::test]
    async fn completed_nodes_tracks_all_executed_nodes() {
        let def = json!({
            "nodes": [
                { "id": "a", "type": "noop" },
                { "id": "b", "type": "noop" }
            ],
            "edges": [{ "source": "a", "target": "b" }]
        });
        let dag = DagGraph::from_json(&def).unwrap();
        let runner = FlowRunner::new(dag, NodeRegistry::with_defaults());
        let result = runner.run(HashMap::new()).await.unwrap();

        assert!(result.completed_nodes.contains("a"));
        assert!(result.completed_nodes.contains("b"));
        assert!(result.skipped_nodes.is_empty());
    }

    // A run_if-skipped node appears in BOTH completed_nodes (so dependents
    // unblock) and skipped_nodes.
    #[tokio::test]
    async fn skipped_nodes_tracks_run_if_skipped_nodes() {
        let def = json!({
            "nodes": [
                { "id": "a", "type": "noop" },
                {
                    "id": "b", "type": "noop",
                    "data": { "run_if": { "from": "a", "path": "nonexistent", "op": "eq", "value": true } }
                }
            ],
            "edges": [{ "source": "a", "target": "b" }]
        });
        let dag = DagGraph::from_json(&def).unwrap();
        let runner = FlowRunner::new(dag, NodeRegistry::with_defaults());
        let result = runner.run(HashMap::new()).await.unwrap();

        assert!(result.completed_nodes.contains("a"));
        assert!(result.completed_nodes.contains("b"));
        assert!(result.skipped_nodes.contains("b"));
        assert!(!result.skipped_nodes.contains("a"));
    }

    // A node that fails twice then succeeds completes under
    // max_attempts = 3, and is called exactly three times.
    #[tokio::test]
    async fn retry_succeeds_after_transient_failures() {
        use crate::node::{ExecContext, Node};
        use async_trait::async_trait;
        use std::sync::atomic::{AtomicU32, Ordering};

        struct FlakyNode {
            call_count: Arc<AtomicU32>,
        }

        #[async_trait]
        impl Node for FlakyNode {
            fn node_type(&self) -> &str {
                "flaky"
            }

            async fn execute(&self, _ctx: ExecContext) -> Result<Value> {
                let n = self.call_count.fetch_add(1, Ordering::SeqCst) + 1;
                if n < 3 {
                    Err(FlowError::Internal(format!("transient failure #{n}")))
                } else {
                    Ok(json!({ "ok": true }))
                }
            }
        }

        let call_count = Arc::new(AtomicU32::new(0));
        let mut registry = NodeRegistry::with_defaults();
        registry.register(Arc::new(FlakyNode {
            call_count: Arc::clone(&call_count),
        }));

        let def = json!({
            "nodes": [{
                "id": "step",
                "type": "flaky",
                "data": { "retry": { "max_attempts": 3, "backoff_ms": 0 } }
            }],
            "edges": []
        });
        let dag = DagGraph::from_json(&def).unwrap();
        let runner = FlowRunner::new(dag, registry);
        let result = runner.run(HashMap::new()).await.unwrap();

        assert_eq!(result.outputs["step"]["ok"], json!(true));
        assert_eq!(call_count.load(Ordering::SeqCst), 3);
    }

    // When every attempt fails, the flow errors with NodeFailed carrying
    // the last attempt's message.
    #[tokio::test]
    async fn retry_exhausted_returns_last_error() {
        use crate::node::{ExecContext, Node};
        use async_trait::async_trait;

        struct AlwaysFailNode;

        #[async_trait]
        impl Node for AlwaysFailNode {
            fn node_type(&self) -> &str {
                "always-fail"
            }

            async fn execute(&self, _ctx: ExecContext) -> Result<Value> {
                Err(FlowError::Internal("permanent failure".into()))
            }
        }

        let mut registry = NodeRegistry::with_defaults();
        registry.register(Arc::new(AlwaysFailNode));

        let def = json!({
            "nodes": [{
                "id": "step",
                "type": "always-fail",
                "data": { "retry": { "max_attempts": 2, "backoff_ms": 0 } }
            }],
            "edges": []
        });
        let dag = DagGraph::from_json(&def).unwrap();
        let runner = FlowRunner::new(dag, registry);
        let err = runner.run(HashMap::new()).await.unwrap_err();

        assert!(matches!(err, FlowError::NodeFailed { .. }));
        let msg = err.to_string();
        assert!(msg.contains("permanent failure"));
    }

    // A 500ms node under a 50ms timeout fails with a "timed out" error.
    #[tokio::test]
    async fn timeout_kills_slow_node() {
        use crate::node::{ExecContext, Node};
        use async_trait::async_trait;

        struct SlowNode;

        #[async_trait]
        impl Node for SlowNode {
            fn node_type(&self) -> &str {
                "slow-timeout"
            }

            async fn execute(&self, _ctx: ExecContext) -> Result<Value> {
                tokio::time::sleep(Duration::from_millis(500)).await;
                Ok(json!({}))
            }
        }

        let mut registry = NodeRegistry::with_defaults();
        registry.register(Arc::new(SlowNode));

        let def = json!({
            "nodes": [{
                "id": "step",
                "type": "slow-timeout",
                "data": { "timeout_ms": 50 }
            }],
            "edges": []
        });
        let dag = DagGraph::from_json(&def).unwrap();
        let runner = FlowRunner::new(dag, registry);
        let err = runner.run(HashMap::new()).await.unwrap_err();

        assert!(matches!(err, FlowError::NodeFailed { .. }));
        assert!(err.to_string().contains("timed out"));
    }

    // A fast node completes well inside its timeout budget.
    #[tokio::test]
    async fn timeout_does_not_affect_fast_node() {
        let def = json!({
            "nodes": [{
                "id": "step",
                "type": "noop",
                "data": { "timeout_ms": 200 }
            }],
            "edges": []
        });
        let dag = DagGraph::from_json(&def).unwrap();
        let runner = FlowRunner::new(dag, NodeRegistry::with_defaults());
        let result = runner.run(HashMap::new()).await.unwrap();
        assert!(result.outputs.contains_key("step"));
    }

    // resume_from with a full prior result must not re-run node "a"
    // (its counter stays at 1), yet both outputs are present.
    #[tokio::test]
    async fn resume_from_skips_already_completed_nodes() {
        use crate::node::{ExecContext, Node};
        use async_trait::async_trait;
        use std::sync::atomic::{AtomicU32, Ordering};

        struct CountingNode {
            call_count: Arc<AtomicU32>,
        }

        #[async_trait]
        impl Node for CountingNode {
            fn node_type(&self) -> &str {
                "counting"
            }

            async fn execute(&self, _ctx: ExecContext) -> Result<Value> {
                self.call_count.fetch_add(1, Ordering::SeqCst);
                Ok(json!({ "counted": true }))
            }
        }

        let count_a = Arc::new(AtomicU32::new(0));
        let count_b = Arc::new(AtomicU32::new(0));
        let mut registry = NodeRegistry::with_defaults();
        registry.register(Arc::new(CountingNode {
            call_count: Arc::clone(&count_a),
        }));

        let def = json!({
            "nodes": [
                { "id": "a", "type": "counting" },
                { "id": "b", "type": "noop" }
            ],
            "edges": [{ "source": "a", "target": "b" }]
        });

        let dag = DagGraph::from_json(&def).unwrap();
        let _ = count_b;
        let runner = FlowRunner::new(dag, registry);

        let first = runner.run(HashMap::new()).await.unwrap();
        assert_eq!(count_a.load(Ordering::SeqCst), 1);

        let resumed = runner.resume_from(&first, HashMap::new()).await.unwrap();
        // Still 1: the completed node was seeded, not re-executed.
        assert_eq!(count_a.load(Ordering::SeqCst), 1);
        assert!(resumed.outputs.contains_key("a"));
        assert!(resumed.outputs.contains_key("b"));
    }

    // resume_from with a partial result runs only the pending node;
    // resuming from a complete result runs nothing.
    #[tokio::test]
    async fn resume_from_runs_only_pending_nodes() {
        use crate::node::{ExecContext, Node};
        use async_trait::async_trait;
        use std::sync::atomic::{AtomicU32, Ordering};

        struct CountNode(Arc<AtomicU32>);

        #[async_trait]
        impl Node for CountNode {
            fn node_type(&self) -> &str {
                "count-b"
            }
            async fn execute(&self, _ctx: ExecContext) -> Result<Value> {
                self.0.fetch_add(1, Ordering::SeqCst);
                Ok(json!({ "ran": true }))
            }
        }

        let count_b = Arc::new(AtomicU32::new(0));
        let mut registry = NodeRegistry::with_defaults();
        registry.register(Arc::new(CountNode(Arc::clone(&count_b))));

        let def = json!({
            "nodes": [
                { "id": "a", "type": "noop" },
                { "id": "b", "type": "count-b" }
            ],
            "edges": [{ "source": "a", "target": "b" }]
        });
        let dag = DagGraph::from_json(&def).unwrap();
        let runner = FlowRunner::new(dag, registry);

        // Hand-built partial state: only "a" is complete.
        let partial = FlowResult {
            execution_id: uuid::Uuid::new_v4(),
            outputs: HashMap::from([("a".into(), json!({}))]),
            completed_nodes: HashSet::from(["a".into()]),
            skipped_nodes: HashSet::new(),
        };

        let result = runner.resume_from(&partial, HashMap::new()).await.unwrap();
        assert_eq!(count_b.load(Ordering::SeqCst), 1);
        assert!(result.outputs["b"]["ran"].as_bool().unwrap());

        let full = runner.run(HashMap::new()).await.unwrap();
        count_b.store(0, Ordering::SeqCst);
        let _ = runner.resume_from(&full, HashMap::new()).await.unwrap();
        // Nothing pending: the counting node must not have run again.
        assert_eq!(count_b.load(Ordering::SeqCst), 0);

        let _ = partial;
    }

    // continue_on_error turns the failure into an "__error__" payload and
    // the downstream node still runs.
    #[tokio::test]
    async fn continue_on_error_keeps_flow_running_after_node_failure() {
        use crate::node::{ExecContext, Node};
        use async_trait::async_trait;

        struct FailNode;

        #[async_trait]
        impl Node for FailNode {
            fn node_type(&self) -> &str {
                "always-fail-coe"
            }
            async fn execute(&self, _: ExecContext) -> Result<Value> {
                Err(FlowError::Internal("boom".into()))
            }
        }

        let mut registry = NodeRegistry::with_defaults();
        registry.register(Arc::new(FailNode));

        let def = json!({
            "nodes": [
                {
                    "id": "fail",
                    "type": "always-fail-coe",
                    "data": { "continue_on_error": true }
                },
                { "id": "after", "type": "noop" }
            ],
            "edges": [{ "source": "fail", "target": "after" }]
        });

        let dag = DagGraph::from_json(&def).unwrap();
        let result = FlowRunner::new(dag, registry)
            .run(HashMap::new())
            .await
            .unwrap();

        assert!(result.outputs["fail"]["__error__"].is_string());
        assert!(result.completed_nodes.contains("after"));
    }

    // Default (continue_on_error absent/false): the first failure halts
    // the whole flow with NodeFailed.
    #[tokio::test]
    async fn continue_on_error_false_halts_flow_on_failure() {
        use crate::node::{ExecContext, Node};
        use async_trait::async_trait;

        struct FailNode2;

        #[async_trait]
        impl Node for FailNode2 {
            fn node_type(&self) -> &str {
                "always-fail-halt"
            }
            async fn execute(&self, _: ExecContext) -> Result<Value> {
                Err(FlowError::Internal("halt".into()))
            }
        }

        let mut registry = NodeRegistry::with_defaults();
        registry.register(Arc::new(FailNode2));

        let def = json!({
            "nodes": [
                { "id": "fail", "type": "always-fail-halt" },
                { "id": "after", "type": "noop" }
            ],
            "edges": [{ "source": "fail", "target": "after" }]
        });

        let dag = DagGraph::from_json(&def).unwrap();
        let err = FlowRunner::new(dag, registry)
            .run(HashMap::new())
            .await
            .unwrap_err();

        assert!(matches!(err, FlowError::NodeFailed { .. }));
    }

    // Five independent nodes under max_concurrency(2): the observed peak
    // number of simultaneously-active node bodies never exceeds 2.
    #[tokio::test]
    async fn max_concurrency_limits_parallel_execution() {
        use crate::node::{ExecContext, Node};
        use async_trait::async_trait;
        use std::sync::atomic::{AtomicU32, Ordering};

        let active = Arc::new(AtomicU32::new(0));
        let peak = Arc::new(AtomicU32::new(0));

        struct PeakNode {
            active: Arc<AtomicU32>,
            peak: Arc<AtomicU32>,
        }

        #[async_trait]
        impl Node for PeakNode {
            fn node_type(&self) -> &str {
                "peak-tracker"
            }
            async fn execute(&self, _: ExecContext) -> Result<Value> {
                let current = self.active.fetch_add(1, Ordering::SeqCst) + 1;
                // Lock-free max update on `peak` via CAS retry loop.
                let mut prev = self.peak.load(Ordering::SeqCst);
                while current > prev {
                    match self.peak.compare_exchange_weak(
                        prev,
                        current,
                        Ordering::SeqCst,
                        Ordering::SeqCst,
                    ) {
                        Ok(_) => break,
                        Err(actual) => prev = actual,
                    }
                }
                tokio::time::sleep(Duration::from_millis(20)).await;
                self.active.fetch_sub(1, Ordering::SeqCst);
                Ok(json!({}))
            }
        }

        let mut registry = NodeRegistry::with_defaults();
        registry.register(Arc::new(PeakNode {
            active: Arc::clone(&active),
            peak: Arc::clone(&peak),
        }));

        let def = json!({
            "nodes": [
                { "id": "n1", "type": "peak-tracker" },
                { "id": "n2", "type": "peak-tracker" },
                { "id": "n3", "type": "peak-tracker" },
                { "id": "n4", "type": "peak-tracker" },
                { "id": "n5", "type": "peak-tracker" }
            ],
            "edges": []
        });

        let dag = DagGraph::from_json(&def).unwrap();
        let runner = FlowRunner::new(dag, registry).with_max_concurrency(2);
        let result = runner.run(HashMap::new()).await.unwrap();

        assert_eq!(result.completed_nodes.len(), 5);
        assert!(
            peak.load(Ordering::SeqCst) <= 2,
            "peak concurrency {} exceeded max of 2",
            peak.load(Ordering::SeqCst)
        );
    }

    // Without with_max_concurrency, independent nodes all complete.
    #[tokio::test]
    async fn max_concurrency_unlimited_by_default() {
        let def = json!({
            "nodes": [
                { "id": "a", "type": "noop" },
                { "id": "b", "type": "noop" },
                { "id": "c", "type": "noop" }
            ],
            "edges": []
        });
        let dag = DagGraph::from_json(&def).unwrap();
        let result = FlowRunner::new(dag, NodeRegistry::with_defaults())
            .run(HashMap::new())
            .await
            .unwrap();
        assert_eq!(result.completed_nodes.len(), 3);
    }

    // start node resolves declared inputs (with defaults) from variables;
    // end node gathers values by JSON-pointer-style paths.
    #[tokio::test]
    async fn start_node_resolves_variables_and_end_node_gathers_output() {
        let def = json!({
            "nodes": [
                {
                    "id": "start",
                    "type": "start",
                    "data": {
                        "inputs": [
                            { "name": "greeting", "type": "string" },
                            { "name": "repeat", "type": "number", "default": 1 }
                        ]
                    }
                },
                {
                    "id": "end",
                    "type": "end",
                    "data": {
                        "outputs": {
                            "greeting": "/start/greeting",
                            "repeat": "/start/repeat"
                        }
                    }
                }
            ],
            "edges": [{ "source": "start", "target": "end" }]
        });
        let dag = DagGraph::from_json(&def).unwrap();
        let mut vars = HashMap::new();
        vars.insert("greeting".to_string(), json!("hello"));
        let result = FlowRunner::new(dag, NodeRegistry::with_defaults())
            .run(vars)
            .await
            .unwrap();

        assert_eq!(result.outputs["start"]["greeting"], json!("hello"));
        assert_eq!(result.outputs["start"]["repeat"], json!(1));

        assert_eq!(result.outputs["end"]["greeting"], json!("hello"));
        assert_eq!(result.outputs["end"]["repeat"], json!(1));
    }

    // An assign node's value is visible to a downstream code node through
    // the flow variables (write_to_variables merge between waves).
    #[tokio::test]
    async fn assign_node_makes_value_visible_to_downstream_nodes() {
        let def = json!({
            "nodes": [
                {
                    "id": "init",
                    "type": "assign",
                    "data": { "assigns": { "greeting": "hello from assign" } }
                },
                {
                    "id": "read",
                    "type": "code",
                    "data": { "language": "rhai", "code": "variables.greeting" }
                }
            ],
            "edges": [{ "source": "init", "target": "read" }]
        });
        let dag = DagGraph::from_json(&def).unwrap();
        let runner = FlowRunner::new(dag, NodeRegistry::with_defaults());
        let result = runner.run(HashMap::new()).await.unwrap();

        assert_eq!(result.outputs["read"]["output"], json!("hello from assign"));
    }

    // Assign overwrites a variable supplied at run() time.
    #[tokio::test]
    async fn assign_node_overwrites_existing_variable() {
        let def = json!({
            "nodes": [
                {
                    "id": "overwrite",
                    "type": "assign",
                    "data": { "assigns": { "x": "new_value" } }
                },
                {
                    "id": "read",
                    "type": "code",
                    "data": { "language": "rhai", "code": "variables.x" }
                }
            ],
            "edges": [{ "source": "overwrite", "target": "read" }]
        });
        let dag = DagGraph::from_json(&def).unwrap();
        let runner = FlowRunner::new(dag, NodeRegistry::with_defaults());
        let mut vars = HashMap::new();
        vars.insert("x".to_string(), json!("old_value"));
        let result = runner.run(vars).await.unwrap();

        assert_eq!(result.outputs["read"]["output"], json!("new_value"));
    }

    // The assign write-back lands after the wave, so a downstream node
    // (after the fan-in) observes it even with a parallel sibling.
    #[tokio::test]
    async fn assign_node_does_not_affect_parallel_siblings() {
        let def = json!({
            "nodes": [
                {
                    "id": "assign_a",
                    "type": "assign",
                    "data": { "assigns": { "flag": "set" } }
                },
                { "id": "noop_b", "type": "noop" },
                {
                    "id": "read",
                    "type": "code",
                    "data": { "language": "rhai", "code": "variables.flag" }
                }
            ],
            "edges": [
                { "source": "assign_a", "target": "read" },
                { "source": "noop_b", "target": "read" }
            ]
        });
        let dag = DagGraph::from_json(&def).unwrap();
        let runner = FlowRunner::new(dag, NodeRegistry::with_defaults());
        let result = runner.run(HashMap::new()).await.unwrap();

        assert_eq!(result.outputs["read"]["output"], json!("set"));
    }
}
1359}