1use std::collections::HashMap;
30use std::sync::{Arc, RwLock as SyncRwLock};
31
32use serde_json::Value;
33use tokio::sync::{watch, RwLock};
34use tokio_util::sync::CancellationToken;
35use tracing::warn;
36use uuid::Uuid;
37
38use tokio::sync::broadcast;
39
40use crate::error::{FlowError, Result};
41use crate::event::{ChannelEmitter, EventEmitter, FlowEvent, MulticastEmitter, NoopEventEmitter};
42use crate::execution::{ExecutionHandle, ExecutionState};
43use crate::flow_store::FlowStore;
44use crate::graph::DagGraph;
45use crate::registry::NodeRegistry;
46use crate::runner::{FlowRunner, FlowSignal};
47use crate::store::ExecutionStore;
48use crate::validation::ValidationIssue;
49
50pub struct FlowEngine {
82 registry: Arc<NodeRegistry>,
83 executions: Arc<RwLock<HashMap<Uuid, ExecutionHandle>>>,
84 execution_store: Option<Arc<dyn ExecutionStore>>,
86 flow_store: Option<Arc<dyn FlowStore>>,
88 emitter: Arc<dyn EventEmitter>,
90 max_concurrency: Option<usize>,
92}
93
94impl FlowEngine {
95 pub fn new(registry: NodeRegistry) -> Self {
101 Self {
102 registry: Arc::new(registry),
103 executions: Arc::new(RwLock::new(HashMap::new())),
104 execution_store: None,
105 flow_store: None,
106 emitter: Arc::new(NoopEventEmitter),
107 max_concurrency: None,
108 }
109 }
110
111 pub fn with_execution_store(mut self, store: Arc<dyn ExecutionStore>) -> Self {
116 self.execution_store = Some(store);
117 self
118 }
119
120 pub fn with_flow_store(mut self, store: Arc<dyn FlowStore>) -> Self {
126 self.flow_store = Some(store);
127 self
128 }
129
130 pub fn with_event_emitter(mut self, emitter: Arc<dyn EventEmitter>) -> Self {
135 self.emitter = emitter;
136 self
137 }
138
139 pub fn with_max_concurrency(mut self, n: usize) -> Self {
144 self.max_concurrency = Some(n);
145 self
146 }
147
148 pub fn node_types(&self) -> Vec<String> {
155 self.registry.list_types()
156 }
157
158 pub fn validate(&self, definition: &Value) -> Vec<ValidationIssue> {
188 let mut issues = Vec::new();
189
190 let dag = match DagGraph::from_json(definition) {
192 Ok(dag) => dag,
193 Err(e) => {
194 issues.push(ValidationIssue {
195 node_id: None,
196 message: e.to_string(),
197 });
198 return issues;
199 }
200 };
201
202 for node_def in dag.nodes_in_order() {
203 if self.registry.get(&node_def.node_type).is_err() {
205 issues.push(ValidationIssue {
206 node_id: Some(node_def.id.clone()),
207 message: format!("unknown node type '{}'", node_def.node_type),
208 });
209 }
210
211 if let Some(ref cond) = node_def.run_if {
213 if !dag.nodes.contains_key(&cond.from) {
214 issues.push(ValidationIssue {
215 node_id: Some(node_def.id.clone()),
216 message: format!("run_if references unknown node '{}'", cond.from),
217 });
218 }
219 }
220 }
221
222 issues
223 }
224
225 pub async fn start(
238 &self,
239 definition: &Value,
240 variables: HashMap<String, Value>,
241 ) -> Result<Uuid> {
242 self.start_inner(definition, variables, Arc::clone(&self.emitter))
243 .await
244 }
245
246 pub async fn start_streaming(
288 &self,
289 definition: &Value,
290 variables: HashMap<String, Value>,
291 ) -> Result<(Uuid, broadcast::Receiver<FlowEvent>)> {
292 let (tx, rx) = broadcast::channel(256);
293 let channel_emitter = Arc::new(ChannelEmitter::new(tx));
294 let multicast: Arc<dyn EventEmitter> = Arc::new(MulticastEmitter {
295 a: Arc::clone(&self.emitter),
296 b: channel_emitter,
297 });
298 let id = self.start_inner(definition, variables, multicast).await?;
299 Ok((id, rx))
300 }
301
302 async fn start_inner(
305 &self,
306 definition: &Value,
307 variables: HashMap<String, Value>,
308 emitter: Arc<dyn EventEmitter>,
309 ) -> Result<Uuid> {
310 let dag = DagGraph::from_json(definition)?;
311 let mut runner = FlowRunner::new(dag, (*self.registry).clone()).with_event_emitter(emitter);
312 if let Some(ref fs) = self.flow_store {
313 runner = runner.with_flow_store(Arc::clone(fs));
314 }
315 if let Some(n) = self.max_concurrency {
316 runner = runner.with_max_concurrency(n);
317 }
318
319 let execution_id = Uuid::new_v4();
320 let cancel = CancellationToken::new();
321 let (signal_tx, signal_rx) = watch::channel(FlowSignal::Run);
322 let state = Arc::new(RwLock::new(ExecutionState::Running));
323 let context: Arc<SyncRwLock<HashMap<String, Value>>> =
324 Arc::new(SyncRwLock::new(HashMap::new()));
325
326 let handle = ExecutionHandle {
327 state: Arc::clone(&state),
328 signal_tx,
329 cancel: cancel.clone(),
330 context: Arc::clone(&context),
331 };
332
333 self.executions.write().await.insert(execution_id, handle);
334
335 let state_for_task = Arc::clone(&state);
338 let execution_store = self.execution_store.clone();
339 tokio::spawn(async move {
340 match runner
341 .run_controlled(execution_id, variables, signal_rx, cancel, context)
342 .await
343 {
344 Ok(result) => {
345 if let Some(ref store) = execution_store {
347 if let Err(e) = store.save(&result).await {
348 warn!(%execution_id, error = %e, "failed to persist execution result");
349 }
350 }
351 *state_for_task.write().await = ExecutionState::Completed(result);
352 }
353 Err(FlowError::Terminated) => {
354 *state_for_task.write().await = ExecutionState::Terminated;
355 }
356 Err(e) => {
357 *state_for_task.write().await = ExecutionState::Failed(e.to_string());
358 }
359 }
360 });
361
362 Ok(execution_id)
363 }
364
365 pub async fn start_named(&self, name: &str, variables: HashMap<String, Value>) -> Result<Uuid> {
381 let store = self.flow_store.as_ref().ok_or_else(|| {
382 FlowError::Internal("no FlowStore configured; call with_flow_store first".into())
383 })?;
384
385 let definition = store
386 .load(name)
387 .await?
388 .ok_or_else(|| FlowError::FlowNotFound(name.to_string()))?;
389
390 self.start(&definition, variables).await
391 }
392
393 pub async fn pause(&self, id: Uuid) -> Result<()> {
403 let executions = self.executions.read().await;
404 let handle = executions
405 .get(&id)
406 .ok_or(FlowError::ExecutionNotFound(id))?;
407
408 let mut state = handle.state.write().await;
409 match *state {
410 ExecutionState::Running => {
411 handle.signal_tx.send(FlowSignal::Pause).ok();
412 *state = ExecutionState::Paused;
413 Ok(())
414 }
415 ref s => Err(FlowError::InvalidTransition {
416 action: "pause".into(),
417 from: s.as_str().into(),
418 }),
419 }
420 }
421
422 pub async fn resume(&self, id: Uuid) -> Result<()> {
429 let executions = self.executions.read().await;
430 let handle = executions
431 .get(&id)
432 .ok_or(FlowError::ExecutionNotFound(id))?;
433
434 let mut state = handle.state.write().await;
435 match *state {
436 ExecutionState::Paused => {
437 handle.signal_tx.send(FlowSignal::Run).ok();
438 *state = ExecutionState::Running;
439 Ok(())
440 }
441 ref s => Err(FlowError::InvalidTransition {
442 action: "resume".into(),
443 from: s.as_str().into(),
444 }),
445 }
446 }
447
448 pub async fn terminate(&self, id: Uuid) -> Result<()> {
461 let executions = self.executions.read().await;
462 let handle = executions
463 .get(&id)
464 .ok_or(FlowError::ExecutionNotFound(id))?;
465
466 let state = handle.state.read().await;
467 if state.is_terminal() {
468 return Err(FlowError::InvalidTransition {
469 action: "terminate".into(),
470 from: state.as_str().into(),
471 });
472 }
473 drop(state);
474
475 handle.cancel.cancel();
476 handle.signal_tx.send(FlowSignal::Run).ok();
478 Ok(())
479 }
480
481 pub async fn state(&self, id: Uuid) -> Result<ExecutionState> {
487 let executions = self.executions.read().await;
488 let handle = executions
489 .get(&id)
490 .ok_or(FlowError::ExecutionNotFound(id))?;
491 let snapshot = handle.state.read().await.clone();
493 Ok(snapshot)
494 }
495
496 pub async fn get_context(&self, id: Uuid) -> Result<HashMap<String, Value>> {
509 let executions = self.executions.read().await;
510 let handle = executions
511 .get(&id)
512 .ok_or(FlowError::ExecutionNotFound(id))?;
513 let snapshot = handle.context.read().unwrap().clone();
514 Ok(snapshot)
515 }
516
517 pub async fn set_context_entry(&self, id: Uuid, key: String, value: Value) -> Result<()> {
527 let executions = self.executions.read().await;
528 let handle = executions
529 .get(&id)
530 .ok_or(FlowError::ExecutionNotFound(id))?;
531 handle.context.write().unwrap().insert(key, value);
532 Ok(())
533 }
534
535 pub async fn delete_context_entry(&self, id: Uuid, key: &str) -> Result<bool> {
544 let executions = self.executions.read().await;
545 let handle = executions
546 .get(&id)
547 .ok_or(FlowError::ExecutionNotFound(id))?;
548 let removed = handle.context.write().unwrap().remove(key).is_some();
549 Ok(removed)
550 }
551}
552
#[cfg(test)]
mod tests {
    use super::*;
    use crate::node::{ExecContext, Node};
    use async_trait::async_trait;
    use serde_json::{json, Value};
    use std::time::{Duration, Instant};

