1use std::collections::HashMap;
30use std::sync::{Arc, RwLock as SyncRwLock};
31
32use serde_json::Value;
33use tokio::sync::{watch, RwLock};
34use tokio_util::sync::CancellationToken;
35use tracing::warn;
36use uuid::Uuid;
37
38use tokio::sync::broadcast;
39
40use crate::capabilities::FlowCapabilities;
41use crate::error::{FlowError, Result};
42use crate::event::{ChannelEmitter, EventEmitter, FlowEvent, MulticastEmitter, NoopEventEmitter};
43use crate::execution::{ExecutionHandle, ExecutionState};
44use crate::flow_store::FlowStore;
45use crate::graph::DagGraph;
46use crate::node::Node;
47use crate::registry::{NodeDescriptor, NodeRegistry};
48use crate::runner::{FlowRunner, FlowSignal};
49use crate::store::ExecutionStore;
50use crate::validation::ValidationIssue;
51
52pub struct FlowEngine {
84 registry: Arc<SyncRwLock<NodeRegistry>>,
85 executions: Arc<RwLock<HashMap<Uuid, ExecutionHandle>>>,
86 execution_store: Option<Arc<dyn ExecutionStore>>,
88 flow_store: Option<Arc<dyn FlowStore>>,
90 emitter: Arc<dyn EventEmitter>,
92 max_concurrency: Option<usize>,
94}
95
96impl FlowEngine {
97 pub fn new(registry: NodeRegistry) -> Self {
103 Self {
104 registry: Arc::new(SyncRwLock::new(registry)),
105 executions: Arc::new(RwLock::new(HashMap::new())),
106 execution_store: None,
107 flow_store: None,
108 emitter: Arc::new(NoopEventEmitter),
109 max_concurrency: None,
110 }
111 }
112
113 pub fn with_execution_store(mut self, store: Arc<dyn ExecutionStore>) -> Self {
118 self.execution_store = Some(store);
119 self
120 }
121
122 pub fn with_flow_store(mut self, store: Arc<dyn FlowStore>) -> Self {
128 self.flow_store = Some(store);
129 self
130 }
131
132 pub fn with_event_emitter(mut self, emitter: Arc<dyn EventEmitter>) -> Self {
137 self.emitter = emitter;
138 self
139 }
140
141 pub fn with_max_concurrency(mut self, n: usize) -> Self {
146 self.max_concurrency = Some(n);
147 self
148 }
149
150 pub fn node_types(&self) -> Vec<String> {
157 self.registry.read().unwrap().list_types()
158 }
159
160 pub fn node_descriptors(&self) -> Vec<NodeDescriptor> {
165 self.registry.read().unwrap().list_descriptors()
166 }
167
168 pub fn capabilities(&self) -> FlowCapabilities {
173 FlowCapabilities::from_nodes(self.node_descriptors())
174 }
175
176 pub fn register_node_type(&self, node: Arc<dyn Node>) {
180 self.registry.write().unwrap().register(node);
181 }
182
183 pub fn register_node_type_with_descriptor(
185 &self,
186 node: Arc<dyn Node>,
187 descriptor: NodeDescriptor,
188 ) {
189 self.registry
190 .write()
191 .unwrap()
192 .register_with_descriptor(node, descriptor);
193 }
194
195 pub fn unregister_node_type(&self, node_type: &str) -> Result<bool> {
203 self.registry.write().unwrap().unregister(node_type)
204 }
205
206 pub fn validate(&self, definition: &Value) -> Vec<ValidationIssue> {
236 let mut issues = Vec::new();
237
238 let dag = match DagGraph::from_json(definition) {
240 Ok(dag) => dag,
241 Err(e) => {
242 issues.push(ValidationIssue {
243 node_id: None,
244 message: e.to_string(),
245 });
246 return issues;
247 }
248 };
249
250 for node_def in dag.nodes_in_order() {
251 if self
253 .registry
254 .read()
255 .unwrap()
256 .get(&node_def.node_type)
257 .is_err()
258 {
259 issues.push(ValidationIssue {
260 node_id: Some(node_def.id.clone()),
261 message: format!("unknown node type '{}'", node_def.node_type),
262 });
263 }
264
265 if let Some(ref cond) = node_def.run_if {
267 if !dag.nodes.contains_key(&cond.from) {
268 issues.push(ValidationIssue {
269 node_id: Some(node_def.id.clone()),
270 message: format!("run_if references unknown node '{}'", cond.from),
271 });
272 }
273 }
274 }
275
276 issues
277 }
278
279 pub async fn start(
292 &self,
293 definition: &Value,
294 variables: HashMap<String, Value>,
295 ) -> Result<Uuid> {
296 let (id, _rx) = self.start_inner(definition, variables).await?;
297 Ok(id)
298 }
299
300 pub async fn start_streaming(
342 &self,
343 definition: &Value,
344 variables: HashMap<String, Value>,
345 ) -> Result<(Uuid, broadcast::Receiver<FlowEvent>)> {
346 let (id, rx) = self.start_inner(definition, variables).await?;
347 Ok((id, rx))
348 }
349
350 pub async fn subscribe(&self, id: Uuid) -> Result<broadcast::Receiver<FlowEvent>> {
355 let executions = self.executions.read().await;
356 let handle = executions
357 .get(&id)
358 .ok_or(FlowError::ExecutionNotFound(id))?;
359 let receiver = handle
360 .event_tx
361 .read()
362 .unwrap()
363 .as_ref()
364 .map(|tx| tx.subscribe())
365 .ok_or_else(|| FlowError::InvalidTransition {
366 action: "subscribe".into(),
367 from: "finished".into(),
368 })?;
369 Ok(receiver)
370 }
371
372 async fn start_inner(
375 &self,
376 definition: &Value,
377 variables: HashMap<String, Value>,
378 ) -> Result<(Uuid, broadcast::Receiver<FlowEvent>)> {
379 let dag = DagGraph::from_json(definition)?;
380 let registry = self.registry.read().unwrap().clone();
381 let (event_tx, event_rx) = broadcast::channel(256);
382 let channel_emitter = Arc::new(ChannelEmitter::new(event_tx.clone()));
383 let emitter: Arc<dyn EventEmitter> = Arc::new(MulticastEmitter {
384 a: Arc::clone(&self.emitter),
385 b: channel_emitter,
386 });
387 let mut runner = FlowRunner::new(dag, registry).with_event_emitter(emitter);
388 if let Some(ref fs) = self.flow_store {
389 runner = runner.with_flow_store(Arc::clone(fs));
390 }
391 if let Some(n) = self.max_concurrency {
392 runner = runner.with_max_concurrency(n);
393 }
394
395 let execution_id = Uuid::new_v4();
396 let cancel = CancellationToken::new();
397 let (signal_tx, signal_rx) = watch::channel(FlowSignal::Run);
398 let state = Arc::new(RwLock::new(ExecutionState::Running));
399 let context: Arc<SyncRwLock<HashMap<String, Value>>> =
400 Arc::new(SyncRwLock::new(HashMap::new()));
401 let event_tx_handle = Arc::new(SyncRwLock::new(Some(event_tx)));
402
403 let handle = ExecutionHandle {
404 state: Arc::clone(&state),
405 signal_tx,
406 cancel: cancel.clone(),
407 context: Arc::clone(&context),
408 event_tx: Arc::clone(&event_tx_handle),
409 };
410
411 self.executions.write().await.insert(execution_id, handle);
412
413 let state_for_task = Arc::clone(&state);
416 let event_tx_for_task = Arc::clone(&event_tx_handle);
417 let execution_store = self.execution_store.clone();
418 tokio::spawn(async move {
419 match runner
420 .run_controlled(execution_id, variables, signal_rx, cancel, context)
421 .await
422 {
423 Ok(result) => {
424 if let Some(ref store) = execution_store {
426 if let Err(e) = store.save(&result).await {
427 warn!(%execution_id, error = %e, "failed to persist execution result");
428 }
429 }
430 *state_for_task.write().await = ExecutionState::Completed(result);
431 }
432 Err(FlowError::Terminated) => {
433 *state_for_task.write().await = ExecutionState::Terminated;
434 }
435 Err(e) => {
436 *state_for_task.write().await = ExecutionState::Failed(e.to_string());
437 }
438 }
439 let _ = event_tx_for_task.write().unwrap().take();
440 });
441
442 Ok((execution_id, event_rx))
443 }
444
445 pub async fn start_named(&self, name: &str, variables: HashMap<String, Value>) -> Result<Uuid> {
461 let store = self.flow_store.as_ref().ok_or_else(|| {
462 FlowError::Internal("no FlowStore configured; call with_flow_store first".into())
463 })?;
464
465 let definition = store
466 .load(name)
467 .await?
468 .ok_or_else(|| FlowError::FlowNotFound(name.to_string()))?;
469
470 self.start(&definition, variables).await
471 }
472
473 pub async fn pause(&self, id: Uuid) -> Result<()> {
483 let executions = self.executions.read().await;
484 let handle = executions
485 .get(&id)
486 .ok_or(FlowError::ExecutionNotFound(id))?;
487
488 let mut state = handle.state.write().await;
489 match *state {
490 ExecutionState::Running => {
491 handle.signal_tx.send(FlowSignal::Pause).ok();
492 *state = ExecutionState::Paused;
493 Ok(())
494 }
495 ref s => Err(FlowError::InvalidTransition {
496 action: "pause".into(),
497 from: s.as_str().into(),
498 }),
499 }
500 }
501
502 pub async fn resume(&self, id: Uuid) -> Result<()> {
509 let executions = self.executions.read().await;
510 let handle = executions
511 .get(&id)
512 .ok_or(FlowError::ExecutionNotFound(id))?;
513
514 let mut state = handle.state.write().await;
515 match *state {
516 ExecutionState::Paused => {
517 handle.signal_tx.send(FlowSignal::Run).ok();
518 *state = ExecutionState::Running;
519 Ok(())
520 }
521 ref s => Err(FlowError::InvalidTransition {
522 action: "resume".into(),
523 from: s.as_str().into(),
524 }),
525 }
526 }
527
528 pub async fn terminate(&self, id: Uuid) -> Result<()> {
541 let executions = self.executions.read().await;
542 let handle = executions
543 .get(&id)
544 .ok_or(FlowError::ExecutionNotFound(id))?;
545
546 let state = handle.state.read().await;
547 if state.is_terminal() {
548 return Err(FlowError::InvalidTransition {
549 action: "terminate".into(),
550 from: state.as_str().into(),
551 });
552 }
553 drop(state);
554
555 handle.cancel.cancel();
556 handle.signal_tx.send(FlowSignal::Run).ok();
558 Ok(())
559 }
560
561 pub async fn state(&self, id: Uuid) -> Result<ExecutionState> {
567 let executions = self.executions.read().await;
568 let handle = executions
569 .get(&id)
570 .ok_or(FlowError::ExecutionNotFound(id))?;
571 let snapshot = handle.state.read().await.clone();
573 Ok(snapshot)
574 }
575
576 pub async fn get_context(&self, id: Uuid) -> Result<HashMap<String, Value>> {
589 let executions = self.executions.read().await;
590 let handle = executions
591 .get(&id)
592 .ok_or(FlowError::ExecutionNotFound(id))?;
593 let snapshot = handle.context.read().unwrap().clone();
594 Ok(snapshot)
595 }
596
597 pub async fn set_context_entry(&self, id: Uuid, key: String, value: Value) -> Result<()> {
607 let executions = self.executions.read().await;
608 let handle = executions
609 .get(&id)
610 .ok_or(FlowError::ExecutionNotFound(id))?;
611 handle.context.write().unwrap().insert(key, value);
612 Ok(())
613 }
614
615 pub async fn delete_context_entry(&self, id: Uuid, key: &str) -> Result<bool> {
624 let executions = self.executions.read().await;
625 let handle = executions
626 .get(&id)
627 .ok_or(FlowError::ExecutionNotFound(id))?;
628 let removed = handle.context.write().unwrap().remove(key).is_some();
629 Ok(removed)
630 }
631}
632
633#[cfg(test)]
634mod tests {
635 use super::*;
636 use crate::node::{ExecContext, Node};
637 use async_trait::async_trait;
638 use serde_json::{json, Value};
639 use std::time::Duration;
640
641 struct SlowNode(Duration);
645
646 #[async_trait]
647 impl Node for SlowNode {
648 fn node_type(&self) -> &str {
649 "slow"
650 }
651
652 async fn execute(&self, _ctx: ExecContext) -> crate::error::Result<Value> {
653 tokio::time::sleep(self.0).await;
654 Ok(json!({}))
655 }
656 }
657
658 fn slow_engine(delay: Duration) -> FlowEngine {
659 let mut registry = NodeRegistry::with_defaults();
660 registry.register(Arc::new(SlowNode(delay)));
661 FlowEngine::new(registry)
662 }
663
664 fn simple_def() -> Value {
665 json!({
666 "nodes": [
667 { "id": "a", "type": "noop" },
668 { "id": "b", "type": "noop" }
669 ],
670 "edges": [{ "source": "a", "target": "b" }]
671 })
672 }
673
674 fn slow_def() -> Value {
675 json!({
676 "nodes": [
677 { "id": "a", "type": "slow" },
678 { "id": "b", "type": "slow" }
679 ],
680 "edges": [{ "source": "a", "target": "b" }]
681 })
682 }
683
684 #[test]
687 fn node_types_includes_builtins() {
688 let engine = FlowEngine::new(NodeRegistry::with_defaults());
689 let types = engine.node_types();
690 assert!(types.contains(&"noop".to_string()));
691 }
692
693 #[test]
694 fn node_types_includes_custom_nodes() {
695 let mut registry = NodeRegistry::with_defaults();
696 registry.register(Arc::new(SlowNode(Duration::from_millis(1))));
697 let engine = FlowEngine::new(registry);
698
699 let types = engine.node_types();
700 assert!(types.contains(&"noop".to_string()));
701 assert!(types.contains(&"slow".to_string()));
702 }
703
704 #[test]
705 fn node_types_is_sorted() {
706 let engine = FlowEngine::new(NodeRegistry::with_defaults());
707 let types = engine.node_types();
708 let mut sorted = types.clone();
709 sorted.sort();
710 assert_eq!(types, sorted);
711 }
712
713 #[test]
714 fn node_descriptors_include_builtin_metadata() {
715 let engine = FlowEngine::new(NodeRegistry::with_defaults());
716 let descriptors = engine.node_descriptors();
717 let llm = descriptors
718 .iter()
719 .find(|descriptor| descriptor.node_type == "llm")
720 .unwrap();
721 assert_eq!(llm.display_name, "LLM");
722 assert_eq!(llm.category, "ai");
723 assert!(llm.summary.contains("OpenAI-compatible"));
724 assert!(llm.default_data.is_object());
725 assert!(!llm.fields.is_empty());
726 }
727
728 #[test]
729 fn node_descriptors_include_custom_nodes() {
730 let mut registry = NodeRegistry::with_defaults();
731 registry.register(Arc::new(SlowNode(Duration::from_millis(1))));
732 let engine = FlowEngine::new(registry);
733
734 let descriptors = engine.node_descriptors();
735 let slow = descriptors
736 .iter()
737 .find(|descriptor| descriptor.node_type == "slow")
738 .unwrap();
739 assert_eq!(slow.display_name, "slow");
740 assert_eq!(slow.category, "custom");
741 }
742
743 #[test]
744 fn register_node_type_adds_custom_type_at_runtime() {
745 let engine = FlowEngine::new(NodeRegistry::with_defaults());
746 engine.register_node_type(Arc::new(SlowNode(Duration::from_millis(1))));
747
748 let types = engine.node_types();
749 assert!(types.contains(&"slow".to_string()));
750 }
751
752 #[test]
753 fn register_node_type_with_descriptor_updates_catalog() {
754 let engine = FlowEngine::new(NodeRegistry::with_defaults());
755 engine.register_node_type_with_descriptor(
756 Arc::new(SlowNode(Duration::from_millis(1))),
757 NodeDescriptor {
758 node_type: "ignored".to_string(),
759 display_name: "Slow Node".to_string(),
760 category: "testing".to_string(),
761 summary: "Sleeps briefly during tests.".to_string(),
762 default_data: json!({ "delay_ms": 1 }),
763 fields: vec![],
764 },
765 );
766
767 let slow = engine
768 .node_descriptors()
769 .into_iter()
770 .find(|descriptor| descriptor.node_type == "slow")
771 .unwrap();
772 assert_eq!(slow.display_name, "Slow Node");
773 assert_eq!(slow.category, "testing");
774 assert_eq!(slow.default_data["delay_ms"], 1);
775 }
776
777 #[test]
778 fn unregister_node_type_removes_runtime_type() {
779 let engine = FlowEngine::new(NodeRegistry::with_defaults());
780 engine.register_node_type(Arc::new(SlowNode(Duration::from_millis(1))));
781
782 assert!(engine.unregister_node_type("slow").unwrap());
783 assert!(!engine.node_types().contains(&"slow".to_string()));
784
785 let def = json!({
786 "nodes": [{ "id": "a", "type": "slow" }],
787 "edges": []
788 });
789 let issues = engine.validate(&def);
790 assert_eq!(issues.len(), 1);
791 assert!(issues[0].message.contains("unknown node type"));
792 }
793
794 #[test]
795 fn unregister_node_type_rejects_builtin_types() {
796 let engine = FlowEngine::new(NodeRegistry::with_defaults());
797
798 let err = engine.unregister_node_type("noop").unwrap_err();
799 assert!(matches!(err, FlowError::ProtectedNodeType(ref ty) if ty == "noop"));
800 assert!(engine.node_types().contains(&"noop".to_string()));
801 }
802
803 #[test]
804 fn capabilities_include_node_catalog() {
805 let engine = FlowEngine::new(NodeRegistry::with_defaults());
806 let capabilities = engine.capabilities();
807 assert_eq!(capabilities.version, "2026-03-22");
808 assert!(capabilities.progressive_disclosure);
809 assert!(capabilities
810 .nodes
811 .iter()
812 .any(|node| node.node_type == "llm"));
813 }
814
815 #[test]
816 fn http_request_descriptor_carries_editor_metadata() {
817 let engine = FlowEngine::new(NodeRegistry::with_defaults());
818 let descriptors = engine.node_descriptors();
819 let http = descriptors
820 .iter()
821 .find(|descriptor| descriptor.node_type == "http-request")
822 .unwrap();
823 assert_eq!(http.default_data["method"], "GET");
824 assert!(http.fields.iter().any(|field| field.key == "url"));
825 }
826
827 #[tokio::test]
830 async fn start_returns_execution_id() {
831 let engine = FlowEngine::new(NodeRegistry::with_defaults());
832 let id = engine.start(&simple_def(), HashMap::new()).await.unwrap();
833 assert!(!id.is_nil());
835 }
836
837 #[tokio::test]
838 async fn start_rejects_invalid_definition() {
839 let engine = FlowEngine::new(NodeRegistry::with_defaults());
840 let bad = json!({
841 "nodes": [{ "id": "a", "type": "noop" }],
842 "edges": [{ "source": "ghost", "target": "a" }]
843 });
844 assert!(matches!(
845 engine.start(&bad, HashMap::new()).await,
846 Err(FlowError::UnknownNode(_))
847 ));
848 }
849
850 #[tokio::test]
851 async fn completed_flow_has_outputs() {
852 let engine = FlowEngine::new(NodeRegistry::with_defaults());
853 let id = engine.start(&simple_def(), HashMap::new()).await.unwrap();
854
855 tokio::time::sleep(Duration::from_millis(50)).await;
857
858 let state = engine.state(id).await.unwrap();
859 if let ExecutionState::Completed(result) = state {
860 assert!(result.outputs.contains_key("a"));
861 assert!(result.outputs.contains_key("b"));
862 } else {
863 panic!("expected Completed, got {}", state.as_str());
864 }
865 }
866
867 #[tokio::test]
870 async fn state_returns_not_found_for_unknown_id() {
871 let engine = FlowEngine::new(NodeRegistry::with_defaults());
872 let err = engine.state(Uuid::new_v4()).await.unwrap_err();
873 assert!(matches!(err, FlowError::ExecutionNotFound(_)));
874 }
875
876 #[tokio::test]
879 async fn pause_transitions_to_paused() {
880 let engine = slow_engine(Duration::from_millis(200));
881 let id = engine.start(&slow_def(), HashMap::new()).await.unwrap();
882
883 tokio::time::sleep(Duration::from_millis(10)).await;
885 engine.pause(id).await.unwrap();
886
887 assert!(matches!(
888 engine.state(id).await.unwrap(),
889 ExecutionState::Paused
890 ));
891 }
892
893 #[tokio::test]
894 async fn resume_transitions_to_running() {
895 let engine = slow_engine(Duration::from_millis(200));
896 let id = engine.start(&slow_def(), HashMap::new()).await.unwrap();
897
898 tokio::time::sleep(Duration::from_millis(10)).await;
899 engine.pause(id).await.unwrap();
900 engine.resume(id).await.unwrap();
901
902 assert!(matches!(
903 engine.state(id).await.unwrap(),
904 ExecutionState::Running
905 ));
906 }
907
908 #[tokio::test]
909 async fn pause_on_completed_flow_returns_invalid_transition() {
910 let engine = FlowEngine::new(NodeRegistry::with_defaults());
911 let id = engine.start(&simple_def(), HashMap::new()).await.unwrap();
912
913 tokio::time::sleep(Duration::from_millis(50)).await;
914 let err = engine.pause(id).await.unwrap_err();
916 assert!(matches!(err, FlowError::InvalidTransition { .. }));
917 }
918
919 #[tokio::test]
920 async fn resume_on_running_flow_returns_invalid_transition() {
921 let engine = slow_engine(Duration::from_millis(200));
922 let id = engine.start(&slow_def(), HashMap::new()).await.unwrap();
923
924 tokio::time::sleep(Duration::from_millis(10)).await;
925 let err = engine.resume(id).await.unwrap_err();
927 assert!(matches!(err, FlowError::InvalidTransition { .. }));
928
929 engine.terminate(id).await.unwrap();
930 }
931
932 #[tokio::test]
935 async fn terminate_stops_slow_execution() {
936 let engine = slow_engine(Duration::from_millis(500));
937 let id = engine.start(&slow_def(), HashMap::new()).await.unwrap();
938
939 tokio::time::sleep(Duration::from_millis(10)).await;
940 engine.terminate(id).await.unwrap();
941
942 tokio::time::sleep(Duration::from_millis(50)).await;
944
945 assert!(matches!(
946 engine.state(id).await.unwrap(),
947 ExecutionState::Terminated
948 ));
949 }
950
951 #[tokio::test]
952 async fn terminate_unblocks_paused_execution() {
953 let engine = slow_engine(Duration::from_millis(500));
954 let id = engine.start(&slow_def(), HashMap::new()).await.unwrap();
955
956 tokio::time::sleep(Duration::from_millis(10)).await;
957 engine.pause(id).await.unwrap();
958
959 engine.terminate(id).await.unwrap();
961
962 tokio::time::sleep(Duration::from_millis(600)).await;
963
964 assert!(matches!(
965 engine.state(id).await.unwrap(),
966 ExecutionState::Terminated
967 ));
968 }
969
970 #[tokio::test]
971 async fn terminate_on_completed_flow_returns_invalid_transition() {
972 let engine = FlowEngine::new(NodeRegistry::with_defaults());
973 let id = engine.start(&simple_def(), HashMap::new()).await.unwrap();
974
975 tokio::time::sleep(Duration::from_millis(50)).await;
976 let err = engine.terminate(id).await.unwrap_err();
977 assert!(matches!(err, FlowError::InvalidTransition { .. }));
978 }
979
980 #[tokio::test]
981 async fn unknown_execution_id_returns_not_found() {
982 let engine = FlowEngine::new(NodeRegistry::with_defaults());
983 let id = Uuid::new_v4();
984 assert!(matches!(
985 engine.pause(id).await,
986 Err(FlowError::ExecutionNotFound(_))
987 ));
988 assert!(matches!(
989 engine.resume(id).await,
990 Err(FlowError::ExecutionNotFound(_))
991 ));
992 assert!(matches!(
993 engine.terminate(id).await,
994 Err(FlowError::ExecutionNotFound(_))
995 ));
996 }
997
998 #[tokio::test]
1001 async fn execution_store_saves_completed_result() {
1002 use crate::store::MemoryExecutionStore;
1003
1004 let store = Arc::new(MemoryExecutionStore::new());
1005 let engine = FlowEngine::new(NodeRegistry::with_defaults())
1006 .with_execution_store(Arc::clone(&store) as Arc<dyn crate::store::ExecutionStore>);
1007
1008 let id = engine.start(&simple_def(), HashMap::new()).await.unwrap();
1009 tokio::time::sleep(Duration::from_millis(50)).await;
1010
1011 let ids = store.list().await.unwrap();
1013 assert!(ids.contains(&id), "stored execution id not found");
1014
1015 let saved = store.load(id).await.unwrap().unwrap();
1016 assert_eq!(saved.execution_id, id);
1017 assert!(saved.outputs.contains_key("a"));
1018 assert!(saved.outputs.contains_key("b"));
1019 }
1020
1021 #[tokio::test]
1022 async fn execution_store_not_used_on_terminated_execution() {
1023 use crate::store::MemoryExecutionStore;
1024
1025 let store = Arc::new(MemoryExecutionStore::new());
1026 let engine = slow_engine(Duration::from_millis(500))
1027 .with_execution_store(Arc::clone(&store) as Arc<dyn crate::store::ExecutionStore>);
1028
1029 let id = engine.start(&slow_def(), HashMap::new()).await.unwrap();
1030 tokio::time::sleep(Duration::from_millis(10)).await;
1031 engine.terminate(id).await.unwrap();
1032 tokio::time::sleep(Duration::from_millis(50)).await;
1033
1034 assert!(
1036 store.list().await.unwrap().is_empty(),
1037 "terminated result should not be stored"
1038 );
1039 }
1040
1041 #[tokio::test]
1044 async fn engine_emitter_receives_flow_and_node_events() {
1045 use crate::event::EventEmitter;
1046 use std::sync::atomic::{AtomicU32, Ordering};
1047
1048 struct CountEmitter {
1049 flow_started: Arc<AtomicU32>,
1050 flow_completed: Arc<AtomicU32>,
1051 node_started: Arc<AtomicU32>,
1052 node_completed: Arc<AtomicU32>,
1053 node_skipped: Arc<AtomicU32>,
1054 node_failed: Arc<AtomicU32>,
1055 node_completed_full: Arc<AtomicU32>,
1056 iteration_started: Arc<AtomicU32>,
1057 iteration_next: Arc<AtomicU32>,
1058 iteration_completed: Arc<AtomicU32>,
1059 loop_started: Arc<AtomicU32>,
1060 loop_completed: Arc<AtomicU32>,
1061 parallel_branch_started: Arc<AtomicU32>,
1062 parallel_branch_completed: Arc<AtomicU32>,
1063 node_retry: Arc<AtomicU32>,
1064 }
1065
1066 #[async_trait::async_trait]
1067 impl EventEmitter for CountEmitter {
1068 async fn on_flow_started(&self, _: Uuid) {
1069 self.flow_started.fetch_add(1, Ordering::SeqCst);
1070 }
1071 async fn on_flow_completed(&self, _: Uuid, _: &crate::result::FlowResult) {
1072 self.flow_completed.fetch_add(1, Ordering::SeqCst);
1073 }
1074 async fn on_flow_failed(&self, _: Uuid, _: &str) {}
1075 async fn on_flow_terminated(&self, _: Uuid) {}
1076 async fn on_node_started(&self, _: Uuid, _: &str, _: &str) {
1077 self.node_started.fetch_add(1, Ordering::SeqCst);
1078 }
1079 async fn on_node_completed(&self, _: Uuid, _: &str, _: &serde_json::Value) {
1080 self.node_completed.fetch_add(1, Ordering::SeqCst);
1081 }
1082 async fn on_node_skipped(&self, _: Uuid, _: &str) {
1083 self.node_skipped.fetch_add(1, Ordering::SeqCst);
1084 }
1085 async fn on_node_failed(&self, _: Uuid, _: &str, _: &str) {
1086 self.node_failed.fetch_add(1, Ordering::SeqCst);
1087 }
1088 async fn on_node_completed_full(
1089 &self,
1090 _: Uuid,
1091 _: &str,
1092 _: &str,
1093 _: &serde_json::Value,
1094 _: Option<&serde_json::Value>,
1095 _: &serde_json::Value,
1096 _: u64,
1097 ) {
1098 self.node_completed_full.fetch_add(1, Ordering::SeqCst);
1099 }
1100 async fn on_iteration_started(&self, _: Uuid, _: &str, _: &str, _: u32) {}
1101 async fn on_iteration_next(&self, _: Uuid, _: &str, _: &str, _: u32) {}
1102 async fn on_iteration_completed(&self, _: Uuid, _: &str, _: &str) {}
1103 async fn on_loop_started(&self, _: Uuid, _: &str, _: &str, _: u32) {}
1104 async fn on_loop_completed(&self, _: Uuid, _: &str, _: &str) {}
1105 async fn on_parallel_branch_started(&self, _: Uuid, _: &str, _: &str, _: &str) {}
1106 async fn on_parallel_branch_completed(
1107 &self,
1108 _: Uuid,
1109 _: &str,
1110 _: &str,
1111 _: &str,
1112 _: &serde_json::Value,
1113 ) {
1114 }
1115 async fn on_node_retry(&self, _: Uuid, _: &str, _: u32, _: u32) {}
1116 }
1117
1118 let flow_started = Arc::new(AtomicU32::new(0));
1119 let flow_completed = Arc::new(AtomicU32::new(0));
1120 let node_started = Arc::new(AtomicU32::new(0));
1121 let node_completed = Arc::new(AtomicU32::new(0));
1122 let node_skipped = Arc::new(AtomicU32::new(0));
1123 let node_failed = Arc::new(AtomicU32::new(0));
1124 let node_completed_full = Arc::new(AtomicU32::new(0));
1125 let iteration_started = Arc::new(AtomicU32::new(0));
1126 let iteration_next = Arc::new(AtomicU32::new(0));
1127 let iteration_completed = Arc::new(AtomicU32::new(0));
1128 let loop_started = Arc::new(AtomicU32::new(0));
1129 let loop_completed = Arc::new(AtomicU32::new(0));
1130 let parallel_branch_started = Arc::new(AtomicU32::new(0));
1131 let parallel_branch_completed = Arc::new(AtomicU32::new(0));
1132 let node_retry = Arc::new(AtomicU32::new(0));
1133
1134 let emitter = Arc::new(CountEmitter {
1135 flow_started: Arc::clone(&flow_started),
1136 flow_completed: Arc::clone(&flow_completed),
1137 node_started: Arc::clone(&node_started),
1138 node_completed: Arc::clone(&node_completed),
1139 node_skipped: Arc::clone(&node_skipped),
1140 node_failed: Arc::clone(&node_failed),
1141 node_completed_full: Arc::clone(&node_completed_full),
1142 iteration_started: Arc::clone(&iteration_started),
1143 iteration_next: Arc::clone(&iteration_next),
1144 iteration_completed: Arc::clone(&iteration_completed),
1145 loop_started: Arc::clone(&loop_started),
1146 loop_completed: Arc::clone(&loop_completed),
1147 parallel_branch_started: Arc::clone(¶llel_branch_started),
1148 parallel_branch_completed: Arc::clone(¶llel_branch_completed),
1149 node_retry: Arc::clone(&node_retry),
1150 });
1151
1152 let engine = FlowEngine::new(NodeRegistry::with_defaults())
1153 .with_event_emitter(emitter as Arc<dyn EventEmitter>);
1154
1155 engine.start(&simple_def(), HashMap::new()).await.unwrap();
1157 tokio::time::sleep(Duration::from_millis(50)).await;
1158
1159 assert_eq!(flow_started.load(Ordering::SeqCst), 1, "flow_started");
1160 assert_eq!(flow_completed.load(Ordering::SeqCst), 1, "flow_completed");
1161 assert_eq!(node_started.load(Ordering::SeqCst), 2, "node_started (a+b)");
1162 assert_eq!(
1163 node_completed.load(Ordering::SeqCst),
1164 2,
1165 "node_completed (a+b)"
1166 );
1167 }
1168
1169 #[tokio::test]
1172 async fn start_named_loads_and_runs_from_flow_store() {
1173 use crate::flow_store::MemoryFlowStore;
1174
1175 let flow_store = Arc::new(MemoryFlowStore::new());
1176 flow_store.save("greet", &simple_def()).await.unwrap();
1177
1178 let engine = FlowEngine::new(NodeRegistry::with_defaults())
1179 .with_flow_store(Arc::clone(&flow_store) as Arc<dyn crate::flow_store::FlowStore>);
1180
1181 let id = engine.start_named("greet", HashMap::new()).await.unwrap();
1182 assert!(!id.is_nil());
1183
1184 tokio::time::sleep(Duration::from_millis(50)).await;
1185 assert!(matches!(
1186 engine.state(id).await.unwrap(),
1187 ExecutionState::Completed(_)
1188 ));
1189 }
1190
1191 #[tokio::test]
1192 async fn start_named_returns_flow_not_found_for_unknown_name() {
1193 use crate::flow_store::MemoryFlowStore;
1194
1195 let engine = FlowEngine::new(NodeRegistry::with_defaults())
1196 .with_flow_store(
1197 Arc::new(MemoryFlowStore::new()) as Arc<dyn crate::flow_store::FlowStore>
1198 );
1199
1200 let err = engine
1201 .start_named("nonexistent", HashMap::new())
1202 .await
1203 .unwrap_err();
1204
1205 assert!(
1206 matches!(err, FlowError::FlowNotFound(ref n) if n == "nonexistent"),
1207 "expected FlowNotFound, got: {err}"
1208 );
1209 }
1210
1211 #[tokio::test]
1212 async fn start_named_returns_internal_when_no_store_configured() {
1213 let engine = FlowEngine::new(NodeRegistry::with_defaults());
1214
1215 let err = engine
1216 .start_named("anything", HashMap::new())
1217 .await
1218 .unwrap_err();
1219
1220 assert!(
1221 matches!(err, FlowError::Internal(_)),
1222 "expected Internal, got: {err}"
1223 );
1224 }
1225
1226 #[tokio::test]
1229 async fn start_streaming_delivers_flow_started_and_completed_events() {
1230 use crate::event::FlowEvent;
1231
1232 let engine = FlowEngine::new(NodeRegistry::with_defaults());
1233 let (_, mut rx) = engine
1234 .start_streaming(&simple_def(), HashMap::new())
1235 .await
1236 .unwrap();
1237
1238 let mut saw_started = false;
1239 let mut saw_completed = false;
1240
1241 loop {
1242 match rx.recv().await {
1243 Ok(FlowEvent::FlowStarted { .. }) => saw_started = true,
1244 Ok(FlowEvent::FlowCompleted { .. }) => {
1245 saw_completed = true;
1246 break;
1247 }
1248 Ok(_) => {}
1249 Err(_) => break,
1250 }
1251 }
1252
1253 assert!(saw_started, "FlowStarted not received");
1254 assert!(saw_completed, "FlowCompleted not received");
1255 }
1256
1257 #[tokio::test]
1258 async fn start_streaming_delivers_node_events_for_each_node() {
1259 use crate::event::FlowEvent;
1260 use std::collections::HashSet;
1261
1262 let engine = FlowEngine::new(NodeRegistry::with_defaults());
1263 let (_, mut rx) = engine
1264 .start_streaming(&simple_def(), HashMap::new())
1265 .await
1266 .unwrap();
1267
1268 let mut completed_nodes: HashSet<String> = HashSet::new();
1269
1270 loop {
1271 match rx.recv().await {
1272 Ok(FlowEvent::NodeCompleted { node_id, .. }) => {
1273 completed_nodes.insert(node_id);
1274 }
1275 Ok(FlowEvent::FlowCompleted { .. }) | Err(_) => break,
1276 Ok(_) => {}
1277 }
1278 }
1279
1280 assert!(completed_nodes.contains("a"), "node 'a' not in stream");
1281 assert!(completed_nodes.contains("b"), "node 'b' not in stream");
1282 }
1283
1284 #[tokio::test]
1285 async fn start_streaming_zero_events_lost_on_fast_flow() {
1286 let engine = FlowEngine::new(NodeRegistry::with_defaults());
1289 let def = json!({ "nodes": [{ "id": "x", "type": "noop" }], "edges": [] });
1290
1291 let (_id, mut rx) = engine.start_streaming(&def, HashMap::new()).await.unwrap();
1292
1293 let mut event_count = 0u32;
1294 loop {
1295 match rx.recv().await {
1296 Ok(_) => event_count += 1,
1297 Err(_) => break,
1298 }
1299 }
1300 assert!(event_count >= 4, "expected ≥4 events, got {event_count}");
1302 }
1303
1304 #[tokio::test]
1305 async fn start_streaming_existing_emitter_also_fires() {
1306 use crate::event::{EventEmitter, FlowEvent};
1307 use std::sync::atomic::{AtomicU32, Ordering};
1308
1309 struct CountEmitter(Arc<AtomicU32>);
1310
1311 #[async_trait::async_trait]
1312 impl EventEmitter for CountEmitter {
1313 async fn on_flow_started(&self, _: Uuid) {
1314 self.0.fetch_add(1, Ordering::SeqCst);
1315 }
1316 async fn on_flow_completed(&self, _: Uuid, _: &crate::result::FlowResult) {}
1317 async fn on_flow_failed(&self, _: Uuid, _: &str) {}
1318 async fn on_flow_terminated(&self, _: Uuid) {}
1319 async fn on_node_started(&self, _: Uuid, _: &str, _: &str) {}
1320 async fn on_node_completed(&self, _: Uuid, _: &str, _: &serde_json::Value) {}
1321 async fn on_node_skipped(&self, _: Uuid, _: &str) {}
1322 async fn on_node_failed(&self, _: Uuid, _: &str, _: &str) {}
1323 async fn on_node_completed_full(
1324 &self,
1325 _: Uuid,
1326 _: &str,
1327 _: &str,
1328 _: &serde_json::Value,
1329 _: Option<&serde_json::Value>,
1330 _: &serde_json::Value,
1331 _: u64,
1332 ) {
1333 }
1334 async fn on_iteration_started(&self, _: Uuid, _: &str, _: &str, _: u32) {}
1335 async fn on_iteration_next(&self, _: Uuid, _: &str, _: &str, _: u32) {}
1336 async fn on_iteration_completed(&self, _: Uuid, _: &str, _: &str) {}
1337 async fn on_loop_started(&self, _: Uuid, _: &str, _: &str, _: u32) {}
1338 async fn on_loop_completed(&self, _: Uuid, _: &str, _: &str) {}
1339 async fn on_parallel_branch_started(&self, _: Uuid, _: &str, _: &str, _: &str) {}
1340 async fn on_parallel_branch_completed(
1341 &self,
1342 _: Uuid,
1343 _: &str,
1344 _: &str,
1345 _: &str,
1346 _: &serde_json::Value,
1347 ) {
1348 }
1349 async fn on_node_retry(&self, _: Uuid, _: &str, _: u32, _: u32) {}
1350 }
1351
1352 let counter = Arc::new(AtomicU32::new(0));
1353 let engine = FlowEngine::new(NodeRegistry::with_defaults())
1354 .with_event_emitter(
1355 Arc::new(CountEmitter(Arc::clone(&counter))) as Arc<dyn EventEmitter>
1356 );
1357
1358 let (_id, mut rx) = engine
1359 .start_streaming(&simple_def(), HashMap::new())
1360 .await
1361 .unwrap();
1362
1363 loop {
1365 match rx.recv().await {
1366 Ok(FlowEvent::FlowCompleted { .. }) | Err(_) => break,
1367 Ok(_) => {}
1368 }
1369 }
1370
1371 assert_eq!(
1373 counter.load(Ordering::SeqCst),
1374 1,
1375 "existing emitter did not fire"
1376 );
1377 }
1378
/// A two-node `noop` flow with a single edge `a` -> `b` must yield no
/// validation issues.
#[test]
fn validate_returns_empty_for_valid_flow() {
    let flow = json!({
        "nodes": [
            { "id": "a", "type": "noop" },
            { "id": "b", "type": "noop" }
        ],
        "edges": [{ "source": "a", "target": "b" }]
    });
    let engine = FlowEngine::new(NodeRegistry::with_defaults());

    let issues = engine.validate(&flow);

    assert!(issues.is_empty());
}
1393
/// A node whose `type` is `does-not-exist` must produce exactly one issue,
/// attached to that node and mentioning "unknown node type".
#[test]
fn validate_catches_unknown_node_type() {
    let flow = json!({
        "nodes": [
            { "id": "a", "type": "noop" },
            { "id": "b", "type": "does-not-exist" }
        ],
        "edges": []
    });
    let engine = FlowEngine::new(NodeRegistry::with_defaults());

    let issues = engine.validate(&flow);
    assert_eq!(issues.len(), 1);

    // Only node `b` carries the bad type, so the single issue must point at it.
    let issue = &issues[0];
    assert_eq!(issue.node_id.as_deref(), Some("b"));
    assert!(issue.message.contains("unknown node type"));
}
1409
/// Two nodes wired `a` -> `b` -> `a` must produce exactly one issue, and that
/// issue must not be attached to any particular node.
#[test]
fn validate_catches_cyclic_graph() {
    let flow = json!({
        "nodes": [
            { "id": "a", "type": "noop" },
            { "id": "b", "type": "noop" }
        ],
        "edges": [
            { "source": "a", "target": "b" },
            { "source": "b", "target": "a" }
        ]
    });
    let engine = FlowEngine::new(NodeRegistry::with_defaults());

    let issues = engine.validate(&flow);
    assert_eq!(issues.len(), 1);

    // The reported issue has no `node_id`.
    let issue = &issues[0];
    assert!(issue.node_id.is_none());
}
1427
/// A `run_if` condition on node `b` whose `from` names a node that is not in
/// the flow (`ghost`) must produce exactly one issue, attached to `b` and
/// mentioning the missing node's id.
#[test]
fn validate_catches_run_if_referencing_unknown_node() {
    let flow = json!({
        "nodes": [
            { "id": "a", "type": "noop" },
            {
                "id": "b",
                "type": "noop",
                "data": {
                    "run_if": { "from": "ghost", "path": "", "op": "eq", "value": true }
                }
            }
        ],
        "edges": [{ "source": "a", "target": "b" }]
    });
    let engine = FlowEngine::new(NodeRegistry::with_defaults());

    let issues = engine.validate(&flow);
    assert_eq!(issues.len(), 1);

    let issue = &issues[0];
    assert_eq!(issue.node_id.as_deref(), Some("b"));
    assert!(issue.message.contains("ghost"));
}
1449
/// Two nodes with the types `bad-type-1` and `bad-type-2` must yield two
/// issues in a single `validate` call.
#[test]
fn validate_reports_multiple_issues() {
    let flow = json!({
        "nodes": [
            { "id": "a", "type": "bad-type-1" },
            { "id": "b", "type": "bad-type-2" }
        ],
        "edges": []
    });
    let engine = FlowEngine::new(NodeRegistry::with_defaults());

    let issue_count = engine.validate(&flow).len();

    assert_eq!(issue_count, 2);
}
1462}