Skip to main content

a3s_flow/
engine.rs

//! [`FlowEngine`] — the central API for managing workflow executions.
//!
//! `FlowEngine` wraps a [`NodeRegistry`] and an in-memory execution map.
//! It is the recommended entry point for applications that need lifecycle
//! control over running workflows.
//!
//! # Lifecycle
//!
//! ```text
//!                    ┌──────────┐
//!             start  │ Running  │ ←──────────── resume
//!           ────────►│          │
//!                    └────┬─────┘
//!               pause │   │ node error / all nodes done / terminate
//!                     ▼   ▼
//!                 ┌────────────┐   ┌───────────┐   ┌────────────┐
//!                 │   Paused   │   │ Completed │   │   Failed   │
//!                 └────────────┘   └───────────┘   └────────────┘
//!                                                  ┌────────────┐
//!                                                  │ Terminated │
//!                                                  └────────────┘
//! ```
//!
//! Pause takes effect at the **next wave boundary** — the current wave of
//! concurrently executing nodes runs to completion first. Terminate interrupts
//! the engine between or within waves and is reflected as soon as the runner
//! task observes the cancellation signal.
29use std::collections::HashMap;
30use std::sync::{Arc, RwLock as SyncRwLock};
31
32use serde_json::Value;
33use tokio::sync::{watch, RwLock};
34use tokio_util::sync::CancellationToken;
35use tracing::warn;
36use uuid::Uuid;
37
38use tokio::sync::broadcast;
39
40use crate::error::{FlowError, Result};
41use crate::event::{ChannelEmitter, EventEmitter, FlowEvent, MulticastEmitter, NoopEventEmitter};
42use crate::execution::{ExecutionHandle, ExecutionState};
43use crate::flow_store::FlowStore;
44use crate::graph::DagGraph;
45use crate::registry::NodeRegistry;
46use crate::runner::{FlowRunner, FlowSignal};
47use crate::store::ExecutionStore;
48use crate::validation::ValidationIssue;
49
50/// Central entry point for managing workflow executions.
51///
52/// # Example
53///
54/// ```rust,no_run
55/// use a3s_flow::{FlowEngine, NodeRegistry};
56/// use serde_json::json;
57/// use std::collections::HashMap;
58///
59/// #[tokio::main]
60/// async fn main() -> a3s_flow::Result<()> {
61///     let engine = FlowEngine::new(NodeRegistry::with_defaults());
62///
63///     // Query available node types.
64///     println!("node types: {:?}", engine.node_types());
65///
66///     // Start a workflow and get its execution ID.
67///     let definition = json!({
68///         "nodes": [
69///             { "id": "a", "type": "noop" },
70///             { "id": "b", "type": "noop" }
71///         ],
72///         "edges": [{ "source": "a", "target": "b" }]
73///     });
74///     let id = engine.start(&definition, HashMap::new()).await?;
75///
76///     // Inspect state, pause, resume, or terminate.
77///     println!("state: {:?}", engine.state(id).await?);
78///     Ok(())
79/// }
80/// ```
pub struct FlowEngine {
    /// Shared node registry; its contents are cloned into each runner at start time.
    registry: Arc<NodeRegistry>,
    /// Executions started by this engine, keyed by execution ID.
    /// NOTE(review): no code in this file ever removes an entry, so a
    /// long-lived engine grows unboundedly — confirm whether eviction is
    /// handled elsewhere or intentionally omitted.
    executions: Arc<RwLock<HashMap<Uuid, ExecutionHandle>>>,
    /// Optional store — when set, completed results are automatically persisted.
    execution_store: Option<Arc<dyn ExecutionStore>>,
    /// Optional store — enables `start_named` to look up definitions by name.
    flow_store: Option<Arc<dyn FlowStore>>,
    /// Emitter passed to each runner; receives all node and flow lifecycle events.
    emitter: Arc<dyn EventEmitter>,
    /// When set, passed to every runner to cap per-wave node concurrency.
    max_concurrency: Option<usize>,
}
93
94impl FlowEngine {
95    /// Create a new engine with the given node registry.
96    ///
97    /// Uses [`NoopEventEmitter`] and no execution store by default. Use the
98    /// builder methods [`with_execution_store`](Self::with_execution_store) and
99    /// [`with_event_emitter`](Self::with_event_emitter) to customise behaviour.
100    pub fn new(registry: NodeRegistry) -> Self {
101        Self {
102            registry: Arc::new(registry),
103            executions: Arc::new(RwLock::new(HashMap::new())),
104            execution_store: None,
105            flow_store: None,
106            emitter: Arc::new(NoopEventEmitter),
107            max_concurrency: None,
108        }
109    }
110
111    /// Attach an execution store.
112    ///
113    /// When set, every successfully completed execution result is saved to the
114    /// store automatically. Returns `self` for method chaining.
115    pub fn with_execution_store(mut self, store: Arc<dyn ExecutionStore>) -> Self {
116        self.execution_store = Some(store);
117        self
118    }
119
120    /// Attach a flow definition store.
121    ///
122    /// Required for [`start_named`](Self::start_named). Allows any backend
123    /// (in-memory, SQLite, remote API, …) by implementing [`FlowStore`].
124    /// Returns `self` for method chaining.
125    pub fn with_flow_store(mut self, store: Arc<dyn FlowStore>) -> Self {
126        self.flow_store = Some(store);
127        self
128    }
129
130    /// Attach a custom event emitter.
131    ///
132    /// The emitter is passed to every runner created by this engine and
133    /// receives all node and flow lifecycle events. Returns `self` for chaining.
134    pub fn with_event_emitter(mut self, emitter: Arc<dyn EventEmitter>) -> Self {
135        self.emitter = emitter;
136        self
137    }
138
139    /// Limit the number of nodes that may execute concurrently within a single
140    /// wave across all executions started by this engine.
141    ///
142    /// Delegates to [`FlowRunner::with_max_concurrency`]. Returns `self` for chaining.
143    pub fn with_max_concurrency(mut self, n: usize) -> Self {
144        self.max_concurrency = Some(n);
145        self
146    }
147
148    // ── Node type discovery ────────────────────────────────────────────────
149
    /// Return all registered node type strings, sorted alphabetically.
    ///
    /// Includes built-in types (e.g. `"noop"`) and any types registered via
    /// [`NodeRegistry::register`].
    /// (Sorting is performed by `NodeRegistry::list_types`; this is a pure
    /// delegation.)
    pub fn node_types(&self) -> Vec<String> {
        self.registry.list_types()
    }
157
158    // ── Pre-flight validation ──────────────────────────────────────────────
159
160    /// Validate a flow definition without executing it.
161    ///
162    /// Returns a list of [`ValidationIssue`]s describing structural problems.
163    /// An empty list means the definition is valid and ready to run.
164    ///
165    /// The following checks are performed:
166    /// - DAG structural validity: no cycles, no unknown edge references,
167    ///   no duplicate node IDs, at least one node.
168    /// - All node types are registered in the engine's [`NodeRegistry`].
169    /// - Every `run_if.from` field references an existing node ID.
170    ///
171    /// ```rust
172    /// use a3s_flow::{FlowEngine, NodeRegistry};
173    /// use serde_json::json;
174    ///
175    /// let engine = FlowEngine::new(NodeRegistry::with_defaults());
176    /// let def = json!({
177    ///     "nodes": [
178    ///         { "id": "a", "type": "noop" },
179    ///         { "id": "b", "type": "unknown-type" }
180    ///     ],
181    ///     "edges": []
182    /// });
183    /// let issues = engine.validate(&def);
184    /// assert_eq!(issues.len(), 1);
185    /// assert!(issues[0].message.contains("unknown node type"));
186    /// ```
187    pub fn validate(&self, definition: &Value) -> Vec<ValidationIssue> {
188        let mut issues = Vec::new();
189
190        // Parse the DAG — catches cycle, unknown refs, duplicate IDs, empty flow.
191        let dag = match DagGraph::from_json(definition) {
192            Ok(dag) => dag,
193            Err(e) => {
194                issues.push(ValidationIssue {
195                    node_id: None,
196                    message: e.to_string(),
197                });
198                return issues;
199            }
200        };
201
202        for node_def in dag.nodes_in_order() {
203            // Check node type is registered.
204            if self.registry.get(&node_def.node_type).is_err() {
205                issues.push(ValidationIssue {
206                    node_id: Some(node_def.id.clone()),
207                    message: format!("unknown node type '{}'", node_def.node_type),
208                });
209            }
210
211            // Check run_if.from references an existing node.
212            if let Some(ref cond) = node_def.run_if {
213                if !dag.nodes.contains_key(&cond.from) {
214                    issues.push(ValidationIssue {
215                        node_id: Some(node_def.id.clone()),
216                        message: format!("run_if references unknown node '{}'", cond.from),
217                    });
218                }
219            }
220        }
221
222        issues
223    }
224
225    // ── Execution lifecycle ────────────────────────────────────────────────
226
227    /// Start a new workflow execution from a JSON DAG definition.
228    ///
229    /// The definition is parsed and validated synchronously. If valid, the
230    /// execution is launched in a background Tokio task and the execution ID
231    /// is returned immediately — the flow runs concurrently with the caller.
232    ///
233    /// # Errors
234    ///
235    /// Returns an error if the definition is invalid (cycle, unknown node ID,
236    /// bad JSON, unregistered node type).
237    pub async fn start(
238        &self,
239        definition: &Value,
240        variables: HashMap<String, Value>,
241    ) -> Result<Uuid> {
242        self.start_inner(definition, variables, Arc::clone(&self.emitter))
243            .await
244    }
245
246    /// Start a workflow and return a live event stream alongside the execution ID.
247    ///
248    /// The returned [`broadcast::Receiver<FlowEvent>`] is created **before** the
249    /// execution task is spawned, guaranteeing that no events are missed —
250    /// including `FlowStarted`. Multiple subscribers can be created by calling
251    /// [`broadcast::Receiver::resubscribe`].
252    ///
253    /// The stream closes (returns `Err(RecvError::Closed)`) when the execution
254    /// reaches a terminal state (`Completed`, `Failed`, or `Terminated`).
255    ///
256    /// If the engine also has a custom [`EventEmitter`] configured via
257    /// [`with_event_emitter`](Self::with_event_emitter), both the emitter and
258    /// the broadcast channel receive every event.
259    ///
260    /// # Example
261    ///
262    /// ```rust,no_run
263    /// use a3s_flow::{FlowEngine, FlowEvent, NodeRegistry};
264    /// use serde_json::json;
265    /// use std::collections::HashMap;
266    ///
267    /// #[tokio::main]
268    /// async fn main() -> a3s_flow::Result<()> {
269    ///     let engine = FlowEngine::new(NodeRegistry::with_defaults());
270    ///     let def = json!({
271    ///         "nodes": [{ "id": "a", "type": "noop" }],
272    ///         "edges": []
273    ///     });
274    ///
275    ///     let (id, mut rx) = engine.start_streaming(&def, HashMap::new()).await?;
276    ///
277    ///     while let Ok(event) = rx.recv().await {
278    ///         match event {
279    ///             FlowEvent::NodeCompleted { node_id, .. } => println!("done: {node_id}"),
280    ///             FlowEvent::FlowCompleted { .. } => break,
281    ///             _ => {}
282    ///         }
283    ///     }
284    ///     Ok(())
285    /// }
286    /// ```
287    pub async fn start_streaming(
288        &self,
289        definition: &Value,
290        variables: HashMap<String, Value>,
291    ) -> Result<(Uuid, broadcast::Receiver<FlowEvent>)> {
292        let (tx, rx) = broadcast::channel(256);
293        let channel_emitter = Arc::new(ChannelEmitter::new(tx));
294        let multicast: Arc<dyn EventEmitter> = Arc::new(MulticastEmitter {
295            a: Arc::clone(&self.emitter),
296            b: channel_emitter,
297        });
298        let id = self.start_inner(definition, variables, multicast).await?;
299        Ok((id, rx))
300    }
301
    /// Internal start implementation; accepts an explicit emitter so that both
    /// `start` and `start_streaming` can share the same spawn logic.
    ///
    /// Parses the definition synchronously (so invalid flows fail before any
    /// task is spawned), registers an [`ExecutionHandle`] under a fresh UUID,
    /// then spawns a detached Tokio task that drives the runner to one of the
    /// terminal states (`Completed`, `Failed`, `Terminated`).
    async fn start_inner(
        &self,
        definition: &Value,
        variables: HashMap<String, Value>,
        emitter: Arc<dyn EventEmitter>,
    ) -> Result<Uuid> {
        // Fail fast on an invalid definition — nothing is registered or spawned yet.
        let dag = DagGraph::from_json(definition)?;
        let mut runner = FlowRunner::new(dag, (*self.registry).clone()).with_event_emitter(emitter);
        if let Some(ref fs) = self.flow_store {
            runner = runner.with_flow_store(Arc::clone(fs));
        }
        if let Some(n) = self.max_concurrency {
            runner = runner.with_max_concurrency(n);
        }

        let execution_id = Uuid::new_v4();
        let cancel = CancellationToken::new();
        // The watch channel carries pause/resume signals; cancellation uses a
        // separate token so `terminate` can interrupt even a paused runner.
        let (signal_tx, signal_rx) = watch::channel(FlowSignal::Run);
        let state = Arc::new(RwLock::new(ExecutionState::Running));
        // Shared mutable context. A sync RwLock: in this file its guards are
        // only ever held for short clone/insert/remove sections, never across
        // an await.
        let context: Arc<SyncRwLock<HashMap<String, Value>>> =
            Arc::new(SyncRwLock::new(HashMap::new()));

        let handle = ExecutionHandle {
            state: Arc::clone(&state),
            signal_tx,
            cancel: cancel.clone(),
            context: Arc::clone(&context),
        };

        // Register the handle *before* spawning so pause/terminate/state can
        // never observe a running task without a handle.
        self.executions.write().await.insert(execution_id, handle);

        // Spawn the execution task; it updates state on terminal transitions.
        // Logging and event emission are handled by FlowRunner::run_seeded.
        let state_for_task = Arc::clone(&state);
        let execution_store = self.execution_store.clone();
        tokio::spawn(async move {
            match runner
                .run_controlled(execution_id, variables, signal_rx, cancel, context)
                .await
            {
                Ok(result) => {
                    // Persist the result if a store is configured. A persistence
                    // failure is logged but does not fail the execution.
                    if let Some(ref store) = execution_store {
                        if let Err(e) = store.save(&result).await {
                            warn!(%execution_id, error = %e, "failed to persist execution result");
                        }
                    }
                    *state_for_task.write().await = ExecutionState::Completed(result);
                }
                Err(FlowError::Terminated) => {
                    *state_for_task.write().await = ExecutionState::Terminated;
                }
                Err(e) => {
                    *state_for_task.write().await = ExecutionState::Failed(e.to_string());
                }
            }
        });

        Ok(execution_id)
    }
364
365    /// Start a workflow by loading its definition from the configured
366    /// [`FlowStore`] by name.
367    ///
368    /// Equivalent to:
369    /// ```rust,ignore
370    /// let def = flow_store.load(name).await?.ok_or(...)?;
371    /// engine.start(&def, variables).await
372    /// ```
373    ///
374    /// # Errors
375    ///
376    /// - [`FlowError::Internal`] if no `FlowStore` was configured via
377    ///   [`with_flow_store`](Self::with_flow_store).
378    /// - [`FlowError::FlowNotFound`] if no definition exists under `name`.
379    /// - Any error returned by [`start`](Self::start) (invalid definition, etc.).
380    pub async fn start_named(&self, name: &str, variables: HashMap<String, Value>) -> Result<Uuid> {
381        let store = self.flow_store.as_ref().ok_or_else(|| {
382            FlowError::Internal("no FlowStore configured; call with_flow_store first".into())
383        })?;
384
385        let definition = store
386            .load(name)
387            .await?
388            .ok_or_else(|| FlowError::FlowNotFound(name.to_string()))?;
389
390        self.start(&definition, variables).await
391    }
392
393    /// Pause a running execution at the next wave boundary.
394    ///
395    /// Nodes in the **current wave** continue until they finish. No new wave
396    /// starts until [`resume`](Self::resume) is called.
397    ///
398    /// # Errors
399    ///
400    /// - [`FlowError::ExecutionNotFound`] if the ID is unknown.
401    /// - [`FlowError::InvalidTransition`] if the execution is not `Running`.
402    pub async fn pause(&self, id: Uuid) -> Result<()> {
403        let executions = self.executions.read().await;
404        let handle = executions
405            .get(&id)
406            .ok_or(FlowError::ExecutionNotFound(id))?;
407
408        let mut state = handle.state.write().await;
409        match *state {
410            ExecutionState::Running => {
411                handle.signal_tx.send(FlowSignal::Pause).ok();
412                *state = ExecutionState::Paused;
413                Ok(())
414            }
415            ref s => Err(FlowError::InvalidTransition {
416                action: "pause".into(),
417                from: s.as_str().into(),
418            }),
419        }
420    }
421
422    /// Resume a paused execution.
423    ///
424    /// # Errors
425    ///
426    /// - [`FlowError::ExecutionNotFound`] if the ID is unknown.
427    /// - [`FlowError::InvalidTransition`] if the execution is not `Paused`.
428    pub async fn resume(&self, id: Uuid) -> Result<()> {
429        let executions = self.executions.read().await;
430        let handle = executions
431            .get(&id)
432            .ok_or(FlowError::ExecutionNotFound(id))?;
433
434        let mut state = handle.state.write().await;
435        match *state {
436            ExecutionState::Paused => {
437                handle.signal_tx.send(FlowSignal::Run).ok();
438                *state = ExecutionState::Running;
439                Ok(())
440            }
441            ref s => Err(FlowError::InvalidTransition {
442                action: "resume".into(),
443                from: s.as_str().into(),
444            }),
445        }
446    }
447
448    /// Terminate an execution immediately.
449    ///
450    /// Sends a cancellation signal. The execution task stops at the next
451    /// cancellation checkpoint (between waves, or within a wave's result
452    /// collection). If the execution is currently paused it is unblocked so
453    /// it can observe the cancellation.
454    ///
455    /// # Errors
456    ///
457    /// - [`FlowError::ExecutionNotFound`] if the ID is unknown.
458    /// - [`FlowError::InvalidTransition`] if the execution is already in a
459    ///   terminal state (`Completed`, `Failed`, `Terminated`).
460    pub async fn terminate(&self, id: Uuid) -> Result<()> {
461        let executions = self.executions.read().await;
462        let handle = executions
463            .get(&id)
464            .ok_or(FlowError::ExecutionNotFound(id))?;
465
466        let state = handle.state.read().await;
467        if state.is_terminal() {
468            return Err(FlowError::InvalidTransition {
469                action: "terminate".into(),
470                from: state.as_str().into(),
471            });
472        }
473        drop(state);
474
475        handle.cancel.cancel();
476        // Unblock a paused runner so it can observe the cancellation.
477        handle.signal_tx.send(FlowSignal::Run).ok();
478        Ok(())
479    }
480
481    /// Return a snapshot of the current state of an execution.
482    ///
483    /// # Errors
484    ///
485    /// - [`FlowError::ExecutionNotFound`] if the ID is unknown.
486    pub async fn state(&self, id: Uuid) -> Result<ExecutionState> {
487        let executions = self.executions.read().await;
488        let handle = executions
489            .get(&id)
490            .ok_or(FlowError::ExecutionNotFound(id))?;
491        // Clone while the guard is still held, then drop the guard before returning.
492        let snapshot = handle.state.read().await.clone();
493        Ok(snapshot)
494    }
495
496    // ── Context CRUD ───────────────────────────────────────────────────────
497
498    /// Return a snapshot of the shared mutable context for a running execution.
499    ///
500    /// The context is a `HashMap<String, Value>` that nodes may read and write
501    /// via [`ExecContext::context`](crate::node::ExecContext::context) during
502    /// execution. This method lets the caller inspect (or react to) the
503    /// accumulated state from outside the runner.
504    ///
505    /// # Errors
506    ///
507    /// - [`FlowError::ExecutionNotFound`] if the ID is unknown.
508    pub async fn get_context(&self, id: Uuid) -> Result<HashMap<String, Value>> {
509        let executions = self.executions.read().await;
510        let handle = executions
511            .get(&id)
512            .ok_or(FlowError::ExecutionNotFound(id))?;
513        let snapshot = handle.context.read().unwrap().clone();
514        Ok(snapshot)
515    }
516
517    /// Insert or overwrite a single entry in the shared context of a running
518    /// execution.
519    ///
520    /// The change is immediately visible to any node that reads the context
521    /// after this call returns.
522    ///
523    /// # Errors
524    ///
525    /// - [`FlowError::ExecutionNotFound`] if the ID is unknown.
526    pub async fn set_context_entry(&self, id: Uuid, key: String, value: Value) -> Result<()> {
527        let executions = self.executions.read().await;
528        let handle = executions
529            .get(&id)
530            .ok_or(FlowError::ExecutionNotFound(id))?;
531        handle.context.write().unwrap().insert(key, value);
532        Ok(())
533    }
534
535    /// Remove a single entry from the shared context of a running execution.
536    ///
537    /// Returns `true` if the key existed and was removed, `false` if it was
538    /// not present.
539    ///
540    /// # Errors
541    ///
542    /// - [`FlowError::ExecutionNotFound`] if the ID is unknown.
543    pub async fn delete_context_entry(&self, id: Uuid, key: &str) -> Result<bool> {
544        let executions = self.executions.read().await;
545        let handle = executions
546            .get(&id)
547            .ok_or(FlowError::ExecutionNotFound(id))?;
548        let removed = handle.context.write().unwrap().remove(key).is_some();
549        Ok(removed)
550    }
551}
552
553#[cfg(test)]
554mod tests {
555    use super::*;
556    use crate::node::{ExecContext, Node};
557    use async_trait::async_trait;
558    use serde_json::{json, Value};
559    use std::time::Duration;
560
561    // ── Helpers ────────────────────────────────────────────────────────────
562
    /// A node that sleeps for the given duration before returning.
    struct SlowNode(Duration);

    #[async_trait]
    impl Node for SlowNode {
        fn node_type(&self) -> &str {
            "slow"
        }

        async fn execute(&self, _ctx: ExecContext) -> crate::error::Result<Value> {
            // Async sleep so the test runtime stays responsive to pause/terminate.
            tokio::time::sleep(self.0).await;
            Ok(json!({}))
        }
    }
577
    /// Build an engine whose registry has the built-ins plus a `"slow"` node
    /// that sleeps `delay` each time it executes.
    fn slow_engine(delay: Duration) -> FlowEngine {
        let mut registry = NodeRegistry::with_defaults();
        registry.register(Arc::new(SlowNode(delay)));
        FlowEngine::new(registry)
    }
583
    /// Minimal two-node flow: fast `noop` nodes wired a → b.
    fn simple_def() -> Value {
        json!({
            "nodes": [
                { "id": "a", "type": "noop" },
                { "id": "b", "type": "noop" }
            ],
            "edges": [{ "source": "a", "target": "b" }]
        })
    }
593
    /// Two `slow` nodes wired a → b; total runtime is roughly twice the
    /// delay configured in `slow_engine`.
    fn slow_def() -> Value {
        json!({
            "nodes": [
                { "id": "a", "type": "slow" },
                { "id": "b", "type": "slow" }
            ],
            "edges": [{ "source": "a", "target": "b" }]
        })
    }
603
604    // ── node_types ─────────────────────────────────────────────────────────
605
606    #[test]
607    fn node_types_includes_builtins() {
608        let engine = FlowEngine::new(NodeRegistry::with_defaults());
609        let types = engine.node_types();
610        assert!(types.contains(&"noop".to_string()));
611    }
612
613    #[test]
614    fn node_types_includes_custom_nodes() {
615        let mut registry = NodeRegistry::with_defaults();
616        registry.register(Arc::new(SlowNode(Duration::from_millis(1))));
617        let engine = FlowEngine::new(registry);
618
619        let types = engine.node_types();
620        assert!(types.contains(&"noop".to_string()));
621        assert!(types.contains(&"slow".to_string()));
622    }
623
624    #[test]
625    fn node_types_is_sorted() {
626        let engine = FlowEngine::new(NodeRegistry::with_defaults());
627        let types = engine.node_types();
628        let mut sorted = types.clone();
629        sorted.sort();
630        assert_eq!(types, sorted);
631    }
632
633    // ── start ──────────────────────────────────────────────────────────────
634
635    #[tokio::test]
636    async fn start_returns_execution_id() {
637        let engine = FlowEngine::new(NodeRegistry::with_defaults());
638        let id = engine.start(&simple_def(), HashMap::new()).await.unwrap();
639        // ID is non-nil.
640        assert!(!id.is_nil());
641    }
642
643    #[tokio::test]
644    async fn start_rejects_invalid_definition() {
645        let engine = FlowEngine::new(NodeRegistry::with_defaults());
646        let bad = json!({
647            "nodes": [{ "id": "a", "type": "noop" }],
648            "edges": [{ "source": "ghost", "target": "a" }]
649        });
650        assert!(matches!(
651            engine.start(&bad, HashMap::new()).await,
652            Err(FlowError::UnknownNode(_))
653        ));
654    }
655
656    #[tokio::test]
657    async fn completed_flow_has_outputs() {
658        let engine = FlowEngine::new(NodeRegistry::with_defaults());
659        let id = engine.start(&simple_def(), HashMap::new()).await.unwrap();
660
661        // Wait for the background task to finish.
662        tokio::time::sleep(Duration::from_millis(50)).await;
663
664        let state = engine.state(id).await.unwrap();
665        if let ExecutionState::Completed(result) = state {
666            assert!(result.outputs.contains_key("a"));
667            assert!(result.outputs.contains_key("b"));
668        } else {
669            panic!("expected Completed, got {}", state.as_str());
670        }
671    }
672
673    // ── state ──────────────────────────────────────────────────────────────
674
675    #[tokio::test]
676    async fn state_returns_not_found_for_unknown_id() {
677        let engine = FlowEngine::new(NodeRegistry::with_defaults());
678        let err = engine.state(Uuid::new_v4()).await.unwrap_err();
679        assert!(matches!(err, FlowError::ExecutionNotFound(_)));
680    }
681
682    // ── pause / resume ─────────────────────────────────────────────────────
683
684    #[tokio::test]
685    async fn pause_transitions_to_paused() {
686        let engine = slow_engine(Duration::from_millis(200));
687        let id = engine.start(&slow_def(), HashMap::new()).await.unwrap();
688
689        // Give the runner a moment to start wave 1.
690        tokio::time::sleep(Duration::from_millis(10)).await;
691        engine.pause(id).await.unwrap();
692
693        assert!(matches!(
694            engine.state(id).await.unwrap(),
695            ExecutionState::Paused
696        ));
697    }
698
699    #[tokio::test]
700    async fn resume_transitions_to_running() {
701        let engine = slow_engine(Duration::from_millis(200));
702        let id = engine.start(&slow_def(), HashMap::new()).await.unwrap();
703
704        tokio::time::sleep(Duration::from_millis(10)).await;
705        engine.pause(id).await.unwrap();
706        engine.resume(id).await.unwrap();
707
708        assert!(matches!(
709            engine.state(id).await.unwrap(),
710            ExecutionState::Running
711        ));
712    }
713
714    #[tokio::test]
715    async fn pause_on_completed_flow_returns_invalid_transition() {
716        let engine = FlowEngine::new(NodeRegistry::with_defaults());
717        let id = engine.start(&simple_def(), HashMap::new()).await.unwrap();
718
719        tokio::time::sleep(Duration::from_millis(50)).await;
720        // Flow should be completed by now.
721        let err = engine.pause(id).await.unwrap_err();
722        assert!(matches!(err, FlowError::InvalidTransition { .. }));
723    }
724
725    #[tokio::test]
726    async fn resume_on_running_flow_returns_invalid_transition() {
727        let engine = slow_engine(Duration::from_millis(200));
728        let id = engine.start(&slow_def(), HashMap::new()).await.unwrap();
729
730        tokio::time::sleep(Duration::from_millis(10)).await;
731        // Still running — not paused.
732        let err = engine.resume(id).await.unwrap_err();
733        assert!(matches!(err, FlowError::InvalidTransition { .. }));
734
735        engine.terminate(id).await.unwrap();
736    }
737
738    // ── terminate ──────────────────────────────────────────────────────────
739
740    #[tokio::test]
741    async fn terminate_stops_slow_execution() {
742        let engine = slow_engine(Duration::from_millis(500));
743        let id = engine.start(&slow_def(), HashMap::new()).await.unwrap();
744
745        tokio::time::sleep(Duration::from_millis(10)).await;
746        engine.terminate(id).await.unwrap();
747
748        // The runner task should observe the cancellation quickly.
749        tokio::time::sleep(Duration::from_millis(50)).await;
750
751        assert!(matches!(
752            engine.state(id).await.unwrap(),
753            ExecutionState::Terminated
754        ));
755    }
756
757    #[tokio::test]
758    async fn terminate_unblocks_paused_execution() {
759        let engine = slow_engine(Duration::from_millis(500));
760        let id = engine.start(&slow_def(), HashMap::new()).await.unwrap();
761
762        tokio::time::sleep(Duration::from_millis(10)).await;
763        engine.pause(id).await.unwrap();
764
765        // Flow is paused — now terminate it.
766        engine.terminate(id).await.unwrap();
767
768        tokio::time::sleep(Duration::from_millis(600)).await;
769
770        assert!(matches!(
771            engine.state(id).await.unwrap(),
772            ExecutionState::Terminated
773        ));
774    }
775
776    #[tokio::test]
777    async fn terminate_on_completed_flow_returns_invalid_transition() {
778        let engine = FlowEngine::new(NodeRegistry::with_defaults());
779        let id = engine.start(&simple_def(), HashMap::new()).await.unwrap();
780
781        tokio::time::sleep(Duration::from_millis(50)).await;
782        let err = engine.terminate(id).await.unwrap_err();
783        assert!(matches!(err, FlowError::InvalidTransition { .. }));
784    }
785
786    #[tokio::test]
787    async fn unknown_execution_id_returns_not_found() {
788        let engine = FlowEngine::new(NodeRegistry::with_defaults());
789        let id = Uuid::new_v4();
790        assert!(matches!(
791            engine.pause(id).await,
792            Err(FlowError::ExecutionNotFound(_))
793        ));
794        assert!(matches!(
795            engine.resume(id).await,
796            Err(FlowError::ExecutionNotFound(_))
797        ));
798        assert!(matches!(
799            engine.terminate(id).await,
800            Err(FlowError::ExecutionNotFound(_))
801        ));
802    }
803
804    // ── ExecutionStore integration ──────────────────────────────────────────
805
806    #[tokio::test]
807    async fn execution_store_saves_completed_result() {
808        use crate::store::MemoryExecutionStore;
809
810        let store = Arc::new(MemoryExecutionStore::new());
811        let engine = FlowEngine::new(NodeRegistry::with_defaults())
812            .with_execution_store(Arc::clone(&store) as Arc<dyn crate::store::ExecutionStore>);
813
814        let id = engine.start(&simple_def(), HashMap::new()).await.unwrap();
815        tokio::time::sleep(Duration::from_millis(50)).await;
816
817        // Engine should have saved the result.
818        let ids = store.list().await.unwrap();
819        assert!(ids.contains(&id), "stored execution id not found");
820
821        let saved = store.load(id).await.unwrap().unwrap();
822        assert_eq!(saved.execution_id, id);
823        assert!(saved.outputs.contains_key("a"));
824        assert!(saved.outputs.contains_key("b"));
825    }
826
827    #[tokio::test]
828    async fn execution_store_not_used_on_terminated_execution() {
829        use crate::store::MemoryExecutionStore;
830
831        let store = Arc::new(MemoryExecutionStore::new());
832        let engine = slow_engine(Duration::from_millis(500))
833            .with_execution_store(Arc::clone(&store) as Arc<dyn crate::store::ExecutionStore>);
834
835        let id = engine.start(&slow_def(), HashMap::new()).await.unwrap();
836        tokio::time::sleep(Duration::from_millis(10)).await;
837        engine.terminate(id).await.unwrap();
838        tokio::time::sleep(Duration::from_millis(50)).await;
839
840        // Terminated executions are not saved.
841        assert!(
842            store.list().await.unwrap().is_empty(),
843            "terminated result should not be stored"
844        );
845    }
846
847    // ── EventEmitter integration (via engine) ───────────────────────────────
848
849    #[tokio::test]
850    async fn engine_emitter_receives_flow_and_node_events() {
851        use crate::event::EventEmitter;
852        use std::sync::atomic::{AtomicU32, Ordering};
853
854        struct CountEmitter {
855            flow_started: Arc<AtomicU32>,
856            flow_completed: Arc<AtomicU32>,
857            node_started: Arc<AtomicU32>,
858            node_completed: Arc<AtomicU32>,
859        }
860
861        #[async_trait::async_trait]
862        impl EventEmitter for CountEmitter {
863            async fn on_flow_started(&self, _: Uuid) {
864                self.flow_started.fetch_add(1, Ordering::SeqCst);
865            }
866            async fn on_flow_completed(&self, _: Uuid, _: &crate::result::FlowResult) {
867                self.flow_completed.fetch_add(1, Ordering::SeqCst);
868            }
869            async fn on_flow_failed(&self, _: Uuid, _: &str) {}
870            async fn on_flow_terminated(&self, _: Uuid) {}
871            async fn on_node_started(&self, _: Uuid, _: &str, _: &str) {
872                self.node_started.fetch_add(1, Ordering::SeqCst);
873            }
874            async fn on_node_completed(&self, _: Uuid, _: &str, _: &serde_json::Value) {
875                self.node_completed.fetch_add(1, Ordering::SeqCst);
876            }
877            async fn on_node_skipped(&self, _: Uuid, _: &str) {}
878            async fn on_node_failed(&self, _: Uuid, _: &str, _: &str) {}
879        }
880
881        let flow_started = Arc::new(AtomicU32::new(0));
882        let flow_completed = Arc::new(AtomicU32::new(0));
883        let node_started = Arc::new(AtomicU32::new(0));
884        let node_completed = Arc::new(AtomicU32::new(0));
885
886        let emitter = Arc::new(CountEmitter {
887            flow_started: Arc::clone(&flow_started),
888            flow_completed: Arc::clone(&flow_completed),
889            node_started: Arc::clone(&node_started),
890            node_completed: Arc::clone(&node_completed),
891        });
892
893        let engine = FlowEngine::new(NodeRegistry::with_defaults())
894            .with_event_emitter(emitter as Arc<dyn EventEmitter>);
895
896        // simple_def has nodes a and b.
897        engine.start(&simple_def(), HashMap::new()).await.unwrap();
898        tokio::time::sleep(Duration::from_millis(50)).await;
899
900        assert_eq!(flow_started.load(Ordering::SeqCst), 1, "flow_started");
901        assert_eq!(flow_completed.load(Ordering::SeqCst), 1, "flow_completed");
902        assert_eq!(node_started.load(Ordering::SeqCst), 2, "node_started (a+b)");
903        assert_eq!(
904            node_completed.load(Ordering::SeqCst),
905            2,
906            "node_completed (a+b)"
907        );
908    }
909
910    // ── start_named ────────────────────────────────────────────────────────
911
912    #[tokio::test]
913    async fn start_named_loads_and_runs_from_flow_store() {
914        use crate::flow_store::MemoryFlowStore;
915
916        let flow_store = Arc::new(MemoryFlowStore::new());
917        flow_store.save("greet", &simple_def()).await.unwrap();
918
919        let engine = FlowEngine::new(NodeRegistry::with_defaults())
920            .with_flow_store(Arc::clone(&flow_store) as Arc<dyn crate::flow_store::FlowStore>);
921
922        let id = engine.start_named("greet", HashMap::new()).await.unwrap();
923        assert!(!id.is_nil());
924
925        tokio::time::sleep(Duration::from_millis(50)).await;
926        assert!(matches!(
927            engine.state(id).await.unwrap(),
928            ExecutionState::Completed(_)
929        ));
930    }
931
932    #[tokio::test]
933    async fn start_named_returns_flow_not_found_for_unknown_name() {
934        use crate::flow_store::MemoryFlowStore;
935
936        let engine = FlowEngine::new(NodeRegistry::with_defaults())
937            .with_flow_store(
938                Arc::new(MemoryFlowStore::new()) as Arc<dyn crate::flow_store::FlowStore>
939            );
940
941        let err = engine
942            .start_named("nonexistent", HashMap::new())
943            .await
944            .unwrap_err();
945
946        assert!(
947            matches!(err, FlowError::FlowNotFound(ref n) if n == "nonexistent"),
948            "expected FlowNotFound, got: {err}"
949        );
950    }
951
952    #[tokio::test]
953    async fn start_named_returns_internal_when_no_store_configured() {
954        let engine = FlowEngine::new(NodeRegistry::with_defaults());
955
956        let err = engine
957            .start_named("anything", HashMap::new())
958            .await
959            .unwrap_err();
960
961        assert!(
962            matches!(err, FlowError::Internal(_)),
963            "expected Internal, got: {err}"
964        );
965    }
966
967    // ── start_streaming ────────────────────────────────────────────────────
968
969    #[tokio::test]
970    async fn start_streaming_delivers_flow_started_and_completed_events() {
971        use crate::event::FlowEvent;
972
973        let engine = FlowEngine::new(NodeRegistry::with_defaults());
974        let (_, mut rx) = engine
975            .start_streaming(&simple_def(), HashMap::new())
976            .await
977            .unwrap();
978
979        let mut saw_started = false;
980        let mut saw_completed = false;
981
982        loop {
983            match rx.recv().await {
984                Ok(FlowEvent::FlowStarted { .. }) => saw_started = true,
985                Ok(FlowEvent::FlowCompleted { .. }) => {
986                    saw_completed = true;
987                    break;
988                }
989                Ok(_) => {}
990                Err(_) => break,
991            }
992        }
993
994        assert!(saw_started, "FlowStarted not received");
995        assert!(saw_completed, "FlowCompleted not received");
996    }
997
998    #[tokio::test]
999    async fn start_streaming_delivers_node_events_for_each_node() {
1000        use crate::event::FlowEvent;
1001        use std::collections::HashSet;
1002
1003        let engine = FlowEngine::new(NodeRegistry::with_defaults());
1004        let (_, mut rx) = engine
1005            .start_streaming(&simple_def(), HashMap::new())
1006            .await
1007            .unwrap();
1008
1009        let mut completed_nodes: HashSet<String> = HashSet::new();
1010
1011        loop {
1012            match rx.recv().await {
1013                Ok(FlowEvent::NodeCompleted { node_id, .. }) => {
1014                    completed_nodes.insert(node_id);
1015                }
1016                Ok(FlowEvent::FlowCompleted { .. }) | Err(_) => break,
1017                Ok(_) => {}
1018            }
1019        }
1020
1021        assert!(completed_nodes.contains("a"), "node 'a' not in stream");
1022        assert!(completed_nodes.contains("b"), "node 'b' not in stream");
1023    }
1024
1025    #[tokio::test]
1026    async fn start_streaming_zero_events_lost_on_fast_flow() {
1027        // Sanity check: even on an instantaneously completing flow the
1028        // receiver created before spawn misses no events.
1029        let engine = FlowEngine::new(NodeRegistry::with_defaults());
1030        let def = json!({ "nodes": [{ "id": "x", "type": "noop" }], "edges": [] });
1031
1032        let (_id, mut rx) = engine.start_streaming(&def, HashMap::new()).await.unwrap();
1033
1034        let mut event_count = 0u32;
1035        loop {
1036            match rx.recv().await {
1037                Ok(_) => event_count += 1,
1038                Err(_) => break,
1039            }
1040        }
1041        // FlowStarted + NodeStarted + NodeCompleted + FlowCompleted = 4 minimum
1042        assert!(event_count >= 4, "expected ≥4 events, got {event_count}");
1043    }
1044
1045    #[tokio::test]
1046    async fn start_streaming_existing_emitter_also_fires() {
1047        use crate::event::{EventEmitter, FlowEvent};
1048        use std::sync::atomic::{AtomicU32, Ordering};
1049
1050        struct CountEmitter(Arc<AtomicU32>);
1051
1052        #[async_trait::async_trait]
1053        impl EventEmitter for CountEmitter {
1054            async fn on_flow_started(&self, _: Uuid) {
1055                self.0.fetch_add(1, Ordering::SeqCst);
1056            }
1057            async fn on_flow_completed(&self, _: Uuid, _: &crate::result::FlowResult) {}
1058            async fn on_flow_failed(&self, _: Uuid, _: &str) {}
1059            async fn on_flow_terminated(&self, _: Uuid) {}
1060            async fn on_node_started(&self, _: Uuid, _: &str, _: &str) {}
1061            async fn on_node_completed(&self, _: Uuid, _: &str, _: &serde_json::Value) {}
1062            async fn on_node_skipped(&self, _: Uuid, _: &str) {}
1063            async fn on_node_failed(&self, _: Uuid, _: &str, _: &str) {}
1064        }
1065
1066        let counter = Arc::new(AtomicU32::new(0));
1067        let engine = FlowEngine::new(NodeRegistry::with_defaults())
1068            .with_event_emitter(
1069                Arc::new(CountEmitter(Arc::clone(&counter))) as Arc<dyn EventEmitter>
1070            );
1071
1072        let (_id, mut rx) = engine
1073            .start_streaming(&simple_def(), HashMap::new())
1074            .await
1075            .unwrap();
1076
1077        // Drain the stream.
1078        loop {
1079            match rx.recv().await {
1080                Ok(FlowEvent::FlowCompleted { .. }) | Err(_) => break,
1081                Ok(_) => {}
1082            }
1083        }
1084
1085        // The existing CountEmitter should also have received FlowStarted.
1086        assert_eq!(
1087            counter.load(Ordering::SeqCst),
1088            1,
1089            "existing emitter did not fire"
1090        );
1091    }
1092
1093    // ── validate ───────────────────────────────────────────────────────────
1094
1095    #[test]
1096    fn validate_returns_empty_for_valid_flow() {
1097        let engine = FlowEngine::new(NodeRegistry::with_defaults());
1098        let def = json!({
1099            "nodes": [
1100                { "id": "a", "type": "noop" },
1101                { "id": "b", "type": "noop" }
1102            ],
1103            "edges": [{ "source": "a", "target": "b" }]
1104        });
1105        assert!(engine.validate(&def).is_empty());
1106    }
1107
1108    #[test]
1109    fn validate_catches_unknown_node_type() {
1110        let engine = FlowEngine::new(NodeRegistry::with_defaults());
1111        let def = json!({
1112            "nodes": [
1113                { "id": "a", "type": "noop" },
1114                { "id": "b", "type": "does-not-exist" }
1115            ],
1116            "edges": []
1117        });
1118        let issues = engine.validate(&def);
1119        assert_eq!(issues.len(), 1);
1120        assert_eq!(issues[0].node_id.as_deref(), Some("b"));
1121        assert!(issues[0].message.contains("unknown node type"));
1122    }
1123
1124    #[test]
1125    fn validate_catches_cyclic_graph() {
1126        let engine = FlowEngine::new(NodeRegistry::with_defaults());
1127        let def = json!({
1128            "nodes": [
1129                { "id": "a", "type": "noop" },
1130                { "id": "b", "type": "noop" }
1131            ],
1132            "edges": [
1133                { "source": "a", "target": "b" },
1134                { "source": "b", "target": "a" }
1135            ]
1136        });
1137        let issues = engine.validate(&def);
1138        assert_eq!(issues.len(), 1);
1139        assert!(issues[0].node_id.is_none());
1140    }
1141
1142    #[test]
1143    fn validate_catches_run_if_referencing_unknown_node() {
1144        let engine = FlowEngine::new(NodeRegistry::with_defaults());
1145        let def = json!({
1146            "nodes": [
1147                { "id": "a", "type": "noop" },
1148                {
1149                    "id": "b",
1150                    "type": "noop",
1151                    "data": {
1152                        "run_if": { "from": "ghost", "path": "", "op": "eq", "value": true }
1153                    }
1154                }
1155            ],
1156            "edges": [{ "source": "a", "target": "b" }]
1157        });
1158        let issues = engine.validate(&def);
1159        assert_eq!(issues.len(), 1);
1160        assert_eq!(issues[0].node_id.as_deref(), Some("b"));
1161        assert!(issues[0].message.contains("ghost"));
1162    }
1163
1164    #[test]
1165    fn validate_reports_multiple_issues() {
1166        let engine = FlowEngine::new(NodeRegistry::with_defaults());
1167        let def = json!({
1168            "nodes": [
1169                { "id": "a", "type": "bad-type-1" },
1170                { "id": "b", "type": "bad-type-2" }
1171            ],
1172            "edges": []
1173        });
1174        assert_eq!(engine.validate(&def).len(), 2);
1175    }
1176}