    /// Upper bound when waiting for something that should happen almost
    /// immediately; generous so a loaded machine does not fail the test.
    const SETTLE: Duration = Duration::from_secs(5);

    /// Upper bound for terminations of flows built from 500 ms slow nodes.
    /// Kept below the node delay so reaching `Terminated` within this window
    /// still shows the in-flight node was interrupted rather than awaited.
    const PROMPT: Duration = Duration::from_millis(400);

    /// Interval between polls while waiting.
    const POLL: Duration = Duration::from_millis(5);

    /// Polls the engine until execution `id` reports a terminal state or
    /// `within` has elapsed, and returns the last state observed.
    ///
    /// Used instead of a fixed sleep so tests finish as soon as the
    /// background task is done and do not depend on scheduler timing.
    async fn wait_for_terminal(engine: &FlowEngine, id: Uuid, within: Duration) -> ExecutionState {
        let deadline = Instant::now() + within;
        loop {
            let state = engine.state(id).await.unwrap();
            if state.is_terminal() || Instant::now() >= deadline {
                return state;
            }
            tokio::time::sleep(POLL).await;
        }
    }

    /// Test node that sleeps for the wrapped duration and returns `{}`.
    struct SlowNode(Duration);

    #[async_trait]
    impl Node for SlowNode {
        fn node_type(&self) -> &str {
            "slow"
        }

