1use std::collections::HashMap;
30use std::sync::Arc;
31
32use serde_json::Value;
33use tokio::sync::{watch, RwLock};
34use tokio_util::sync::CancellationToken;
35use tracing::warn;
36use uuid::Uuid;
37
38use tokio::sync::broadcast;
39
40use crate::error::{FlowError, Result};
41use crate::event::{ChannelEmitter, EventEmitter, FlowEvent, MulticastEmitter, NoopEventEmitter};
42use crate::execution::{ExecutionHandle, ExecutionState};
43use crate::flow_store::FlowStore;
44use crate::graph::DagGraph;
45use crate::registry::NodeRegistry;
46use crate::runner::{FlowRunner, FlowSignal};
47use crate::store::ExecutionStore;
48use crate::validation::ValidationIssue;
49
50pub struct FlowEngine {
82 registry: Arc<NodeRegistry>,
83 executions: Arc<RwLock<HashMap<Uuid, ExecutionHandle>>>,
84 execution_store: Option<Arc<dyn ExecutionStore>>,
86 flow_store: Option<Arc<dyn FlowStore>>,
88 emitter: Arc<dyn EventEmitter>,
90 max_concurrency: Option<usize>,
92}
93
94impl FlowEngine {
95 pub fn new(registry: NodeRegistry) -> Self {
101 Self {
102 registry: Arc::new(registry),
103 executions: Arc::new(RwLock::new(HashMap::new())),
104 execution_store: None,
105 flow_store: None,
106 emitter: Arc::new(NoopEventEmitter),
107 max_concurrency: None,
108 }
109 }
110
111 pub fn with_execution_store(mut self, store: Arc<dyn ExecutionStore>) -> Self {
116 self.execution_store = Some(store);
117 self
118 }
119
120 pub fn with_flow_store(mut self, store: Arc<dyn FlowStore>) -> Self {
126 self.flow_store = Some(store);
127 self
128 }
129
130 pub fn with_event_emitter(mut self, emitter: Arc<dyn EventEmitter>) -> Self {
135 self.emitter = emitter;
136 self
137 }
138
139 pub fn with_max_concurrency(mut self, n: usize) -> Self {
144 self.max_concurrency = Some(n);
145 self
146 }
147
148 pub fn node_types(&self) -> Vec<String> {
155 self.registry.list_types()
156 }
157
158 pub fn validate(&self, definition: &Value) -> Vec<ValidationIssue> {
188 let mut issues = Vec::new();
189
190 let dag = match DagGraph::from_json(definition) {
192 Ok(dag) => dag,
193 Err(e) => {
194 issues.push(ValidationIssue {
195 node_id: None,
196 message: e.to_string(),
197 });
198 return issues;
199 }
200 };
201
202 for node_def in dag.nodes_in_order() {
203 if self.registry.get(&node_def.node_type).is_err() {
205 issues.push(ValidationIssue {
206 node_id: Some(node_def.id.clone()),
207 message: format!("unknown node type '{}'", node_def.node_type),
208 });
209 }
210
211 if let Some(ref cond) = node_def.run_if {
213 if !dag.nodes.contains_key(&cond.from) {
214 issues.push(ValidationIssue {
215 node_id: Some(node_def.id.clone()),
216 message: format!("run_if references unknown node '{}'", cond.from),
217 });
218 }
219 }
220 }
221
222 issues
223 }
224
225 pub async fn start(
238 &self,
239 definition: &Value,
240 variables: HashMap<String, Value>,
241 ) -> Result<Uuid> {
242 self.start_inner(definition, variables, Arc::clone(&self.emitter))
243 .await
244 }
245
246 pub async fn start_streaming(
288 &self,
289 definition: &Value,
290 variables: HashMap<String, Value>,
291 ) -> Result<(Uuid, broadcast::Receiver<FlowEvent>)> {
292 let (tx, rx) = broadcast::channel(256);
293 let channel_emitter = Arc::new(ChannelEmitter::new(tx));
294 let multicast: Arc<dyn EventEmitter> = Arc::new(MulticastEmitter {
295 a: Arc::clone(&self.emitter),
296 b: channel_emitter,
297 });
298 let id = self.start_inner(definition, variables, multicast).await?;
299 Ok((id, rx))
300 }
301
302 async fn start_inner(
305 &self,
306 definition: &Value,
307 variables: HashMap<String, Value>,
308 emitter: Arc<dyn EventEmitter>,
309 ) -> Result<Uuid> {
310 let dag = DagGraph::from_json(definition)?;
311 let mut runner = FlowRunner::new(dag, (*self.registry).clone()).with_event_emitter(emitter);
312 if let Some(ref fs) = self.flow_store {
313 runner = runner.with_flow_store(Arc::clone(fs));
314 }
315 if let Some(n) = self.max_concurrency {
316 runner = runner.with_max_concurrency(n);
317 }
318
319 let execution_id = Uuid::new_v4();
320 let cancel = CancellationToken::new();
321 let (signal_tx, signal_rx) = watch::channel(FlowSignal::Run);
322 let state = Arc::new(RwLock::new(ExecutionState::Running));
323
324 let handle = ExecutionHandle {
325 state: Arc::clone(&state),
326 signal_tx,
327 cancel: cancel.clone(),
328 };
329
330 self.executions.write().await.insert(execution_id, handle);
331
332 let state_for_task = Arc::clone(&state);
335 let execution_store = self.execution_store.clone();
336 tokio::spawn(async move {
337 match runner
338 .run_controlled(execution_id, variables, signal_rx, cancel)
339 .await
340 {
341 Ok(result) => {
342 if let Some(ref store) = execution_store {
344 if let Err(e) = store.save(&result).await {
345 warn!(%execution_id, error = %e, "failed to persist execution result");
346 }
347 }
348 *state_for_task.write().await = ExecutionState::Completed(result);
349 }
350 Err(FlowError::Terminated) => {
351 *state_for_task.write().await = ExecutionState::Terminated;
352 }
353 Err(e) => {
354 *state_for_task.write().await = ExecutionState::Failed(e.to_string());
355 }
356 }
357 });
358
359 Ok(execution_id)
360 }
361
362 pub async fn start_named(&self, name: &str, variables: HashMap<String, Value>) -> Result<Uuid> {
378 let store = self.flow_store.as_ref().ok_or_else(|| {
379 FlowError::Internal("no FlowStore configured; call with_flow_store first".into())
380 })?;
381
382 let definition = store
383 .load(name)
384 .await?
385 .ok_or_else(|| FlowError::FlowNotFound(name.to_string()))?;
386
387 self.start(&definition, variables).await
388 }
389
390 pub async fn pause(&self, id: Uuid) -> Result<()> {
400 let executions = self.executions.read().await;
401 let handle = executions
402 .get(&id)
403 .ok_or(FlowError::ExecutionNotFound(id))?;
404
405 let mut state = handle.state.write().await;
406 match *state {
407 ExecutionState::Running => {
408 handle.signal_tx.send(FlowSignal::Pause).ok();
409 *state = ExecutionState::Paused;
410 Ok(())
411 }
412 ref s => Err(FlowError::InvalidTransition {
413 action: "pause".into(),
414 from: s.as_str().into(),
415 }),
416 }
417 }
418
419 pub async fn resume(&self, id: Uuid) -> Result<()> {
426 let executions = self.executions.read().await;
427 let handle = executions
428 .get(&id)
429 .ok_or(FlowError::ExecutionNotFound(id))?;
430
431 let mut state = handle.state.write().await;
432 match *state {
433 ExecutionState::Paused => {
434 handle.signal_tx.send(FlowSignal::Run).ok();
435 *state = ExecutionState::Running;
436 Ok(())
437 }
438 ref s => Err(FlowError::InvalidTransition {
439 action: "resume".into(),
440 from: s.as_str().into(),
441 }),
442 }
443 }
444
445 pub async fn terminate(&self, id: Uuid) -> Result<()> {
458 let executions = self.executions.read().await;
459 let handle = executions
460 .get(&id)
461 .ok_or(FlowError::ExecutionNotFound(id))?;
462
463 let state = handle.state.read().await;
464 if state.is_terminal() {
465 return Err(FlowError::InvalidTransition {
466 action: "terminate".into(),
467 from: state.as_str().into(),
468 });
469 }
470 drop(state);
471
472 handle.cancel.cancel();
473 handle.signal_tx.send(FlowSignal::Run).ok();
475 Ok(())
476 }
477
478 pub async fn state(&self, id: Uuid) -> Result<ExecutionState> {
484 let executions = self.executions.read().await;
485 let handle = executions
486 .get(&id)
487 .ok_or(FlowError::ExecutionNotFound(id))?;
488 let snapshot = handle.state.read().await.clone();
490 Ok(snapshot)
491 }
492}
493
494#[cfg(test)]
495mod tests {
496 use super::*;
497 use crate::node::{ExecContext, Node};
498 use async_trait::async_trait;
499 use serde_json::{json, Value};
500 use std::time::Duration;
501
502 struct SlowNode(Duration);
506
507 #[async_trait]
508 impl Node for SlowNode {
509 fn node_type(&self) -> &str {
510 "slow"
511 }
512
513 async fn execute(&self, _ctx: ExecContext) -> crate::error::Result<Value> {
514 tokio::time::sleep(self.0).await;
515 Ok(json!({}))
516 }
517 }
518
519 fn slow_engine(delay: Duration) -> FlowEngine {
520 let mut registry = NodeRegistry::with_defaults();
521 registry.register(Arc::new(SlowNode(delay)));
522 FlowEngine::new(registry)
523 }
524
525 fn simple_def() -> Value {
526 json!({
527 "nodes": [
528 { "id": "a", "type": "noop" },
529 { "id": "b", "type": "noop" }
530 ],
531 "edges": [{ "source": "a", "target": "b" }]
532 })
533 }
534
535 fn slow_def() -> Value {
536 json!({
537 "nodes": [
538 { "id": "a", "type": "slow" },
539 { "id": "b", "type": "slow" }
540 ],
541 "edges": [{ "source": "a", "target": "b" }]
542 })
543 }
544
545 #[test]
548 fn node_types_includes_builtins() {
549 let engine = FlowEngine::new(NodeRegistry::with_defaults());
550 let types = engine.node_types();
551 assert!(types.contains(&"noop".to_string()));
552 }
553
554 #[test]
555 fn node_types_includes_custom_nodes() {
556 let mut registry = NodeRegistry::with_defaults();
557 registry.register(Arc::new(SlowNode(Duration::from_millis(1))));
558 let engine = FlowEngine::new(registry);
559
560 let types = engine.node_types();
561 assert!(types.contains(&"noop".to_string()));
562 assert!(types.contains(&"slow".to_string()));
563 }
564
565 #[test]
566 fn node_types_is_sorted() {
567 let engine = FlowEngine::new(NodeRegistry::with_defaults());
568 let types = engine.node_types();
569 let mut sorted = types.clone();
570 sorted.sort();
571 assert_eq!(types, sorted);
572 }
573
574 #[tokio::test]
577 async fn start_returns_execution_id() {
578 let engine = FlowEngine::new(NodeRegistry::with_defaults());
579 let id = engine.start(&simple_def(), HashMap::new()).await.unwrap();
580 assert!(!id.is_nil());
582 }
583
584 #[tokio::test]
585 async fn start_rejects_invalid_definition() {
586 let engine = FlowEngine::new(NodeRegistry::with_defaults());
587 let bad = json!({
588 "nodes": [{ "id": "a", "type": "noop" }],
589 "edges": [{ "source": "ghost", "target": "a" }]
590 });
591 assert!(matches!(
592 engine.start(&bad, HashMap::new()).await,
593 Err(FlowError::UnknownNode(_))
594 ));
595 }
596
597 #[tokio::test]
598 async fn completed_flow_has_outputs() {
599 let engine = FlowEngine::new(NodeRegistry::with_defaults());
600 let id = engine.start(&simple_def(), HashMap::new()).await.unwrap();
601
602 tokio::time::sleep(Duration::from_millis(50)).await;
604
605 let state = engine.state(id).await.unwrap();
606 if let ExecutionState::Completed(result) = state {
607 assert!(result.outputs.contains_key("a"));
608 assert!(result.outputs.contains_key("b"));
609 } else {
610 panic!("expected Completed, got {}", state.as_str());
611 }
612 }
613
614 #[tokio::test]
617 async fn state_returns_not_found_for_unknown_id() {
618 let engine = FlowEngine::new(NodeRegistry::with_defaults());
619 let err = engine.state(Uuid::new_v4()).await.unwrap_err();
620 assert!(matches!(err, FlowError::ExecutionNotFound(_)));
621 }
622
623 #[tokio::test]
626 async fn pause_transitions_to_paused() {
627 let engine = slow_engine(Duration::from_millis(200));
628 let id = engine.start(&slow_def(), HashMap::new()).await.unwrap();
629
630 tokio::time::sleep(Duration::from_millis(10)).await;
632 engine.pause(id).await.unwrap();
633
634 assert!(matches!(
635 engine.state(id).await.unwrap(),
636 ExecutionState::Paused
637 ));
638 }
639
640 #[tokio::test]
641 async fn resume_transitions_to_running() {
642 let engine = slow_engine(Duration::from_millis(200));
643 let id = engine.start(&slow_def(), HashMap::new()).await.unwrap();
644
645 tokio::time::sleep(Duration::from_millis(10)).await;
646 engine.pause(id).await.unwrap();
647 engine.resume(id).await.unwrap();
648
649 assert!(matches!(
650 engine.state(id).await.unwrap(),
651 ExecutionState::Running
652 ));
653 }
654
655 #[tokio::test]
656 async fn pause_on_completed_flow_returns_invalid_transition() {
657 let engine = FlowEngine::new(NodeRegistry::with_defaults());
658 let id = engine.start(&simple_def(), HashMap::new()).await.unwrap();
659
660 tokio::time::sleep(Duration::from_millis(50)).await;
661 let err = engine.pause(id).await.unwrap_err();
663 assert!(matches!(err, FlowError::InvalidTransition { .. }));
664 }
665
666 #[tokio::test]
667 async fn resume_on_running_flow_returns_invalid_transition() {
668 let engine = slow_engine(Duration::from_millis(200));
669 let id = engine.start(&slow_def(), HashMap::new()).await.unwrap();
670
671 tokio::time::sleep(Duration::from_millis(10)).await;
672 let err = engine.resume(id).await.unwrap_err();
674 assert!(matches!(err, FlowError::InvalidTransition { .. }));
675
676 engine.terminate(id).await.unwrap();
677 }
678
679 #[tokio::test]
682 async fn terminate_stops_slow_execution() {
683 let engine = slow_engine(Duration::from_millis(500));
684 let id = engine.start(&slow_def(), HashMap::new()).await.unwrap();
685
686 tokio::time::sleep(Duration::from_millis(10)).await;
687 engine.terminate(id).await.unwrap();
688
689 tokio::time::sleep(Duration::from_millis(50)).await;
691
692 assert!(matches!(
693 engine.state(id).await.unwrap(),
694 ExecutionState::Terminated
695 ));
696 }
697
698 #[tokio::test]
699 async fn terminate_unblocks_paused_execution() {
700 let engine = slow_engine(Duration::from_millis(500));
701 let id = engine.start(&slow_def(), HashMap::new()).await.unwrap();
702
703 tokio::time::sleep(Duration::from_millis(10)).await;
704 engine.pause(id).await.unwrap();
705
706 engine.terminate(id).await.unwrap();
708
709 tokio::time::sleep(Duration::from_millis(600)).await;
710
711 assert!(matches!(
712 engine.state(id).await.unwrap(),
713 ExecutionState::Terminated
714 ));
715 }
716
717 #[tokio::test]
718 async fn terminate_on_completed_flow_returns_invalid_transition() {
719 let engine = FlowEngine::new(NodeRegistry::with_defaults());
720 let id = engine.start(&simple_def(), HashMap::new()).await.unwrap();
721
722 tokio::time::sleep(Duration::from_millis(50)).await;
723 let err = engine.terminate(id).await.unwrap_err();
724 assert!(matches!(err, FlowError::InvalidTransition { .. }));
725 }
726
727 #[tokio::test]
728 async fn unknown_execution_id_returns_not_found() {
729 let engine = FlowEngine::new(NodeRegistry::with_defaults());
730 let id = Uuid::new_v4();
731 assert!(matches!(
732 engine.pause(id).await,
733 Err(FlowError::ExecutionNotFound(_))
734 ));
735 assert!(matches!(
736 engine.resume(id).await,
737 Err(FlowError::ExecutionNotFound(_))
738 ));
739 assert!(matches!(
740 engine.terminate(id).await,
741 Err(FlowError::ExecutionNotFound(_))
742 ));
743 }
744
745 #[tokio::test]
748 async fn execution_store_saves_completed_result() {
749 use crate::store::MemoryExecutionStore;
750
751 let store = Arc::new(MemoryExecutionStore::new());
752 let engine = FlowEngine::new(NodeRegistry::with_defaults())
753 .with_execution_store(Arc::clone(&store) as Arc<dyn crate::store::ExecutionStore>);
754
755 let id = engine.start(&simple_def(), HashMap::new()).await.unwrap();
756 tokio::time::sleep(Duration::from_millis(50)).await;
757
758 let ids = store.list().await.unwrap();
760 assert!(ids.contains(&id), "stored execution id not found");
761
762 let saved = store.load(id).await.unwrap().unwrap();
763 assert_eq!(saved.execution_id, id);
764 assert!(saved.outputs.contains_key("a"));
765 assert!(saved.outputs.contains_key("b"));
766 }
767
768 #[tokio::test]
769 async fn execution_store_not_used_on_terminated_execution() {
770 use crate::store::MemoryExecutionStore;
771
772 let store = Arc::new(MemoryExecutionStore::new());
773 let engine = slow_engine(Duration::from_millis(500))
774 .with_execution_store(Arc::clone(&store) as Arc<dyn crate::store::ExecutionStore>);
775
776 let id = engine.start(&slow_def(), HashMap::new()).await.unwrap();
777 tokio::time::sleep(Duration::from_millis(10)).await;
778 engine.terminate(id).await.unwrap();
779 tokio::time::sleep(Duration::from_millis(50)).await;
780
781 assert!(
783 store.list().await.unwrap().is_empty(),
784 "terminated result should not be stored"
785 );
786 }
787
788 #[tokio::test]
791 async fn engine_emitter_receives_flow_and_node_events() {
792 use crate::event::EventEmitter;
793 use std::sync::atomic::{AtomicU32, Ordering};
794
795 struct CountEmitter {
796 flow_started: Arc<AtomicU32>,
797 flow_completed: Arc<AtomicU32>,
798 node_started: Arc<AtomicU32>,
799 node_completed: Arc<AtomicU32>,
800 }
801
802 #[async_trait::async_trait]
803 impl EventEmitter for CountEmitter {
804 async fn on_flow_started(&self, _: Uuid) {
805 self.flow_started.fetch_add(1, Ordering::SeqCst);
806 }
807 async fn on_flow_completed(&self, _: Uuid, _: &crate::result::FlowResult) {
808 self.flow_completed.fetch_add(1, Ordering::SeqCst);
809 }
810 async fn on_flow_failed(&self, _: Uuid, _: &str) {}
811 async fn on_flow_terminated(&self, _: Uuid) {}
812 async fn on_node_started(&self, _: Uuid, _: &str, _: &str) {
813 self.node_started.fetch_add(1, Ordering::SeqCst);
814 }
815 async fn on_node_completed(&self, _: Uuid, _: &str, _: &serde_json::Value) {
816 self.node_completed.fetch_add(1, Ordering::SeqCst);
817 }
818 async fn on_node_skipped(&self, _: Uuid, _: &str) {}
819 async fn on_node_failed(&self, _: Uuid, _: &str, _: &str) {}
820 }
821
822 let flow_started = Arc::new(AtomicU32::new(0));
823 let flow_completed = Arc::new(AtomicU32::new(0));
824 let node_started = Arc::new(AtomicU32::new(0));
825 let node_completed = Arc::new(AtomicU32::new(0));
826
827 let emitter = Arc::new(CountEmitter {
828 flow_started: Arc::clone(&flow_started),
829 flow_completed: Arc::clone(&flow_completed),
830 node_started: Arc::clone(&node_started),
831 node_completed: Arc::clone(&node_completed),
832 });
833
834 let engine = FlowEngine::new(NodeRegistry::with_defaults())
835 .with_event_emitter(emitter as Arc<dyn EventEmitter>);
836
837 engine.start(&simple_def(), HashMap::new()).await.unwrap();
839 tokio::time::sleep(Duration::from_millis(50)).await;
840
841 assert_eq!(flow_started.load(Ordering::SeqCst), 1, "flow_started");
842 assert_eq!(flow_completed.load(Ordering::SeqCst), 1, "flow_completed");
843 assert_eq!(node_started.load(Ordering::SeqCst), 2, "node_started (a+b)");
844 assert_eq!(
845 node_completed.load(Ordering::SeqCst),
846 2,
847 "node_completed (a+b)"
848 );
849 }
850
851 #[tokio::test]
854 async fn start_named_loads_and_runs_from_flow_store() {
855 use crate::flow_store::MemoryFlowStore;
856
857 let flow_store = Arc::new(MemoryFlowStore::new());
858 flow_store.save("greet", &simple_def()).await.unwrap();
859
860 let engine = FlowEngine::new(NodeRegistry::with_defaults())
861 .with_flow_store(Arc::clone(&flow_store) as Arc<dyn crate::flow_store::FlowStore>);
862
863 let id = engine.start_named("greet", HashMap::new()).await.unwrap();
864 assert!(!id.is_nil());
865
866 tokio::time::sleep(Duration::from_millis(50)).await;
867 assert!(matches!(
868 engine.state(id).await.unwrap(),
869 ExecutionState::Completed(_)
870 ));
871 }
872
873 #[tokio::test]
874 async fn start_named_returns_flow_not_found_for_unknown_name() {
875 use crate::flow_store::MemoryFlowStore;
876
877 let engine = FlowEngine::new(NodeRegistry::with_defaults())
878 .with_flow_store(
879 Arc::new(MemoryFlowStore::new()) as Arc<dyn crate::flow_store::FlowStore>
880 );
881
882 let err = engine
883 .start_named("nonexistent", HashMap::new())
884 .await
885 .unwrap_err();
886
887 assert!(
888 matches!(err, FlowError::FlowNotFound(ref n) if n == "nonexistent"),
889 "expected FlowNotFound, got: {err}"
890 );
891 }
892
893 #[tokio::test]
894 async fn start_named_returns_internal_when_no_store_configured() {
895 let engine = FlowEngine::new(NodeRegistry::with_defaults());
896
897 let err = engine
898 .start_named("anything", HashMap::new())
899 .await
900 .unwrap_err();
901
902 assert!(
903 matches!(err, FlowError::Internal(_)),
904 "expected Internal, got: {err}"
905 );
906 }
907
908 #[tokio::test]
911 async fn start_streaming_delivers_flow_started_and_completed_events() {
912 use crate::event::FlowEvent;
913
914 let engine = FlowEngine::new(NodeRegistry::with_defaults());
915 let (_, mut rx) = engine
916 .start_streaming(&simple_def(), HashMap::new())
917 .await
918 .unwrap();
919
920 let mut saw_started = false;
921 let mut saw_completed = false;
922
923 loop {
924 match rx.recv().await {
925 Ok(FlowEvent::FlowStarted { .. }) => saw_started = true,
926 Ok(FlowEvent::FlowCompleted { .. }) => {
927 saw_completed = true;
928 break;
929 }
930 Ok(_) => {}
931 Err(_) => break,
932 }
933 }
934
935 assert!(saw_started, "FlowStarted not received");
936 assert!(saw_completed, "FlowCompleted not received");
937 }
938
939 #[tokio::test]
940 async fn start_streaming_delivers_node_events_for_each_node() {
941 use crate::event::FlowEvent;
942 use std::collections::HashSet;
943
944 let engine = FlowEngine::new(NodeRegistry::with_defaults());
945 let (_, mut rx) = engine
946 .start_streaming(&simple_def(), HashMap::new())
947 .await
948 .unwrap();
949
950 let mut completed_nodes: HashSet<String> = HashSet::new();
951
952 loop {
953 match rx.recv().await {
954 Ok(FlowEvent::NodeCompleted { node_id, .. }) => {
955 completed_nodes.insert(node_id);
956 }
957 Ok(FlowEvent::FlowCompleted { .. }) | Err(_) => break,
958 Ok(_) => {}
959 }
960 }
961
962 assert!(completed_nodes.contains("a"), "node 'a' not in stream");
963 assert!(completed_nodes.contains("b"), "node 'b' not in stream");
964 }
965
966 #[tokio::test]
967 async fn start_streaming_zero_events_lost_on_fast_flow() {
968 let engine = FlowEngine::new(NodeRegistry::with_defaults());
971 let def = json!({ "nodes": [{ "id": "x", "type": "noop" }], "edges": [] });
972
973 let (_id, mut rx) = engine.start_streaming(&def, HashMap::new()).await.unwrap();
974
975 let mut event_count = 0u32;
976 loop {
977 match rx.recv().await {
978 Ok(_) => event_count += 1,
979 Err(_) => break,
980 }
981 }
982 assert!(event_count >= 4, "expected ≥4 events, got {event_count}");
984 }
985
986 #[tokio::test]
987 async fn start_streaming_existing_emitter_also_fires() {
988 use crate::event::{EventEmitter, FlowEvent};
989 use std::sync::atomic::{AtomicU32, Ordering};
990
991 struct CountEmitter(Arc<AtomicU32>);
992
993 #[async_trait::async_trait]
994 impl EventEmitter for CountEmitter {
995 async fn on_flow_started(&self, _: Uuid) {
996 self.0.fetch_add(1, Ordering::SeqCst);
997 }
998 async fn on_flow_completed(&self, _: Uuid, _: &crate::result::FlowResult) {}
999 async fn on_flow_failed(&self, _: Uuid, _: &str) {}
1000 async fn on_flow_terminated(&self, _: Uuid) {}
1001 async fn on_node_started(&self, _: Uuid, _: &str, _: &str) {}
1002 async fn on_node_completed(&self, _: Uuid, _: &str, _: &serde_json::Value) {}
1003 async fn on_node_skipped(&self, _: Uuid, _: &str) {}
1004 async fn on_node_failed(&self, _: Uuid, _: &str, _: &str) {}
1005 }
1006
1007 let counter = Arc::new(AtomicU32::new(0));
1008 let engine = FlowEngine::new(NodeRegistry::with_defaults())
1009 .with_event_emitter(
1010 Arc::new(CountEmitter(Arc::clone(&counter))) as Arc<dyn EventEmitter>
1011 );
1012
1013 let (_id, mut rx) = engine
1014 .start_streaming(&simple_def(), HashMap::new())
1015 .await
1016 .unwrap();
1017
1018 loop {
1020 match rx.recv().await {
1021 Ok(FlowEvent::FlowCompleted { .. }) | Err(_) => break,
1022 Ok(_) => {}
1023 }
1024 }
1025
1026 assert_eq!(
1028 counter.load(Ordering::SeqCst),
1029 1,
1030 "existing emitter did not fire"
1031 );
1032 }
1033
1034 #[test]
1037 fn validate_returns_empty_for_valid_flow() {
1038 let engine = FlowEngine::new(NodeRegistry::with_defaults());
1039 let def = json!({
1040 "nodes": [
1041 { "id": "a", "type": "noop" },
1042 { "id": "b", "type": "noop" }
1043 ],
1044 "edges": [{ "source": "a", "target": "b" }]
1045 });
1046 assert!(engine.validate(&def).is_empty());
1047 }
1048
1049 #[test]
1050 fn validate_catches_unknown_node_type() {
1051 let engine = FlowEngine::new(NodeRegistry::with_defaults());
1052 let def = json!({
1053 "nodes": [
1054 { "id": "a", "type": "noop" },
1055 { "id": "b", "type": "does-not-exist" }
1056 ],
1057 "edges": []
1058 });
1059 let issues = engine.validate(&def);
1060 assert_eq!(issues.len(), 1);
1061 assert_eq!(issues[0].node_id.as_deref(), Some("b"));
1062 assert!(issues[0].message.contains("unknown node type"));
1063 }
1064
1065 #[test]
1066 fn validate_catches_cyclic_graph() {
1067 let engine = FlowEngine::new(NodeRegistry::with_defaults());
1068 let def = json!({
1069 "nodes": [
1070 { "id": "a", "type": "noop" },
1071 { "id": "b", "type": "noop" }
1072 ],
1073 "edges": [
1074 { "source": "a", "target": "b" },
1075 { "source": "b", "target": "a" }
1076 ]
1077 });
1078 let issues = engine.validate(&def);
1079 assert_eq!(issues.len(), 1);
1080 assert!(issues[0].node_id.is_none());
1081 }
1082
1083 #[test]
1084 fn validate_catches_run_if_referencing_unknown_node() {
1085 let engine = FlowEngine::new(NodeRegistry::with_defaults());
1086 let def = json!({
1087 "nodes": [
1088 { "id": "a", "type": "noop" },
1089 {
1090 "id": "b",
1091 "type": "noop",
1092 "data": {
1093 "run_if": { "from": "ghost", "path": "", "op": "eq", "value": true }
1094 }
1095 }
1096 ],
1097 "edges": [{ "source": "a", "target": "b" }]
1098 });
1099 let issues = engine.validate(&def);
1100 assert_eq!(issues.len(), 1);
1101 assert_eq!(issues[0].node_id.as_deref(), Some("b"));
1102 assert!(issues[0].message.contains("ghost"));
1103 }
1104
1105 #[test]
1106 fn validate_reports_multiple_issues() {
1107 let engine = FlowEngine::new(NodeRegistry::with_defaults());
1108 let def = json!({
1109 "nodes": [
1110 { "id": "a", "type": "bad-type-1" },
1111 { "id": "b", "type": "bad-type-2" }
1112 ],
1113 "edges": []
1114 });
1115 assert_eq!(engine.validate(&def).len(), 2);
1116 }
1117}