Skip to main content

a3s_flow/
engine.rs

1//! [`FlowEngine`] — the central API for managing workflow executions.
2//!
3//! `FlowEngine` wraps a [`NodeRegistry`] and an in-memory execution map.
4//! It is the recommended entry point for applications that need lifecycle
5//! control over running workflows.
6//!
7//! # Lifecycle
8//!
9//! ```text
10//!                    ┌──────────┐
11//!             start  │ Running  │ ←──────────── resume
12//!           ────────►│          │
13//!                    └────┬─────┘
14//!               pause │   │ node error / all nodes done / terminate
15//!                     ▼   ▼
16//!                 ┌────────────┐   ┌───────────┐   ┌────────────┐
17//!                 │   Paused   │   │ Completed │   │   Failed   │
18//!                 └────────────┘   └───────────┘   └────────────┘
19//!                                                  ┌────────────┐
20//!                                                  │ Terminated │
21//!                                                  └────────────┘
22//! ```
23//!
24//! Pause takes effect at the **next wave boundary** — the current wave of
25//! concurrently executing nodes runs to completion first. Terminate interrupts
26//! the engine between or within waves and is reflected as soon as the runner
27//! task observes the cancellation signal.
28
29use std::collections::HashMap;
30use std::sync::Arc;
31
32use serde_json::Value;
33use tokio::sync::{watch, RwLock};
34use tokio_util::sync::CancellationToken;
35use tracing::warn;
36use uuid::Uuid;
37
38use tokio::sync::broadcast;
39
40use crate::error::{FlowError, Result};
41use crate::event::{ChannelEmitter, EventEmitter, FlowEvent, MulticastEmitter, NoopEventEmitter};
42use crate::execution::{ExecutionHandle, ExecutionState};
43use crate::flow_store::FlowStore;
44use crate::graph::DagGraph;
45use crate::registry::NodeRegistry;
46use crate::runner::{FlowRunner, FlowSignal};
47use crate::store::ExecutionStore;
48use crate::validation::ValidationIssue;
49
50/// Central entry point for managing workflow executions.
51///
52/// # Example
53///
54/// ```rust,no_run
55/// use a3s_flow::{FlowEngine, NodeRegistry};
56/// use serde_json::json;
57/// use std::collections::HashMap;
58///
59/// #[tokio::main]
60/// async fn main() -> a3s_flow::Result<()> {
61///     let engine = FlowEngine::new(NodeRegistry::with_defaults());
62///
63///     // Query available node types.
64///     println!("node types: {:?}", engine.node_types());
65///
66///     // Start a workflow and get its execution ID.
67///     let definition = json!({
68///         "nodes": [
69///             { "id": "a", "type": "noop" },
70///             { "id": "b", "type": "noop" }
71///         ],
72///         "edges": [{ "source": "a", "target": "b" }]
73///     });
74///     let id = engine.start(&definition, HashMap::new()).await?;
75///
76///     // Inspect state, pause, resume, or terminate.
77///     println!("state: {:?}", engine.state(id).await?);
78///     Ok(())
79/// }
80/// ```
81pub struct FlowEngine {
82    registry: Arc<NodeRegistry>,
83    executions: Arc<RwLock<HashMap<Uuid, ExecutionHandle>>>,
84    /// Optional store — when set, completed results are automatically persisted.
85    execution_store: Option<Arc<dyn ExecutionStore>>,
86    /// Optional store — enables `start_named` to look up definitions by name.
87    flow_store: Option<Arc<dyn FlowStore>>,
88    /// Emitter passed to each runner; receives all node and flow lifecycle events.
89    emitter: Arc<dyn EventEmitter>,
90    /// When set, passed to every runner to cap per-wave node concurrency.
91    max_concurrency: Option<usize>,
92}
93
94impl FlowEngine {
95    /// Create a new engine with the given node registry.
96    ///
97    /// Uses [`NoopEventEmitter`] and no execution store by default. Use the
98    /// builder methods [`with_execution_store`](Self::with_execution_store) and
99    /// [`with_event_emitter`](Self::with_event_emitter) to customise behaviour.
100    pub fn new(registry: NodeRegistry) -> Self {
101        Self {
102            registry: Arc::new(registry),
103            executions: Arc::new(RwLock::new(HashMap::new())),
104            execution_store: None,
105            flow_store: None,
106            emitter: Arc::new(NoopEventEmitter),
107            max_concurrency: None,
108        }
109    }
110
111    /// Attach an execution store.
112    ///
113    /// When set, every successfully completed execution result is saved to the
114    /// store automatically. Returns `self` for method chaining.
115    pub fn with_execution_store(mut self, store: Arc<dyn ExecutionStore>) -> Self {
116        self.execution_store = Some(store);
117        self
118    }
119
120    /// Attach a flow definition store.
121    ///
122    /// Required for [`start_named`](Self::start_named). Allows any backend
123    /// (in-memory, SQLite, remote API, …) by implementing [`FlowStore`].
124    /// Returns `self` for method chaining.
125    pub fn with_flow_store(mut self, store: Arc<dyn FlowStore>) -> Self {
126        self.flow_store = Some(store);
127        self
128    }
129
130    /// Attach a custom event emitter.
131    ///
132    /// The emitter is passed to every runner created by this engine and
133    /// receives all node and flow lifecycle events. Returns `self` for chaining.
134    pub fn with_event_emitter(mut self, emitter: Arc<dyn EventEmitter>) -> Self {
135        self.emitter = emitter;
136        self
137    }
138
139    /// Limit the number of nodes that may execute concurrently within a single
140    /// wave across all executions started by this engine.
141    ///
142    /// Delegates to [`FlowRunner::with_max_concurrency`]. Returns `self` for chaining.
143    pub fn with_max_concurrency(mut self, n: usize) -> Self {
144        self.max_concurrency = Some(n);
145        self
146    }
147
148    // ── Node type discovery ────────────────────────────────────────────────
149
150    /// Return all registered node type strings, sorted alphabetically.
151    ///
152    /// Includes built-in types (e.g. `"noop"`) and any types registered via
153    /// [`NodeRegistry::register`].
154    pub fn node_types(&self) -> Vec<String> {
155        self.registry.list_types()
156    }
157
158    // ── Pre-flight validation ──────────────────────────────────────────────
159
160    /// Validate a flow definition without executing it.
161    ///
162    /// Returns a list of [`ValidationIssue`]s describing structural problems.
163    /// An empty list means the definition is valid and ready to run.
164    ///
165    /// The following checks are performed:
166    /// - DAG structural validity: no cycles, no unknown edge references,
167    ///   no duplicate node IDs, at least one node.
168    /// - All node types are registered in the engine's [`NodeRegistry`].
169    /// - Every `run_if.from` field references an existing node ID.
170    ///
171    /// ```rust
172    /// use a3s_flow::{FlowEngine, NodeRegistry};
173    /// use serde_json::json;
174    ///
175    /// let engine = FlowEngine::new(NodeRegistry::with_defaults());
176    /// let def = json!({
177    ///     "nodes": [
178    ///         { "id": "a", "type": "noop" },
179    ///         { "id": "b", "type": "unknown-type" }
180    ///     ],
181    ///     "edges": []
182    /// });
183    /// let issues = engine.validate(&def);
184    /// assert_eq!(issues.len(), 1);
185    /// assert!(issues[0].message.contains("unknown node type"));
186    /// ```
187    pub fn validate(&self, definition: &Value) -> Vec<ValidationIssue> {
188        let mut issues = Vec::new();
189
190        // Parse the DAG — catches cycle, unknown refs, duplicate IDs, empty flow.
191        let dag = match DagGraph::from_json(definition) {
192            Ok(dag) => dag,
193            Err(e) => {
194                issues.push(ValidationIssue {
195                    node_id: None,
196                    message: e.to_string(),
197                });
198                return issues;
199            }
200        };
201
202        for node_def in dag.nodes_in_order() {
203            // Check node type is registered.
204            if self.registry.get(&node_def.node_type).is_err() {
205                issues.push(ValidationIssue {
206                    node_id: Some(node_def.id.clone()),
207                    message: format!("unknown node type '{}'", node_def.node_type),
208                });
209            }
210
211            // Check run_if.from references an existing node.
212            if let Some(ref cond) = node_def.run_if {
213                if !dag.nodes.contains_key(&cond.from) {
214                    issues.push(ValidationIssue {
215                        node_id: Some(node_def.id.clone()),
216                        message: format!("run_if references unknown node '{}'", cond.from),
217                    });
218                }
219            }
220        }
221
222        issues
223    }
224
225    // ── Execution lifecycle ────────────────────────────────────────────────
226
227    /// Start a new workflow execution from a JSON DAG definition.
228    ///
229    /// The definition is parsed and validated synchronously. If valid, the
230    /// execution is launched in a background Tokio task and the execution ID
231    /// is returned immediately — the flow runs concurrently with the caller.
232    ///
233    /// # Errors
234    ///
235    /// Returns an error if the definition is invalid (cycle, unknown node ID,
236    /// bad JSON, unregistered node type).
237    pub async fn start(
238        &self,
239        definition: &Value,
240        variables: HashMap<String, Value>,
241    ) -> Result<Uuid> {
242        self.start_inner(definition, variables, Arc::clone(&self.emitter))
243            .await
244    }
245
246    /// Start a workflow and return a live event stream alongside the execution ID.
247    ///
248    /// The returned [`broadcast::Receiver<FlowEvent>`] is created **before** the
249    /// execution task is spawned, guaranteeing that no events are missed —
250    /// including `FlowStarted`. Multiple subscribers can be created by calling
251    /// [`broadcast::Receiver::resubscribe`].
252    ///
253    /// The stream closes (returns `Err(RecvError::Closed)`) when the execution
254    /// reaches a terminal state (`Completed`, `Failed`, or `Terminated`).
255    ///
256    /// If the engine also has a custom [`EventEmitter`] configured via
257    /// [`with_event_emitter`](Self::with_event_emitter), both the emitter and
258    /// the broadcast channel receive every event.
259    ///
260    /// # Example
261    ///
262    /// ```rust,no_run
263    /// use a3s_flow::{FlowEngine, FlowEvent, NodeRegistry};
264    /// use serde_json::json;
265    /// use std::collections::HashMap;
266    ///
267    /// #[tokio::main]
268    /// async fn main() -> a3s_flow::Result<()> {
269    ///     let engine = FlowEngine::new(NodeRegistry::with_defaults());
270    ///     let def = json!({
271    ///         "nodes": [{ "id": "a", "type": "noop" }],
272    ///         "edges": []
273    ///     });
274    ///
275    ///     let (id, mut rx) = engine.start_streaming(&def, HashMap::new()).await?;
276    ///
277    ///     while let Ok(event) = rx.recv().await {
278    ///         match event {
279    ///             FlowEvent::NodeCompleted { node_id, .. } => println!("done: {node_id}"),
280    ///             FlowEvent::FlowCompleted { .. } => break,
281    ///             _ => {}
282    ///         }
283    ///     }
284    ///     Ok(())
285    /// }
286    /// ```
287    pub async fn start_streaming(
288        &self,
289        definition: &Value,
290        variables: HashMap<String, Value>,
291    ) -> Result<(Uuid, broadcast::Receiver<FlowEvent>)> {
292        let (tx, rx) = broadcast::channel(256);
293        let channel_emitter = Arc::new(ChannelEmitter::new(tx));
294        let multicast: Arc<dyn EventEmitter> = Arc::new(MulticastEmitter {
295            a: Arc::clone(&self.emitter),
296            b: channel_emitter,
297        });
298        let id = self.start_inner(definition, variables, multicast).await?;
299        Ok((id, rx))
300    }
301
302    /// Internal start implementation; accepts an explicit emitter so that both
303    /// `start` and `start_streaming` can share the same spawn logic.
304    async fn start_inner(
305        &self,
306        definition: &Value,
307        variables: HashMap<String, Value>,
308        emitter: Arc<dyn EventEmitter>,
309    ) -> Result<Uuid> {
310        let dag = DagGraph::from_json(definition)?;
311        let mut runner = FlowRunner::new(dag, (*self.registry).clone()).with_event_emitter(emitter);
312        if let Some(ref fs) = self.flow_store {
313            runner = runner.with_flow_store(Arc::clone(fs));
314        }
315        if let Some(n) = self.max_concurrency {
316            runner = runner.with_max_concurrency(n);
317        }
318
319        let execution_id = Uuid::new_v4();
320        let cancel = CancellationToken::new();
321        let (signal_tx, signal_rx) = watch::channel(FlowSignal::Run);
322        let state = Arc::new(RwLock::new(ExecutionState::Running));
323
324        let handle = ExecutionHandle {
325            state: Arc::clone(&state),
326            signal_tx,
327            cancel: cancel.clone(),
328        };
329
330        self.executions.write().await.insert(execution_id, handle);
331
332        // Spawn the execution task; it updates state on terminal transitions.
333        // Logging and event emission are handled by FlowRunner::run_seeded.
334        let state_for_task = Arc::clone(&state);
335        let execution_store = self.execution_store.clone();
336        tokio::spawn(async move {
337            match runner
338                .run_controlled(execution_id, variables, signal_rx, cancel)
339                .await
340            {
341                Ok(result) => {
342                    // Persist the result if a store is configured.
343                    if let Some(ref store) = execution_store {
344                        if let Err(e) = store.save(&result).await {
345                            warn!(%execution_id, error = %e, "failed to persist execution result");
346                        }
347                    }
348                    *state_for_task.write().await = ExecutionState::Completed(result);
349                }
350                Err(FlowError::Terminated) => {
351                    *state_for_task.write().await = ExecutionState::Terminated;
352                }
353                Err(e) => {
354                    *state_for_task.write().await = ExecutionState::Failed(e.to_string());
355                }
356            }
357        });
358
359        Ok(execution_id)
360    }
361
362    /// Start a workflow by loading its definition from the configured
363    /// [`FlowStore`] by name.
364    ///
365    /// Equivalent to:
366    /// ```rust,ignore
367    /// let def = flow_store.load(name).await?.ok_or(...)?;
368    /// engine.start(&def, variables).await
369    /// ```
370    ///
371    /// # Errors
372    ///
373    /// - [`FlowError::Internal`] if no `FlowStore` was configured via
374    ///   [`with_flow_store`](Self::with_flow_store).
375    /// - [`FlowError::FlowNotFound`] if no definition exists under `name`.
376    /// - Any error returned by [`start`](Self::start) (invalid definition, etc.).
377    pub async fn start_named(&self, name: &str, variables: HashMap<String, Value>) -> Result<Uuid> {
378        let store = self.flow_store.as_ref().ok_or_else(|| {
379            FlowError::Internal("no FlowStore configured; call with_flow_store first".into())
380        })?;
381
382        let definition = store
383            .load(name)
384            .await?
385            .ok_or_else(|| FlowError::FlowNotFound(name.to_string()))?;
386
387        self.start(&definition, variables).await
388    }
389
390    /// Pause a running execution at the next wave boundary.
391    ///
392    /// Nodes in the **current wave** continue until they finish. No new wave
393    /// starts until [`resume`](Self::resume) is called.
394    ///
395    /// # Errors
396    ///
397    /// - [`FlowError::ExecutionNotFound`] if the ID is unknown.
398    /// - [`FlowError::InvalidTransition`] if the execution is not `Running`.
399    pub async fn pause(&self, id: Uuid) -> Result<()> {
400        let executions = self.executions.read().await;
401        let handle = executions
402            .get(&id)
403            .ok_or(FlowError::ExecutionNotFound(id))?;
404
405        let mut state = handle.state.write().await;
406        match *state {
407            ExecutionState::Running => {
408                handle.signal_tx.send(FlowSignal::Pause).ok();
409                *state = ExecutionState::Paused;
410                Ok(())
411            }
412            ref s => Err(FlowError::InvalidTransition {
413                action: "pause".into(),
414                from: s.as_str().into(),
415            }),
416        }
417    }
418
419    /// Resume a paused execution.
420    ///
421    /// # Errors
422    ///
423    /// - [`FlowError::ExecutionNotFound`] if the ID is unknown.
424    /// - [`FlowError::InvalidTransition`] if the execution is not `Paused`.
425    pub async fn resume(&self, id: Uuid) -> Result<()> {
426        let executions = self.executions.read().await;
427        let handle = executions
428            .get(&id)
429            .ok_or(FlowError::ExecutionNotFound(id))?;
430
431        let mut state = handle.state.write().await;
432        match *state {
433            ExecutionState::Paused => {
434                handle.signal_tx.send(FlowSignal::Run).ok();
435                *state = ExecutionState::Running;
436                Ok(())
437            }
438            ref s => Err(FlowError::InvalidTransition {
439                action: "resume".into(),
440                from: s.as_str().into(),
441            }),
442        }
443    }
444
445    /// Terminate an execution immediately.
446    ///
447    /// Sends a cancellation signal. The execution task stops at the next
448    /// cancellation checkpoint (between waves, or within a wave's result
449    /// collection). If the execution is currently paused it is unblocked so
450    /// it can observe the cancellation.
451    ///
452    /// # Errors
453    ///
454    /// - [`FlowError::ExecutionNotFound`] if the ID is unknown.
455    /// - [`FlowError::InvalidTransition`] if the execution is already in a
456    ///   terminal state (`Completed`, `Failed`, `Terminated`).
457    pub async fn terminate(&self, id: Uuid) -> Result<()> {
458        let executions = self.executions.read().await;
459        let handle = executions
460            .get(&id)
461            .ok_or(FlowError::ExecutionNotFound(id))?;
462
463        let state = handle.state.read().await;
464        if state.is_terminal() {
465            return Err(FlowError::InvalidTransition {
466                action: "terminate".into(),
467                from: state.as_str().into(),
468            });
469        }
470        drop(state);
471
472        handle.cancel.cancel();
473        // Unblock a paused runner so it can observe the cancellation.
474        handle.signal_tx.send(FlowSignal::Run).ok();
475        Ok(())
476    }
477
478    /// Return a snapshot of the current state of an execution.
479    ///
480    /// # Errors
481    ///
482    /// - [`FlowError::ExecutionNotFound`] if the ID is unknown.
483    pub async fn state(&self, id: Uuid) -> Result<ExecutionState> {
484        let executions = self.executions.read().await;
485        let handle = executions
486            .get(&id)
487            .ok_or(FlowError::ExecutionNotFound(id))?;
488        // Clone while the guard is still held, then drop the guard before returning.
489        let snapshot = handle.state.read().await.clone();
490        Ok(snapshot)
491    }
492}
493
494#[cfg(test)]
495mod tests {
496    use super::*;
497    use crate::node::{ExecContext, Node};
498    use async_trait::async_trait;
499    use serde_json::{json, Value};
500    use std::time::Duration;
501
502    // ── Helpers ────────────────────────────────────────────────────────────
503
504    /// A node that sleeps for the given duration before returning.
505    struct SlowNode(Duration);
506
507    #[async_trait]
508    impl Node for SlowNode {
509        fn node_type(&self) -> &str {
510            "slow"
511        }
512
513        async fn execute(&self, _ctx: ExecContext) -> crate::error::Result<Value> {
514            tokio::time::sleep(self.0).await;
515            Ok(json!({}))
516        }
517    }
518
519    fn slow_engine(delay: Duration) -> FlowEngine {
520        let mut registry = NodeRegistry::with_defaults();
521        registry.register(Arc::new(SlowNode(delay)));
522        FlowEngine::new(registry)
523    }
524
525    fn simple_def() -> Value {
526        json!({
527            "nodes": [
528                { "id": "a", "type": "noop" },
529                { "id": "b", "type": "noop" }
530            ],
531            "edges": [{ "source": "a", "target": "b" }]
532        })
533    }
534
535    fn slow_def() -> Value {
536        json!({
537            "nodes": [
538                { "id": "a", "type": "slow" },
539                { "id": "b", "type": "slow" }
540            ],
541            "edges": [{ "source": "a", "target": "b" }]
542        })
543    }
544
545    // ── node_types ─────────────────────────────────────────────────────────
546
547    #[test]
548    fn node_types_includes_builtins() {
549        let engine = FlowEngine::new(NodeRegistry::with_defaults());
550        let types = engine.node_types();
551        assert!(types.contains(&"noop".to_string()));
552    }
553
554    #[test]
555    fn node_types_includes_custom_nodes() {
556        let mut registry = NodeRegistry::with_defaults();
557        registry.register(Arc::new(SlowNode(Duration::from_millis(1))));
558        let engine = FlowEngine::new(registry);
559
560        let types = engine.node_types();
561        assert!(types.contains(&"noop".to_string()));
562        assert!(types.contains(&"slow".to_string()));
563    }
564
565    #[test]
566    fn node_types_is_sorted() {
567        let engine = FlowEngine::new(NodeRegistry::with_defaults());
568        let types = engine.node_types();
569        let mut sorted = types.clone();
570        sorted.sort();
571        assert_eq!(types, sorted);
572    }
573
574    // ── start ──────────────────────────────────────────────────────────────
575
576    #[tokio::test]
577    async fn start_returns_execution_id() {
578        let engine = FlowEngine::new(NodeRegistry::with_defaults());
579        let id = engine.start(&simple_def(), HashMap::new()).await.unwrap();
580        // ID is non-nil.
581        assert!(!id.is_nil());
582    }
583
584    #[tokio::test]
585    async fn start_rejects_invalid_definition() {
586        let engine = FlowEngine::new(NodeRegistry::with_defaults());
587        let bad = json!({
588            "nodes": [{ "id": "a", "type": "noop" }],
589            "edges": [{ "source": "ghost", "target": "a" }]
590        });
591        assert!(matches!(
592            engine.start(&bad, HashMap::new()).await,
593            Err(FlowError::UnknownNode(_))
594        ));
595    }
596
597    #[tokio::test]
598    async fn completed_flow_has_outputs() {
599        let engine = FlowEngine::new(NodeRegistry::with_defaults());
600        let id = engine.start(&simple_def(), HashMap::new()).await.unwrap();
601
602        // Wait for the background task to finish.
603        tokio::time::sleep(Duration::from_millis(50)).await;
604
605        let state = engine.state(id).await.unwrap();
606        if let ExecutionState::Completed(result) = state {
607            assert!(result.outputs.contains_key("a"));
608            assert!(result.outputs.contains_key("b"));
609        } else {
610            panic!("expected Completed, got {}", state.as_str());
611        }
612    }
613
614    // ── state ──────────────────────────────────────────────────────────────
615
616    #[tokio::test]
617    async fn state_returns_not_found_for_unknown_id() {
618        let engine = FlowEngine::new(NodeRegistry::with_defaults());
619        let err = engine.state(Uuid::new_v4()).await.unwrap_err();
620        assert!(matches!(err, FlowError::ExecutionNotFound(_)));
621    }
622
623    // ── pause / resume ─────────────────────────────────────────────────────
624
625    #[tokio::test]
626    async fn pause_transitions_to_paused() {
627        let engine = slow_engine(Duration::from_millis(200));
628        let id = engine.start(&slow_def(), HashMap::new()).await.unwrap();
629
630        // Give the runner a moment to start wave 1.
631        tokio::time::sleep(Duration::from_millis(10)).await;
632        engine.pause(id).await.unwrap();
633
634        assert!(matches!(
635            engine.state(id).await.unwrap(),
636            ExecutionState::Paused
637        ));
638    }
639
640    #[tokio::test]
641    async fn resume_transitions_to_running() {
642        let engine = slow_engine(Duration::from_millis(200));
643        let id = engine.start(&slow_def(), HashMap::new()).await.unwrap();
644
645        tokio::time::sleep(Duration::from_millis(10)).await;
646        engine.pause(id).await.unwrap();
647        engine.resume(id).await.unwrap();
648
649        assert!(matches!(
650            engine.state(id).await.unwrap(),
651            ExecutionState::Running
652        ));
653    }
654
655    #[tokio::test]
656    async fn pause_on_completed_flow_returns_invalid_transition() {
657        let engine = FlowEngine::new(NodeRegistry::with_defaults());
658        let id = engine.start(&simple_def(), HashMap::new()).await.unwrap();
659
660        tokio::time::sleep(Duration::from_millis(50)).await;
661        // Flow should be completed by now.
662        let err = engine.pause(id).await.unwrap_err();
663        assert!(matches!(err, FlowError::InvalidTransition { .. }));
664    }
665
666    #[tokio::test]
667    async fn resume_on_running_flow_returns_invalid_transition() {
668        let engine = slow_engine(Duration::from_millis(200));
669        let id = engine.start(&slow_def(), HashMap::new()).await.unwrap();
670
671        tokio::time::sleep(Duration::from_millis(10)).await;
672        // Still running — not paused.
673        let err = engine.resume(id).await.unwrap_err();
674        assert!(matches!(err, FlowError::InvalidTransition { .. }));
675
676        engine.terminate(id).await.unwrap();
677    }
678
679    // ── terminate ──────────────────────────────────────────────────────────
680
681    #[tokio::test]
682    async fn terminate_stops_slow_execution() {
683        let engine = slow_engine(Duration::from_millis(500));
684        let id = engine.start(&slow_def(), HashMap::new()).await.unwrap();
685
686        tokio::time::sleep(Duration::from_millis(10)).await;
687        engine.terminate(id).await.unwrap();
688
689        // The runner task should observe the cancellation quickly.
690        tokio::time::sleep(Duration::from_millis(50)).await;
691
692        assert!(matches!(
693            engine.state(id).await.unwrap(),
694            ExecutionState::Terminated
695        ));
696    }
697
698    #[tokio::test]
699    async fn terminate_unblocks_paused_execution() {
700        let engine = slow_engine(Duration::from_millis(500));
701        let id = engine.start(&slow_def(), HashMap::new()).await.unwrap();
702
703        tokio::time::sleep(Duration::from_millis(10)).await;
704        engine.pause(id).await.unwrap();
705
706        // Flow is paused — now terminate it.
707        engine.terminate(id).await.unwrap();
708
709        tokio::time::sleep(Duration::from_millis(600)).await;
710
711        assert!(matches!(
712            engine.state(id).await.unwrap(),
713            ExecutionState::Terminated
714        ));
715    }
716
717    #[tokio::test]
718    async fn terminate_on_completed_flow_returns_invalid_transition() {
719        let engine = FlowEngine::new(NodeRegistry::with_defaults());
720        let id = engine.start(&simple_def(), HashMap::new()).await.unwrap();
721
722        tokio::time::sleep(Duration::from_millis(50)).await;
723        let err = engine.terminate(id).await.unwrap_err();
724        assert!(matches!(err, FlowError::InvalidTransition { .. }));
725    }
726
727    #[tokio::test]
728    async fn unknown_execution_id_returns_not_found() {
729        let engine = FlowEngine::new(NodeRegistry::with_defaults());
730        let id = Uuid::new_v4();
731        assert!(matches!(
732            engine.pause(id).await,
733            Err(FlowError::ExecutionNotFound(_))
734        ));
735        assert!(matches!(
736            engine.resume(id).await,
737            Err(FlowError::ExecutionNotFound(_))
738        ));
739        assert!(matches!(
740            engine.terminate(id).await,
741            Err(FlowError::ExecutionNotFound(_))
742        ));
743    }
744
745    // ── ExecutionStore integration ──────────────────────────────────────────
746
747    #[tokio::test]
748    async fn execution_store_saves_completed_result() {
749        use crate::store::MemoryExecutionStore;
750
751        let store = Arc::new(MemoryExecutionStore::new());
752        let engine = FlowEngine::new(NodeRegistry::with_defaults())
753            .with_execution_store(Arc::clone(&store) as Arc<dyn crate::store::ExecutionStore>);
754
755        let id = engine.start(&simple_def(), HashMap::new()).await.unwrap();
756        tokio::time::sleep(Duration::from_millis(50)).await;
757
758        // Engine should have saved the result.
759        let ids = store.list().await.unwrap();
760        assert!(ids.contains(&id), "stored execution id not found");
761
762        let saved = store.load(id).await.unwrap().unwrap();
763        assert_eq!(saved.execution_id, id);
764        assert!(saved.outputs.contains_key("a"));
765        assert!(saved.outputs.contains_key("b"));
766    }
767
768    #[tokio::test]
769    async fn execution_store_not_used_on_terminated_execution() {
770        use crate::store::MemoryExecutionStore;
771
772        let store = Arc::new(MemoryExecutionStore::new());
773        let engine = slow_engine(Duration::from_millis(500))
774            .with_execution_store(Arc::clone(&store) as Arc<dyn crate::store::ExecutionStore>);
775
776        let id = engine.start(&slow_def(), HashMap::new()).await.unwrap();
777        tokio::time::sleep(Duration::from_millis(10)).await;
778        engine.terminate(id).await.unwrap();
779        tokio::time::sleep(Duration::from_millis(50)).await;
780
781        // Terminated executions are not saved.
782        assert!(
783            store.list().await.unwrap().is_empty(),
784            "terminated result should not be stored"
785        );
786    }
787
788    // ── EventEmitter integration (via engine) ───────────────────────────────
789
790    #[tokio::test]
791    async fn engine_emitter_receives_flow_and_node_events() {
792        use crate::event::EventEmitter;
793        use std::sync::atomic::{AtomicU32, Ordering};
794
795        struct CountEmitter {
796            flow_started: Arc<AtomicU32>,
797            flow_completed: Arc<AtomicU32>,
798            node_started: Arc<AtomicU32>,
799            node_completed: Arc<AtomicU32>,
800        }
801
802        #[async_trait::async_trait]
803        impl EventEmitter for CountEmitter {
804            async fn on_flow_started(&self, _: Uuid) {
805                self.flow_started.fetch_add(1, Ordering::SeqCst);
806            }
807            async fn on_flow_completed(&self, _: Uuid, _: &crate::result::FlowResult) {
808                self.flow_completed.fetch_add(1, Ordering::SeqCst);
809            }
810            async fn on_flow_failed(&self, _: Uuid, _: &str) {}
811            async fn on_flow_terminated(&self, _: Uuid) {}
812            async fn on_node_started(&self, _: Uuid, _: &str, _: &str) {
813                self.node_started.fetch_add(1, Ordering::SeqCst);
814            }
815            async fn on_node_completed(&self, _: Uuid, _: &str, _: &serde_json::Value) {
816                self.node_completed.fetch_add(1, Ordering::SeqCst);
817            }
818            async fn on_node_skipped(&self, _: Uuid, _: &str) {}
819            async fn on_node_failed(&self, _: Uuid, _: &str, _: &str) {}
820        }
821
822        let flow_started = Arc::new(AtomicU32::new(0));
823        let flow_completed = Arc::new(AtomicU32::new(0));
824        let node_started = Arc::new(AtomicU32::new(0));
825        let node_completed = Arc::new(AtomicU32::new(0));
826
827        let emitter = Arc::new(CountEmitter {
828            flow_started: Arc::clone(&flow_started),
829            flow_completed: Arc::clone(&flow_completed),
830            node_started: Arc::clone(&node_started),
831            node_completed: Arc::clone(&node_completed),
832        });
833
834        let engine = FlowEngine::new(NodeRegistry::with_defaults())
835            .with_event_emitter(emitter as Arc<dyn EventEmitter>);
836
837        // simple_def has nodes a and b.
838        engine.start(&simple_def(), HashMap::new()).await.unwrap();
839        tokio::time::sleep(Duration::from_millis(50)).await;
840
841        assert_eq!(flow_started.load(Ordering::SeqCst), 1, "flow_started");
842        assert_eq!(flow_completed.load(Ordering::SeqCst), 1, "flow_completed");
843        assert_eq!(node_started.load(Ordering::SeqCst), 2, "node_started (a+b)");
844        assert_eq!(
845            node_completed.load(Ordering::SeqCst),
846            2,
847            "node_completed (a+b)"
848        );
849    }
850
851    // ── start_named ────────────────────────────────────────────────────────
852
853    #[tokio::test]
854    async fn start_named_loads_and_runs_from_flow_store() {
855        use crate::flow_store::MemoryFlowStore;
856
857        let flow_store = Arc::new(MemoryFlowStore::new());
858        flow_store.save("greet", &simple_def()).await.unwrap();
859
860        let engine = FlowEngine::new(NodeRegistry::with_defaults())
861            .with_flow_store(Arc::clone(&flow_store) as Arc<dyn crate::flow_store::FlowStore>);
862
863        let id = engine.start_named("greet", HashMap::new()).await.unwrap();
864        assert!(!id.is_nil());
865
866        tokio::time::sleep(Duration::from_millis(50)).await;
867        assert!(matches!(
868            engine.state(id).await.unwrap(),
869            ExecutionState::Completed(_)
870        ));
871    }
872
873    #[tokio::test]
874    async fn start_named_returns_flow_not_found_for_unknown_name() {
875        use crate::flow_store::MemoryFlowStore;
876
877        let engine = FlowEngine::new(NodeRegistry::with_defaults())
878            .with_flow_store(
879                Arc::new(MemoryFlowStore::new()) as Arc<dyn crate::flow_store::FlowStore>
880            );
881
882        let err = engine
883            .start_named("nonexistent", HashMap::new())
884            .await
885            .unwrap_err();
886
887        assert!(
888            matches!(err, FlowError::FlowNotFound(ref n) if n == "nonexistent"),
889            "expected FlowNotFound, got: {err}"
890        );
891    }
892
893    #[tokio::test]
894    async fn start_named_returns_internal_when_no_store_configured() {
895        let engine = FlowEngine::new(NodeRegistry::with_defaults());
896
897        let err = engine
898            .start_named("anything", HashMap::new())
899            .await
900            .unwrap_err();
901
902        assert!(
903            matches!(err, FlowError::Internal(_)),
904            "expected Internal, got: {err}"
905        );
906    }
907
908    // ── start_streaming ────────────────────────────────────────────────────
909
910    #[tokio::test]
911    async fn start_streaming_delivers_flow_started_and_completed_events() {
912        use crate::event::FlowEvent;
913
914        let engine = FlowEngine::new(NodeRegistry::with_defaults());
915        let (_, mut rx) = engine
916            .start_streaming(&simple_def(), HashMap::new())
917            .await
918            .unwrap();
919
920        let mut saw_started = false;
921        let mut saw_completed = false;
922
923        loop {
924            match rx.recv().await {
925                Ok(FlowEvent::FlowStarted { .. }) => saw_started = true,
926                Ok(FlowEvent::FlowCompleted { .. }) => {
927                    saw_completed = true;
928                    break;
929                }
930                Ok(_) => {}
931                Err(_) => break,
932            }
933        }
934
935        assert!(saw_started, "FlowStarted not received");
936        assert!(saw_completed, "FlowCompleted not received");
937    }
938
939    #[tokio::test]
940    async fn start_streaming_delivers_node_events_for_each_node() {
941        use crate::event::FlowEvent;
942        use std::collections::HashSet;
943
944        let engine = FlowEngine::new(NodeRegistry::with_defaults());
945        let (_, mut rx) = engine
946            .start_streaming(&simple_def(), HashMap::new())
947            .await
948            .unwrap();
949
950        let mut completed_nodes: HashSet<String> = HashSet::new();
951
952        loop {
953            match rx.recv().await {
954                Ok(FlowEvent::NodeCompleted { node_id, .. }) => {
955                    completed_nodes.insert(node_id);
956                }
957                Ok(FlowEvent::FlowCompleted { .. }) | Err(_) => break,
958                Ok(_) => {}
959            }
960        }
961
962        assert!(completed_nodes.contains("a"), "node 'a' not in stream");
963        assert!(completed_nodes.contains("b"), "node 'b' not in stream");
964    }
965
966    #[tokio::test]
967    async fn start_streaming_zero_events_lost_on_fast_flow() {
968        // Sanity check: even on an instantaneously completing flow the
969        // receiver created before spawn misses no events.
970        let engine = FlowEngine::new(NodeRegistry::with_defaults());
971        let def = json!({ "nodes": [{ "id": "x", "type": "noop" }], "edges": [] });
972
973        let (_id, mut rx) = engine.start_streaming(&def, HashMap::new()).await.unwrap();
974
975        let mut event_count = 0u32;
976        loop {
977            match rx.recv().await {
978                Ok(_) => event_count += 1,
979                Err(_) => break,
980            }
981        }
982        // FlowStarted + NodeStarted + NodeCompleted + FlowCompleted = 4 minimum
983        assert!(event_count >= 4, "expected ≥4 events, got {event_count}");
984    }
985
986    #[tokio::test]
987    async fn start_streaming_existing_emitter_also_fires() {
988        use crate::event::{EventEmitter, FlowEvent};
989        use std::sync::atomic::{AtomicU32, Ordering};
990
991        struct CountEmitter(Arc<AtomicU32>);
992
993        #[async_trait::async_trait]
994        impl EventEmitter for CountEmitter {
995            async fn on_flow_started(&self, _: Uuid) {
996                self.0.fetch_add(1, Ordering::SeqCst);
997            }
998            async fn on_flow_completed(&self, _: Uuid, _: &crate::result::FlowResult) {}
999            async fn on_flow_failed(&self, _: Uuid, _: &str) {}
1000            async fn on_flow_terminated(&self, _: Uuid) {}
1001            async fn on_node_started(&self, _: Uuid, _: &str, _: &str) {}
1002            async fn on_node_completed(&self, _: Uuid, _: &str, _: &serde_json::Value) {}
1003            async fn on_node_skipped(&self, _: Uuid, _: &str) {}
1004            async fn on_node_failed(&self, _: Uuid, _: &str, _: &str) {}
1005        }
1006
1007        let counter = Arc::new(AtomicU32::new(0));
1008        let engine = FlowEngine::new(NodeRegistry::with_defaults())
1009            .with_event_emitter(
1010                Arc::new(CountEmitter(Arc::clone(&counter))) as Arc<dyn EventEmitter>
1011            );
1012
1013        let (_id, mut rx) = engine
1014            .start_streaming(&simple_def(), HashMap::new())
1015            .await
1016            .unwrap();
1017
1018        // Drain the stream.
1019        loop {
1020            match rx.recv().await {
1021                Ok(FlowEvent::FlowCompleted { .. }) | Err(_) => break,
1022                Ok(_) => {}
1023            }
1024        }
1025
1026        // The existing CountEmitter should also have received FlowStarted.
1027        assert_eq!(
1028            counter.load(Ordering::SeqCst),
1029            1,
1030            "existing emitter did not fire"
1031        );
1032    }
1033
1034    // ── validate ───────────────────────────────────────────────────────────
1035
1036    #[test]
1037    fn validate_returns_empty_for_valid_flow() {
1038        let engine = FlowEngine::new(NodeRegistry::with_defaults());
1039        let def = json!({
1040            "nodes": [
1041                { "id": "a", "type": "noop" },
1042                { "id": "b", "type": "noop" }
1043            ],
1044            "edges": [{ "source": "a", "target": "b" }]
1045        });
1046        assert!(engine.validate(&def).is_empty());
1047    }
1048
1049    #[test]
1050    fn validate_catches_unknown_node_type() {
1051        let engine = FlowEngine::new(NodeRegistry::with_defaults());
1052        let def = json!({
1053            "nodes": [
1054                { "id": "a", "type": "noop" },
1055                { "id": "b", "type": "does-not-exist" }
1056            ],
1057            "edges": []
1058        });
1059        let issues = engine.validate(&def);
1060        assert_eq!(issues.len(), 1);
1061        assert_eq!(issues[0].node_id.as_deref(), Some("b"));
1062        assert!(issues[0].message.contains("unknown node type"));
1063    }
1064
1065    #[test]
1066    fn validate_catches_cyclic_graph() {
1067        let engine = FlowEngine::new(NodeRegistry::with_defaults());
1068        let def = json!({
1069            "nodes": [
1070                { "id": "a", "type": "noop" },
1071                { "id": "b", "type": "noop" }
1072            ],
1073            "edges": [
1074                { "source": "a", "target": "b" },
1075                { "source": "b", "target": "a" }
1076            ]
1077        });
1078        let issues = engine.validate(&def);
1079        assert_eq!(issues.len(), 1);
1080        assert!(issues[0].node_id.is_none());
1081    }
1082
1083    #[test]
1084    fn validate_catches_run_if_referencing_unknown_node() {
1085        let engine = FlowEngine::new(NodeRegistry::with_defaults());
1086        let def = json!({
1087            "nodes": [
1088                { "id": "a", "type": "noop" },
1089                {
1090                    "id": "b",
1091                    "type": "noop",
1092                    "data": {
1093                        "run_if": { "from": "ghost", "path": "", "op": "eq", "value": true }
1094                    }
1095                }
1096            ],
1097            "edges": [{ "source": "a", "target": "b" }]
1098        });
1099        let issues = engine.validate(&def);
1100        assert_eq!(issues.len(), 1);
1101        assert_eq!(issues[0].node_id.as_deref(), Some("b"));
1102        assert!(issues[0].message.contains("ghost"));
1103    }
1104
1105    #[test]
1106    fn validate_reports_multiple_issues() {
1107        let engine = FlowEngine::new(NodeRegistry::with_defaults());
1108        let def = json!({
1109            "nodes": [
1110                { "id": "a", "type": "bad-type-1" },
1111                { "id": "b", "type": "bad-type-2" }
1112            ],
1113            "edges": []
1114        });
1115        assert_eq!(engine.validate(&def).len(), 2);
1116    }
1117}