        async fn execute(&self, _ctx: ExecContext) -> crate::error::Result<Value> {
            tokio::time::sleep(self.0).await;
            Ok(json!({}))
        }
    }

    /// Engine with the default nodes plus a `slow` node of the given delay.
    fn slow_engine(delay: Duration) -> FlowEngine {
        let mut registry = NodeRegistry::with_defaults();
        registry.register(Arc::new(SlowNode(delay)));
        FlowEngine::new(registry)
    }

    /// Two `noop` nodes, `a -> b`.
    fn simple_def() -> Value {
        json!({
            "nodes": [
                { "id": "a", "type": "noop" },
                { "id": "b", "type": "noop" }
            ],
            "edges": [{ "source": "a", "target": "b" }]
        })
    }

    /// Two `slow` nodes, `a -> b`.
    fn slow_def() -> Value {
        json!({
            "nodes": [
                { "id": "a", "type": "slow" },
                { "id": "b", "type": "slow" }
            ],
            "edges": [{ "source": "a", "target": "b" }]
        })
    }

    // ── node_types ──────────────────────────────────────────────────────

    #[test]
    fn node_types_includes_builtins() {
        let engine = FlowEngine::new(NodeRegistry::with_defaults());
        let types = engine.node_types();
        assert!(types.contains(&"noop".to_string()));
    }

    #[test]
    fn node_types_includes_custom_nodes() {
        let mut registry = NodeRegistry::with_defaults();
        registry.register(Arc::new(SlowNode(Duration::from_millis(1))));
        let engine = FlowEngine::new(registry);

        let types = engine.node_types();
        assert!(types.contains(&"noop".to_string()));
        assert!(types.contains(&"slow".to_string()));
    }

    #[test]
    fn node_types_is_sorted() {
        let engine = FlowEngine::new(NodeRegistry::with_defaults());
        let types = engine.node_types();
        let mut sorted = types.clone();
        sorted.sort();
        assert_eq!(types, sorted);
    }

    // ── start / state ───────────────────────────────────────────────────

    #[tokio::test]
    async fn start_returns_execution_id() {
        let engine = FlowEngine::new(NodeRegistry::with_defaults());
        let id = engine.start(&simple_def(), HashMap::new()).await.unwrap();
        assert!(!id.is_nil());
    }

    #[tokio::test]
    async fn start_rejects_invalid_definition() {
        let engine = FlowEngine::new(NodeRegistry::with_defaults());
        let bad = json!({
            "nodes": [{ "id": "a", "type": "noop" }],
            "edges": [{ "source": "ghost", "target": "a" }]
        });
        assert!(matches!(
            engine.start(&bad, HashMap::new()).await,
            Err(FlowError::UnknownNode(_))
        ));
    }

    #[tokio::test]
    async fn completed_flow_has_outputs() {
        let engine = FlowEngine::new(NodeRegistry::with_defaults());
        let id = engine.start(&simple_def(), HashMap::new()).await.unwrap();

        match wait_for_terminal(&engine, id, SETTLE).await {
            ExecutionState::Completed(result) => {
                assert!(result.outputs.contains_key("a"));
                assert!(result.outputs.contains_key("b"));
            }
            other => panic!("expected Completed, got {}", other.as_str()),
        }
    }

    #[tokio::test]
    async fn state_returns_not_found_for_unknown_id() {
        let engine = FlowEngine::new(NodeRegistry::with_defaults());
        let err = engine.state(Uuid::new_v4()).await.unwrap_err();
        assert!(matches!(err, FlowError::ExecutionNotFound(_)));
    }

    // ── pause / resume ──────────────────────────────────────────────────

    #[tokio::test]
    async fn pause_transitions_to_paused() {
        let engine = slow_engine(Duration::from_millis(200));
        let id = engine.start(&slow_def(), HashMap::new()).await.unwrap();

        // Give the spawned runner a chance to start before pausing.
        tokio::time::sleep(Duration::from_millis(10)).await;
        engine.pause(id).await.unwrap();

        assert!(matches!(
            engine.state(id).await.unwrap(),
            ExecutionState::Paused
        ));
    }

    #[tokio::test]
    async fn resume_transitions_to_running() {
        let engine = slow_engine(Duration::from_millis(200));
        let id = engine.start(&slow_def(), HashMap::new()).await.unwrap();

        tokio::time::sleep(Duration::from_millis(10)).await;
        engine.pause(id).await.unwrap();
        engine.resume(id).await.unwrap();

        assert!(matches!(
            engine.state(id).await.unwrap(),
            ExecutionState::Running
        ));
    }

    #[tokio::test]
    async fn pause_on_completed_flow_returns_invalid_transition() {
        let engine = FlowEngine::new(NodeRegistry::with_defaults());
        let id = engine.start(&simple_def(), HashMap::new()).await.unwrap();

        wait_for_terminal(&engine, id, SETTLE).await;
        let err = engine.pause(id).await.unwrap_err();
        assert!(matches!(err, FlowError::InvalidTransition { .. }));
    }

    #[tokio::test]
    async fn resume_on_running_flow_returns_invalid_transition() {
        let engine = slow_engine(Duration::from_millis(200));
        let id = engine.start(&slow_def(), HashMap::new()).await.unwrap();

        tokio::time::sleep(Duration::from_millis(10)).await;
        let err = engine.resume(id).await.unwrap_err();
        assert!(matches!(err, FlowError::InvalidTransition { .. }));

        // Clean up so the slow flow does not keep running in the background.
        engine.terminate(id).await.unwrap();
    }

    // ── terminate ───────────────────────────────────────────────────────

    #[tokio::test]
    async fn terminate_stops_slow_execution() {
        let engine = slow_engine(Duration::from_millis(500));
        let id = engine.start(&slow_def(), HashMap::new()).await.unwrap();

        tokio::time::sleep(Duration::from_millis(10)).await;
        engine.terminate(id).await.unwrap();

        assert!(matches!(
            wait_for_terminal(&engine, id, PROMPT).await,
            ExecutionState::Terminated
        ));
    }

    #[tokio::test]
    async fn terminate_unblocks_paused_execution() {
        let engine = slow_engine(Duration::from_millis(500));
        let id = engine.start(&slow_def(), HashMap::new()).await.unwrap();

        tokio::time::sleep(Duration::from_millis(10)).await;
        engine.pause(id).await.unwrap();

        engine.terminate(id).await.unwrap();

        assert!(matches!(
            wait_for_terminal(&engine, id, SETTLE).await,
            ExecutionState::Terminated
        ));
    }

    #[tokio::test]
    async fn terminate_on_completed_flow_returns_invalid_transition() {
        let engine = FlowEngine::new(NodeRegistry::with_defaults());
        let id = engine.start(&simple_def(), HashMap::new()).await.unwrap();

        wait_for_terminal(&engine, id, SETTLE).await;
        let err = engine.terminate(id).await.unwrap_err();
        assert!(matches!(err, FlowError::InvalidTransition { .. }));
    }

    #[tokio::test]
    async fn unknown_execution_id_returns_not_found() {
        let engine = FlowEngine::new(NodeRegistry::with_defaults());
        let id = Uuid::new_v4();
        assert!(matches!(
            engine.pause(id).await,
            Err(FlowError::ExecutionNotFound(_))
        ));
        assert!(matches!(
            engine.resume(id).await,
            Err(FlowError::ExecutionNotFound(_))
        ));
        assert!(matches!(
            engine.terminate(id).await,
            Err(FlowError::ExecutionNotFound(_))
        ));
    }

    // ── execution store ─────────────────────────────────────────────────

    #[tokio::test]
    async fn execution_store_saves_completed_result() {
        use crate::store::MemoryExecutionStore;

        let store = Arc::new(MemoryExecutionStore::new());
        let engine = FlowEngine::new(NodeRegistry::with_defaults())
            .with_execution_store(Arc::clone(&store) as Arc<dyn crate::store::ExecutionStore>);

        let id = engine.start(&simple_def(), HashMap::new()).await.unwrap();
        // The engine saves the result before it records `Completed`, so once
        // the state is terminal the store has been written.
        wait_for_terminal(&engine, id, SETTLE).await;

        let ids = store.list().await.unwrap();
        assert!(ids.contains(&id), "stored execution id not found");

        let saved = store.load(id).await.unwrap().unwrap();
        assert_eq!(saved.execution_id, id);
        assert!(saved.outputs.contains_key("a"));
        assert!(saved.outputs.contains_key("b"));
    }

    #[tokio::test]
    async fn execution_store_not_used_on_terminated_execution() {
        use crate::store::MemoryExecutionStore;

        let store = Arc::new(MemoryExecutionStore::new());
        let engine = slow_engine(Duration::from_millis(500))
            .with_execution_store(Arc::clone(&store) as Arc<dyn crate::store::ExecutionStore>);

        let id = engine.start(&slow_def(), HashMap::new()).await.unwrap();
        tokio::time::sleep(Duration::from_millis(10)).await;
        engine.terminate(id).await.unwrap();
        // Check the store only after the execution has actually settled.
        wait_for_terminal(&engine, id, PROMPT).await;

        assert!(
            store.list().await.unwrap().is_empty(),
            "terminated result should not be stored"
        );
    }

    // ── event emitter ───────────────────────────────────────────────────

    #[tokio::test]
    async fn engine_emitter_receives_flow_and_node_events() {
        use crate::event::EventEmitter;
        use std::sync::atomic::{AtomicU32, Ordering};

        /// Emitter that counts the callbacks this test cares about.
        struct CountEmitter {
            flow_started: Arc<AtomicU32>,
            flow_completed: Arc<AtomicU32>,
            node_started: Arc<AtomicU32>,
            node_completed: Arc<AtomicU32>,
        }

        #[async_trait::async_trait]
        impl EventEmitter for CountEmitter {
            async fn on_flow_started(&self, _: Uuid) {
                self.flow_started.fetch_add(1, Ordering::SeqCst);
            }
            async fn on_flow_completed(&self, _: Uuid, _: &crate::result::FlowResult) {
                self.flow_completed.fetch_add(1, Ordering::SeqCst);
            }
            async fn on_flow_failed(&self, _: Uuid, _: &str) {}
            async fn on_flow_terminated(&self, _: Uuid) {}
            async fn on_node_started(&self, _: Uuid, _: &str, _: &str) {
                self.node_started.fetch_add(1, Ordering::SeqCst);
            }
            async fn on_node_completed(&self, _: Uuid, _: &str, _: &serde_json::Value) {
                self.node_completed.fetch_add(1, Ordering::SeqCst);
            }
            async fn on_node_skipped(&self, _: Uuid, _: &str) {}
            async fn on_node_failed(&self, _: Uuid, _: &str, _: &str) {}
        }

        let flow_started = Arc::new(AtomicU32::new(0));
        let flow_completed = Arc::new(AtomicU32::new(0));
        let node_started = Arc::new(AtomicU32::new(0));
        let node_completed = Arc::new(AtomicU32::new(0));

        let emitter = Arc::new(CountEmitter {
            flow_started: Arc::clone(&flow_started),
            flow_completed: Arc::clone(&flow_completed),
            node_started: Arc::clone(&node_started),
            node_completed: Arc::clone(&node_completed),
        });

        let engine = FlowEngine::new(NodeRegistry::with_defaults())
            .with_event_emitter(emitter as Arc<dyn EventEmitter>);

        engine.start(&simple_def(), HashMap::new()).await.unwrap();

        // Wait (bounded) until every counter has reached its expected value;
        // the exact-equality assertions below still decide pass or fail.
        let all_seen = || {
            flow_started.load(Ordering::SeqCst) >= 1
                && flow_completed.load(Ordering::SeqCst) >= 1
                && node_started.load(Ordering::SeqCst) >= 2
                && node_completed.load(Ordering::SeqCst) >= 2
        };
        let deadline = Instant::now() + SETTLE;
        while !all_seen() && Instant::now() < deadline {
            tokio::time::sleep(POLL).await;
        }

        assert_eq!(flow_started.load(Ordering::SeqCst), 1, "flow_started");
        assert_eq!(flow_completed.load(Ordering::SeqCst), 1, "flow_completed");
        assert_eq!(node_started.load(Ordering::SeqCst), 2, "node_started (a+b)");
        assert_eq!(
            node_completed.load(Ordering::SeqCst),
            2,
            "node_completed (a+b)"
        );
    }

    // ── start_named ─────────────────────────────────────────────────────

    #[tokio::test]
    async fn start_named_loads_and_runs_from_flow_store() {
        use crate::flow_store::MemoryFlowStore;

        let flow_store = Arc::new(MemoryFlowStore::new());
        flow_store.save("greet", &simple_def()).await.unwrap();

        let engine = FlowEngine::new(NodeRegistry::with_defaults())
            .with_flow_store(Arc::clone(&flow_store) as Arc<dyn crate::flow_store::FlowStore>);

        let id = engine.start_named("greet", HashMap::new()).await.unwrap();
        assert!(!id.is_nil());

        assert!(matches!(
            wait_for_terminal(&engine, id, SETTLE).await,
            ExecutionState::Completed(_)
        ));
    }

    #[tokio::test]
    async fn start_named_returns_flow_not_found_for_unknown_name() {
        use crate::flow_store::MemoryFlowStore;

        let engine = FlowEngine::new(NodeRegistry::with_defaults())
            .with_flow_store(
                Arc::new(MemoryFlowStore::new()) as Arc<dyn crate::flow_store::FlowStore>
            );

        let err = engine
            .start_named("nonexistent", HashMap::new())
            .await
            .unwrap_err();

        assert!(
            matches!(err, FlowError::FlowNotFound(ref n) if n == "nonexistent"),
            "expected FlowNotFound, got: {err}"
        );
    }

    #[tokio::test]
    async fn start_named_returns_internal_when_no_store_configured() {
        let engine = FlowEngine::new(NodeRegistry::with_defaults());

        let err = engine
            .start_named("anything", HashMap::new())
            .await
            .unwrap_err();

        assert!(
            matches!(err, FlowError::Internal(_)),
            "expected Internal, got: {err}"
        );
    }

    // ── start_streaming ─────────────────────────────────────────────────

    #[tokio::test]
    async fn start_streaming_delivers_flow_started_and_completed_events() {
        use crate::event::FlowEvent;

        let engine = FlowEngine::new(NodeRegistry::with_defaults());
        let (_, mut rx) = engine
            .start_streaming(&simple_def(), HashMap::new())
            .await
            .unwrap();

        let mut saw_started = false;
        let mut saw_completed = false;

        // Drain until the completion event arrives or the channel reports an
        // error (for example because it was closed).
        loop {
            match rx.recv().await {
                Ok(FlowEvent::FlowStarted { .. }) => saw_started = true,
                Ok(FlowEvent::FlowCompleted { .. }) => {
                    saw_completed = true;
                    break;
                }
                Ok(_) => {}
                Err(_) => break,
            }
        }

        assert!(saw_started, "FlowStarted not received");
        assert!(saw_completed, "FlowCompleted not received");
    }

    #[tokio::test]
    async fn start_streaming_delivers_node_events_for_each_node() {
        use crate::event::FlowEvent;
        use std::collections::HashSet;

        let engine = FlowEngine::new(NodeRegistry::with_defaults());
        let (_, mut rx) = engine
            .start_streaming(&simple_def(), HashMap::new())
            .await
            .unwrap();

        let mut completed_nodes: HashSet<String> = HashSet::new();

        loop {
            match rx.recv().await {
                Ok(FlowEvent::NodeCompleted { node_id, .. }) => {
                    completed_nodes.insert(node_id);
                }
                Ok(FlowEvent::FlowCompleted { .. }) | Err(_) => break,
                Ok(_) => {}
            }
        }

        assert!(completed_nodes.contains("a"), "node 'a' not in stream");
        assert!(completed_nodes.contains("b"), "node 'b' not in stream");
    }

    #[tokio::test]
    async fn start_streaming_zero_events_lost_on_fast_flow() {
        let engine = FlowEngine::new(NodeRegistry::with_defaults());
        let def = json!({ "nodes": [{ "id": "x", "type": "noop" }], "edges": [] });

        let (_id, mut rx) = engine.start_streaming(&def, HashMap::new()).await.unwrap();

        // Count everything delivered until the receiver reports an error.
        let mut event_count = 0u32;
        loop {
            match rx.recv().await {
                Ok(_) => event_count += 1,
                Err(_) => break,
            }
        }
        assert!(event_count >= 4, "expected ≥4 events, got {event_count}");
    }

    #[tokio::test]
    async fn start_streaming_existing_emitter_also_fires() {
        use crate::event::{EventEmitter, FlowEvent};
        use std::sync::atomic::{AtomicU32, Ordering};

        /// Emitter that counts only `on_flow_started` calls.
        struct CountEmitter(Arc<AtomicU32>);

        #[async_trait::async_trait]
        impl EventEmitter for CountEmitter {
            async fn on_flow_started(&self, _: Uuid) {
                self.0.fetch_add(1, Ordering::SeqCst);
            }
            async fn on_flow_completed(&self, _: Uuid, _: &crate::result::FlowResult) {}
            async fn on_flow_failed(&self, _: Uuid, _: &str) {}
            async fn on_flow_terminated(&self, _: Uuid) {}
            async fn on_node_started(&self, _: Uuid, _: &str, _: &str) {}
            async fn on_node_completed(&self, _: Uuid, _: &str, _: &serde_json::Value) {}
            async fn on_node_skipped(&self, _: Uuid, _: &str) {}
            async fn on_node_failed(&self, _: Uuid, _: &str, _: &str) {}
        }

        let counter = Arc::new(AtomicU32::new(0));
        let engine = FlowEngine::new(NodeRegistry::with_defaults())
            .with_event_emitter(
                Arc::new(CountEmitter(Arc::clone(&counter))) as Arc<dyn EventEmitter>
            );

        let (_id, mut rx) = engine
            .start_streaming(&simple_def(), HashMap::new())
            .await
            .unwrap();

        // Use the stream itself to learn when the flow has finished.
        loop {
            match rx.recv().await {
                Ok(FlowEvent::FlowCompleted { .. }) | Err(_) => break,
                Ok(_) => {}
            }
        }

        assert_eq!(
            counter.load(Ordering::SeqCst),
            1,
            "existing emitter did not fire"
        );
    }

    // ── validate ────────────────────────────────────────────────────────

    #[test]
    fn validate_returns_empty_for_valid_flow() {
        let engine = FlowEngine::new(NodeRegistry::with_defaults());
        let def = json!({
            "nodes": [
                { "id": "a", "type": "noop" },
                { "id": "b", "type": "noop" }
            ],
            "edges": [{ "source": "a", "target": "b" }]
        });
        assert!(engine.validate(&def).is_empty());
    }

    #[test]
    fn validate_catches_unknown_node_type() {
        let engine = FlowEngine::new(NodeRegistry::with_defaults());
        let def = json!({
            "nodes": [
                { "id": "a", "type": "noop" },
                { "id": "b", "type": "does-not-exist" }
            ],
            "edges": []
        });
        let issues = engine.validate(&def);
        assert_eq!(issues.len(), 1);
        assert_eq!(issues[0].node_id.as_deref(), Some("b"));
        assert!(issues[0].message.contains("unknown node type"));
    }

    #[test]
    fn validate_catches_cyclic_graph() {
        let engine = FlowEngine::new(NodeRegistry::with_defaults());
        let def = json!({
            "nodes": [
                { "id": "a", "type": "noop" },
                { "id": "b", "type": "noop" }
            ],
            "edges": [
                { "source": "a", "target": "b" },
                { "source": "b", "target": "a" }
            ]
        });
        let issues = engine.validate(&def);
        assert_eq!(issues.len(), 1);
        assert!(issues[0].node_id.is_none());
    }

    #[test]
    fn validate_catches_run_if_referencing_unknown_node() {
        let engine = FlowEngine::new(NodeRegistry::with_defaults());
        let def = json!({
            "nodes": [
                { "id": "a", "type": "noop" },
                {
                    "id": "b",
                    "type": "noop",
                    "data": {
                        "run_if": { "from": "ghost", "path": "", "op": "eq", "value": true }
                    }
                }
            ],
            "edges": [{ "source": "a", "target": "b" }]
        });
        let issues = engine.validate(&def);
        assert_eq!(issues.len(), 1);
        assert_eq!(issues[0].node_id.as_deref(), Some("b"));
        assert!(issues[0].message.contains("ghost"));
    }

    #[test]
    fn validate_reports_multiple_issues() {
        let engine = FlowEngine::new(NodeRegistry::with_defaults());
        let def = json!({
            "nodes": [
                { "id": "a", "type": "bad-type-1" },
                { "id": "b", "type": "bad-type-2" }
            ],
            "edges": []
        });
        assert_eq!(engine.validate(&def).len(), 2);
    }
}